import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:connectivity_plus/connectivity_plus.dart';
import 'package:dio/dio.dart';
import 'package:drift/drift.dart';
import 'package:niogu_app/core/constants/app_url.dart';
import 'package:niogu_app/core/database/app_database.dart';
import 'package:niogu_app/core/enums/sync_status.dart';
import 'package:niogu_app/core/enums/table_type.dart';
import 'package:niogu_app/core/system/system_setting.dart';
import 'package:niogu_app/core/utils/log_message.dart';
import 'package:rxdart/rxdart.dart';

/// Pushes rows queued in the local `syncQueues` table to the server.
///
/// The service watches the queue, and for every batch it:
///  1. picks pending rows (status dirty/failed with retries left),
///  2. orders them by [_tablePriority] and caps the batch at [_batchSize],
///  3. uploads any image fields that still hold local file paths,
///  4. posts the batch to [AppUrl.SYNC_UP],
///  5. marks accepted rows as synced and removes them from the queue, and
///     reschedules rows the server reported in `report.errors`.
class SyncService {
  SyncService(this._dio, this._db);

  /// A queue row is no longer picked up once its retry count reaches this.
  static const int _maxRetries = 5;

  /// Maximum number of queue rows sent in one batch.
  static const int _batchSize = 100;

  /// Pause inserted between the individual steps of [processBatch].
  // NOTE(review): the reason for these pauses is not visible in this file;
  // they are preserved as-is.
  static const Duration _stepDelay = Duration(seconds: 3);

  /// Upload order per table name; lower values are sent first.
  ///
  /// Tables that are not listed sort last (priority 99).
  static const Map<String, int> _tablePriority = {
    'tenants': 1,
    'tenant_payment_methods': 2,
    'campaigns': 3,
    'outlets': 4,
    'outlet_campaigns': 5,
    'users': 6,
    'units': 7,
    'categories': 8,
    'products': 9,
    'product_variants': 10,
    'raw_materials': 11,
    'outlet_inventories': 12,
    'receipt_items': 13,
    'customers': 14,
    'customer_addresses': 15,
    'product_likes': 16,
    'product_reviews': 17,
    'suppliers': 18,
    'orders': 19,
    'purchases': 20,
    'transactions': 21,
    'order_items': 22,
    'purchase_items': 23,
    'stock_cards': 24,
  };

  /// Payload keys whose values may be local image paths needing an upload.
  static const List<String> _imageFields = [
    'server_logo_url',
    'server_photo_url',
    'server_banner_url',
    'server_image_url',
    'payment_proof_url',
    'product_image_url_snapshot',
    'item_image_url_snapshot',
  ];

  /// Folder types recognised in a local path, in matching order.
  static const List<String> _folderTypes = [
    'product_category_images',
    'product_images',
    'product_variant_images',
    'payment_proof_orders',
    'payment_proof_purchases',
    'outlet_banner_images',
    'campaign_images',
    'logo_images',
  ];

  final Dio _dio;
  final AppDatabase _db;

  StreamSubscription<dynamic>? _syncSubscription;

  /// Whether a batch is currently being processed (re-entrancy guard).
  bool _isProcessing = false;

  /// Number of batch attempts made so far; only used in log output.
  int _retryCount = 0;

  /// Starts watching the sync queue and processes a batch whenever the
  /// (debounced) queue is non-empty and no batch is already running.
  ///
  /// Calling this again cancels the previous subscription first so that only
  /// one watcher is ever active.
  Future<void> startWatching() async {
    await _syncSubscription?.cancel();

    final queueQuery = _db.select(_db.syncQueues)
      ..orderBy([
        (t) => OrderingTerm(expression: t.id, mode: OrderingMode.asc),
      ]);

    _syncSubscription = queueQuery
        .watch()
        .debounceTime(const Duration(seconds: 3))
        .listen((queuedItems) async {
      if (queuedItems.isEmpty || _isProcessing) return;
      try {
        await processBatch();
      } catch (e, st) {
        // processBatch rethrows after reverting the batch; without this
        // catch the error would surface as an uncaught asynchronous error.
        LogMessage.log.e(
          "Sync watcher: batch processing failed",
          error: e,
          stackTrace: st,
        );
      }
    });
  }

  /// Makes queue rows that exhausted their retries eligible for sync again by
  /// resetting them to dirty with a retry count of zero.
  ///
  /// Throws an [Exception] when the device reports no connectivity.
  Future<void> restartUpSync() async {
    final connectivityResult = await Connectivity().checkConnectivity();
    if (connectivityResult.any((con) => con == ConnectivityResult.none)) {
      throw Exception('No network connection; cannot restart sync.');
    }

    final exhausted = _db.update(_db.syncQueues)
      ..where((t) => t.retryCount.isBiggerOrEqualValue(_maxRetries));
    await exhausted.write(
      SyncQueuesCompanion(
        status: Value(SyncStatus.dirty.status),
        retryCount: Value(0),
      ),
    );
  }

  /// Sends one batch of pending queue rows to the server.
  ///
  /// Returns without doing anything when the user is not logged in, the device
  /// is offline, or another batch is already in progress. On any failure the
  /// rows of the batch are rescheduled via [_handleBatchFailure] and the
  /// original error is rethrown.
  Future<void> processBatch() async {
    LogMessage.log.i("=== Process Batch ===");
    final isLoggedIn = await SystemSetting.isLoggedIn();
    if (!isLoggedIn) return;

    final connectivityResult = await Connectivity().checkConnectivity();
    if (connectivityResult.any((con) => con == ConnectivityResult.none)) {
      return;
    }

    // No await between this check and the assignment below, so two callers
    // cannot both pass the guard.
    if (_isProcessing) return;

    LogMessage.log.i("=== Start Process === ");
    _isProcessing = true;
    final currentBatchId = DateTime.now().millisecondsSinceEpoch.toString();

    try {
      final rawItems = await (_db.select(_db.syncQueues)
            ..where(
              (t) =>
                  (t.status.equals(SyncStatus.dirty.status) |
                      t.status.equals(SyncStatus.failed.status)) &
                  t.retryCount.isSmallerThanValue(_maxRetries),
            )
            ..orderBy([
              (t) => OrderingTerm(expression: t.id, mode: OrderingMode.asc),
            ]))
          .get();
      if (rawItems.isEmpty) return;

      // Parent tables first; queue id breaks ties to keep insertion order.
      final sortedItems = [...rawItems]..sort((a, b) {
          final priorityA = _tablePriority[a.tableType] ?? 99;
          final priorityB = _tablePriority[b.tableType] ?? 99;
          if (priorityA == priorityB) return a.id.compareTo(b.id);
          return priorityA.compareTo(priorityB);
        });
      final items = sortedItems.take(_batchSize).toList();

      // Claim the rows for this batch.
      await _db.batch((batch) {
        for (final item in items) {
          batch.update(
            _db.syncQueues,
            SyncQueuesCompanion(
              status: Value(SyncStatus.syncing.status),
              batchId: Value(currentBatchId),
            ),
            where: (t) => t.id.equals(item.id),
          );
        }
      });
      await Future.delayed(_stepDelay);

      await _handleImageUploads(items);
      await Future.delayed(_stepDelay);

      // Re-read the rows: image uploads may have rewritten their payloads.
      final refreshedItems = await (_db.select(_db.syncQueues)
            ..where((t) => t.id.isIn(items.map((e) => e.id))))
          .get();
      await Future.delayed(_stepDelay);

      final batchPayload = {
        'batch_id': currentBatchId,
        'items':
            refreshedItems.map((item) => jsonEncode(item.payload)).toList(),
      };

      LogMessage.log.i("=== Process To Sync Up, Try $_retryCount ===");
      final response = await _dio.post(AppUrl.SYNC_UP, data: batchPayload);
      await Future.delayed(_stepDelay);

      final errorsByLocalId = _indexErrorsByLocalId(response.data);

      await _db.transaction(() async {
        for (final item in refreshedItems) {
          if (errorsByLocalId.containsKey(item.tableLocalId)) {
            // Rejected by the server: release the row from this batch and
            // reschedule it, otherwise it would stay in `syncing` forever.
            await _markForRetry(
              item,
              'Server rejected item: ${errorsByLocalId[item.tableLocalId]}',
            );
            continue;
          }

          final payload = Map<String, dynamic>.from(item.payload);
          await _updateMasterSyncStatus(
            item.tableType,
            item.tableLocalId,
            SyncStatus.synced.status,
            data: payload['data'],
          );
          await (_db.delete(_db.syncQueues)
                ..where((t) => t.id.equals(item.id)))
              .go();
        }
      });
      LogMessage.log.i("=== Process Sync Up Done ===");
    } catch (e, st) {
      LogMessage.log.e(
        "=== Process Sync Up Failed ===",
        error: e,
        stackTrace: st,
      );
      await _handleBatchFailure(currentBatchId, e.toString());
      rethrow;
    } finally {
      _isProcessing = false;
      _retryCount++;
    }
  }

  /// Indexes the `report.errors` entries of a sync response by `local_id`.
  ///
  /// Throws a [FormatException] when the response has no `report.errors` list,
  /// so a malformed response is handled as a batch failure.
  Map<Object?, Object?> _indexErrorsByLocalId(dynamic responseData) {
    final report = responseData is Map ? responseData['report'] : null;
    final errors = report is Map ? report['errors'] : null;
    if (errors is! List) {
      throw const FormatException(
        'Sync response does not contain a "report.errors" list',
      );
    }
    return {
      for (final entry in errors)
        if (entry is Map) entry['local_id']: entry,
    };
  }

  /// Uploads every local image referenced by the payloads of [items] and
  /// stores the returned server paths back into the queue rows.
  ///
  /// [items] are `syncQueues` rows. A path whose upload fails is kept
  /// unchanged in the payload.
  Future<void> _handleImageUploads(List<dynamic> items) async {
    for (final item in items) {
      final payload = Map<String, dynamic>.from(item.payload);
      final rawData = payload['data'];
      // Payloads without a data map have nothing to upload.
      if (rawData is! Map) continue;

      final data = Map<String, dynamic>.from(rawData);
      var isChanged = false;

      for (final field in _imageFields) {
        final imageValue = data[field];
        if (imageValue == null) continue;

        if (imageValue is List) {
          final serverPaths = <String>[];
          for (final path in imageValue) {
            final pathStr = path.toString();
            final uploaded = await _uploadIfLocal(pathStr);
            serverPaths.add(uploaded ?? pathStr);
            if (uploaded != null) isChanged = true;
          }
          data[field] = serverPaths;
        } else if (imageValue is String) {
          final uploaded = await _uploadIfLocal(imageValue);
          if (uploaded != null) {
            data[field] = uploaded;
            isChanged = true;
          }
        }
      }

      if (!isChanged) continue;

      payload['data'] = data;
      await (_db.update(_db.syncQueues)..where((t) => t.id.equals(item.id)))
          .write(SyncQueuesCompanion(payload: Value(payload)));
      LogMessage.log.i(
        "Payload updated with server paths for item: ${item.tableLocalId}",
      );
    }
  }

  /// Uploads [path] when it is a local file path.
  ///
  /// Returns the server path, or `null` when [path] is not local or the upload
  /// did not succeed.
  Future<String?> _uploadIfLocal(String path) async {
    if (!_isLocalPath(path)) return null;
    return _uploadSingleFile(path, _getFolderTypeFromPath(path));
  }

  /// Whether [path] looks like a file on this device rather than a remote URL.
  bool _isLocalPath(String path) {
    // A remote URL is never a local file, even if it contains "cache".
    if (path.startsWith('http://') || path.startsWith('https://')) {
      return false;
    }
    return path.startsWith('/') ||
        path.contains('app_flutter') ||
        path.contains('cache');
  }

  /// Returns the first entry of [_folderTypes] contained in [path], or
  /// `'others'` when none matches.
  String _getFolderTypeFromPath(String path) {
    for (final folderType in _folderTypes) {
      if (path.contains(folderType)) return folderType;
    }
    return 'others';
  }

  /// Uploads the file at [localPath] to [AppUrl.UPLOAD_IMAGE] under
  /// [folderType].
  ///
  /// Returns the `data.server_path` value of a 200 response, or `null` when
  /// the file does not exist, the status is not 200, or any error occurs (the
  /// error is logged, not thrown).
  Future<String?> _uploadSingleFile(String localPath, String folderType) async {
    try {
      final file = File(localPath);
      if (!await file.exists()) {
        LogMessage.log.e("File fisik tidak ditemukan: $localPath");
        return null;
      }

      final fileName = localPath.split('/').last;
      final formData = FormData.fromMap({
        "type": folderType,
        "image": await MultipartFile.fromFile(localPath, filename: fileName),
      });

      final response = await _dio.post(AppUrl.UPLOAD_IMAGE, data: formData);
      if (response.statusCode == 200) {
        return response.data['data']['server_path'];
      }
      return null;
    } catch (e) {
      LogMessage.log.e("Gagal upload ke $folderType: $e");
      return null;
    }
  }

  /// Writes [status] (and any image fields present in [data]) to the row of
  /// [tableName] whose `local_id` equals [localId].
  ///
  /// Throws an [ArgumentError] when [tableName] is not a [TableType] name.
  Future<void> _updateMasterSyncStatus(
    String tableName,
    String localId,
    String status, {
    Map<String, dynamic>? data,
  }) async {
    // Resolve the table first: this rejects unknown names before the name is
    // interpolated into the SQL statement below.
    final table = _getTableSet(TableType.values.byName(tableName));

    // NOTE(review): server_id is set to the local id here — confirm that the
    // server really reuses the client-generated id.
    final assignments = <String>['sync_status = ?', 'server_id = ?'];
    final variables = <Variable>[Variable(status), Variable(localId)];

    if (data != null) {
      for (final field in _imageFields) {
        final fieldValue = data[field];
        if (fieldValue == null) continue;
        assignments.add('$field = ?');
        // Lists are stored as their JSON text.
        variables.add(
          Variable(fieldValue is List ? jsonEncode(fieldValue) : fieldValue),
        );
      }
    }
    variables.add(Variable(localId));

    await _db.customUpdate(
      'UPDATE $tableName SET ${assignments.join(', ')} WHERE local_id = ?',
      variables: variables,
      updates: {table},
    );
  }

  /// Maps a [TableType] to the matching drift table of [_db].
  ResultSetImplementation _getTableSet(TableType type) {
    switch (type) {
      case TableType.campaigns:
        return _db.campaigns;
      case TableType.categories:
        return _db.categories;
      case TableType.customers:
        return _db.customers;
      case TableType.order_items:
        return _db.orderItems;
      case TableType.orders:
        return _db.orders;
      case TableType.outlet_campaigns:
        return _db.outletCampaigns;
      case TableType.outlet_inventories:
        return _db.outletInventories;
      case TableType.outlets:
        return _db.outlets;
      case TableType.product_likes:
        return _db.productLikes;
      case TableType.product_reviews:
        return _db.productReviews;
      case TableType.product_variants:
        return _db.productVariants;
      case TableType.products:
        return _db.products;
      case TableType.purchase_items:
        return _db.purchaseItems;
      case TableType.purchases:
        return _db.purchases;
      case TableType.raw_materials:
        return _db.rawMaterials;
      case TableType.receipt_items:
        return _db.receiptItems;
      case TableType.stock_cards:
        return _db.stockCards;
      case TableType.suppliers:
        return _db.suppliers;
      case TableType.tenant_payment_methods:
        return _db.tenantPaymentMethods;
      case TableType.tenants:
        return _db.tenants;
      case TableType.transactions:
        return _db.transactions;
      case TableType.units:
        return _db.units;
      case TableType.users:
        return _db.users;
    }
  }

  /// Releases the queue row [item] from its batch and schedules a retry.
  ///
  /// The retry count is incremented; once it reaches [_maxRetries] the row is
  /// marked failed, otherwise it goes back to dirty. [errorMessage] is stored
  /// as the row's last error.
  Future<void> _markForRetry(dynamic item, String errorMessage) async {
    final nextRetryCount = (item.retryCount as int) + 1;
    // Compare the incremented count: rows are only selected while their count
    // is below the limit, so the pre-increment value can never reach it.
    final isPermanentFailure = nextRetryCount >= _maxRetries;

    await (_db.update(_db.syncQueues)..where((t) => t.id.equals(item.id)))
        .write(
      SyncQueuesCompanion(
        status: Value(
          isPermanentFailure
              ? SyncStatus.failed.status
              : SyncStatus.dirty.status,
        ),
        batchId: const Value(null),
        retryCount: Value(nextRetryCount),
        lastError: Value(errorMessage),
      ),
    );
  }

  /// Reschedules every queue row that still belongs to [batchId] after the
  /// batch failed with [errorMessage].
  ///
  /// Errors raised while doing so are logged and rethrown.
  Future<void> _handleBatchFailure(String batchId, String errorMessage) async {
    await Future.delayed(const Duration(milliseconds: 500));
    try {
      await _db.transaction(() async {
        final items = await (_db.select(_db.syncQueues)
              ..where((t) => t.batchId.equals(batchId)))
            .get();
        for (final item in items) {
          await _markForRetry(item, errorMessage);
        }
      });
      LogMessage.log.e(
        "Batch $batchId failed. Items reverted to dirty for retry. with error $errorMessage",
      );
    } catch (e, st) {
      LogMessage.log.e(
        "Critical error in _handleBatchFailure: ${e.toString()}",
        error: e,
        stackTrace: st,
      );
      rethrow;
    }
  }

  /// Stops watching the sync queue.
  void stopWatching() {
    unawaited(_syncSubscription?.cancel());
    _syncSubscription = null;
  }
}