MIF_E31222656/lib/screens/community/services/message_service.dart

1107 lines
36 KiB
Dart

import 'dart:math';
import 'package:flutter/foundation.dart';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'dart:async';
import 'dart:io';
import 'package:tugas_akhir_supabase/screens/community/models/message.dart';
import 'package:uuid/uuid.dart';
class MessageLoadResult {
final List<Message> messages;
final bool hasMore;
MessageLoadResult({required this.messages, required this.hasMore});
}
class MessageSendResult {
final bool success;
final String? errorMessage;
final Message? message;
MessageSendResult({required this.success, this.errorMessage, this.message});
}
class MessageService {
final _supabase = Supabase.instance.client;
final Map<String, Set<String>> _messageReadReceipts = {};
// Message-specific timers
Timer? _refreshTimer;
Timer? _readStatusTimer;
Timer? _cleanupTimer;
RealtimeChannel? _subscription;
final int _pageSize = 20;
// Cache
final Map<String, String> _emailCache = {};
final Map<String, String> _usernameCache = {};
final Map<String, String> _profilePictureCache = {};
// State
List<Message> _cachedMessages = [];
DateTime? _lastFetch;
String? _currentUserId;
StreamSubscription? _messagesSubscription;
// Constants
static const int _fetchLimit = 30;
static const Duration _refreshInterval = Duration(seconds: 30);
static const Duration _readUpdateInterval = Duration(seconds: 15);
// Getters
String? get currentUserId {
_currentUserId ??= _supabase.auth.currentUser?.id;
return _currentUserId;
}
// Initialize
void setupRefreshTimer({required Function() onRefresh}) {
_refreshTimer?.cancel();
_refreshTimer = Timer.periodic(const Duration(minutes: 2), (_) {
onRefresh();
});
}
void setupReadStatusTimer({required Function() onUpdate}) {
_readStatusTimer?.cancel();
_readStatusTimer = Timer.periodic(const Duration(seconds: 30), (_) {
onUpdate();
});
}
void setupCleanupTimer({required Function() onCleanup}) {
_cleanupTimer?.cancel();
_cleanupTimer = Timer.periodic(const Duration(hours: 24), (_) {
onCleanup();
});
}
// Cleanup
void dispose() {
_subscription?.unsubscribe();
_refreshTimer?.cancel();
_readStatusTimer?.cancel();
_cleanupTimer?.cancel();
}
// Real-time subscription
void setupMessagesSubscription(
Function(Message) onNewMessage,
Function(String, String) onReadStatusUpdate,
) {
try {
// Unsubscribe if already subscribed
_subscription?.unsubscribe();
print('[DEBUG] Setting up real-time subscription');
// Subscribe to read receipts if available
_subscribeToReadReceipts(onReadStatusUpdate);
// Subscribe to messages
_subscription = _supabase
.channel('public:community_messages')
.onPostgresChanges(
event: PostgresChangeEvent.insert,
schema: 'public',
table: 'community_messages',
callback: (payload) {
print('[DEBUG] Received real-time update: ${payload.eventType}');
try {
final data = payload.newRecord;
// Skip messages older than 30 days
final messageDate = DateTime.parse(
data['created_at'] ?? DateTime.now().toIso8601String(),
);
if (messageDate.isBefore(
DateTime.now().subtract(Duration(days: 30)),
)) {
return;
}
final senderId = data['sender_id'] as String? ?? '';
if (senderId.isEmpty) return;
// Get cached user info if available
String senderEmail = _emailCache[senderId] ?? '';
String senderUsername = _usernameCache[senderId] ?? '';
String? avatarUrl = _profilePictureCache[senderId];
// Use email from message if available
if (senderEmail.isEmpty && data['sender_email'] != null) {
senderEmail = data['sender_email'] as String;
_emailCache[senderId] = senderEmail;
}
// Derive username if needed
if (senderUsername.isEmpty) {
if (senderEmail.isNotEmpty) {
senderUsername = senderEmail.split('@')[0];
} else {
senderUsername =
'user-${senderId.substring(0, senderId.length.clamp(0, 6))}';
}
}
// Parse reply data
String? replyToId = data['reply_to_id'] as String?;
String? replyToContent = data['reply_to_content'] as String?;
String? replyToSenderEmail =
data['reply_to_sender_email'] as String?;
String? replyToSenderUsername =
data['reply_to_sender_username'] as String?;
// If replyToSenderUsername is not available but email is, derive username from email
if (replyToSenderUsername == null &&
replyToSenderEmail != null &&
replyToSenderEmail.isNotEmpty) {
replyToSenderUsername = replyToSenderEmail.split('@')[0];
}
final message = Message(
id:
data['id'] as String? ??
'msg-${DateTime.now().millisecondsSinceEpoch}',
content: data['content'] as String? ?? '',
senderEmail: senderEmail,
senderUsername: senderUsername,
senderUserId: senderId,
imageUrl: data['image_url'] as String?,
createdAt: messageDate,
replyToId: replyToId,
replyToContent: replyToContent,
replyToSenderEmail: replyToSenderEmail,
replyToSenderUsername: replyToSenderUsername,
avatarUrl: avatarUrl,
);
// Notify callback
onNewMessage(message);
// Fetch profile in background if needed
if (avatarUrl == null) {
_fetchUserProfile(senderId)
.then((profile) {
if (profile != null && profile['avatar_url'] != null) {
_profilePictureCache[senderId] =
profile['avatar_url'];
// Update username if available
if (profile['username'] != null) {
_usernameCache[senderId] = profile['username'];
}
}
})
.catchError((e) {
print(
'[ERROR] Error fetching profile in background: $e',
);
});
}
} catch (e) {
print('[ERROR] Error processing real-time message: $e');
}
},
)
.subscribe((status, error) {
if (error != null) {
print('[ERROR] Subscription error: $error');
// Try to reconnect after a delay
Future.delayed(const Duration(seconds: 5), () {
setupMessagesSubscription(onNewMessage, onReadStatusUpdate);
});
} else {
print('[DEBUG] Subscription status: $status');
}
});
} catch (e) {
print('[ERROR] Failed to setup subscription: $e');
// Try to reconnect after a delay
Future.delayed(const Duration(seconds: 5), () {
setupMessagesSubscription(onNewMessage, onReadStatusUpdate);
});
}
}
// Subscribe to read receipts
void _subscribeToReadReceipts(Function(String, String) onReadStatusUpdate) {
try {
_supabase
.channel('public:message_read_receipts')
.onPostgresChanges(
event: PostgresChangeEvent.insert,
schema: 'public',
table: 'message_read_receipts',
callback: (payload) {
try {
final data = payload.newRecord;
final messageId = data['message_id'] as String?;
final userId = data['user_id'] as String?;
if (messageId != null && userId != null) {
// Update local read status
final readers = _messageReadReceipts[messageId] ?? <String>{};
readers.add(userId);
_messageReadReceipts[messageId] = readers;
// Notify callback
onReadStatusUpdate(messageId, userId);
}
} catch (e) {
print('[ERROR] Error processing read receipt: $e');
}
},
)
.subscribe();
} catch (e) {
print('[INFO] Could not subscribe to read receipts: $e');
}
}
// Delete message
Future<bool> deleteMessage(Message message) async {
try {
final currentUserId = _supabase.auth.currentUser?.id;
if (currentUserId == null || message.senderUserId != currentUserId) {
throw Exception('Not authorized to delete this message');
}
await _supabase
.from('community_messages')
.delete()
.eq('id', message.id)
.eq('sender_id', currentUserId);
return true;
} catch (e) {
print('[ERROR] Failed to delete message: $e');
rethrow;
}
}
// Load messages
Future<MessageLoadResult> loadMessages({
bool forceRefresh = false,
bool loadNew = false,
List<Message>? existingMessages,
String? groupId,
}) async {
try {
// Check if we should use cache
if (!forceRefresh && !loadNew && _cachedMessages.isNotEmpty) {
return MessageLoadResult(messages: _cachedMessages, hasMore: true);
}
// For loading new messages, use the most recent as reference
DateTime? since;
if (loadNew && existingMessages != null && existingMessages.isNotEmpty) {
// Sort to find the most recent
final sortedMessages = List<Message>.from(existingMessages)
..sort((a, b) => b.createdAt.compareTo(a.createdAt));
since = sortedMessages.first.createdAt;
}
// Build the query - use dynamic to avoid type conflicts
final queryBase = _supabase.from('community_messages').select();
// Apply filters and ordering dynamically
final query =
loadNew && since != null
? queryBase
.gte('created_at', since.toIso8601String())
.order('created_at', ascending: false)
: queryBase
.limit(_fetchLimit)
.order('created_at', ascending: false);
// Execute query with timeout
final data = await query.timeout(
const Duration(seconds: 5),
onTimeout: () {
throw TimeoutException('Database query timed out');
},
);
// Process results
List<Message> messages = [];
if (data.isNotEmpty) {
for (final item in data) {
try {
// Extract user info for Message.fromMap
final senderId = item['sender_id'] as String? ?? '';
final senderEmail =
item['sender_email'] as String? ?? _emailCache[senderId] ?? '';
final senderUsername =
_usernameCache[senderId] ?? senderEmail.split('@')[0];
final avatarUrl = _profilePictureCache[senderId];
final message = Message.fromMap(
item,
senderEmail: senderEmail,
senderUsername: senderUsername,
avatarUrl: avatarUrl,
);
// Skip messages with conversion issues
if (message.id.isEmpty) continue;
messages.add(message);
} catch (e) {
print('[ERROR] Failed to parse message: $e');
}
}
}
_lastFetch = DateTime.now();
if (!loadNew) {
// Replace cache for normal loads
_cachedMessages = messages;
} else if (loadNew && messages.isNotEmpty) {
// Prepend new messages to cache
final newIds = messages.map((m) => m.id).toSet();
_cachedMessages = [
...messages,
..._cachedMessages.where((m) => !newIds.contains(m.id)),
];
}
// Return results
return MessageLoadResult(
messages: messages,
hasMore: messages.length >= _fetchLimit,
);
} catch (e) {
print('[ERROR] Failed to load messages: $e');
// Return empty result on error but don't throw
return MessageLoadResult(messages: [], hasMore: false);
}
}
// Load more messages (pagination)
Future<MessageLoadResult> loadMoreMessages(
List<Message> existingMessages,
) async {
if (existingMessages.isEmpty) {
return await loadMessages();
}
try {
// Get oldest message timestamp for pagination
final oldestMessage = existingMessages.reduce(
(a, b) => a.createdAt.isBefore(b.createdAt) ? a : b,
);
// Query older messages
final data = await _supabase
.from('community_messages')
.select()
.lte('created_at', oldestMessage.createdAt.toIso8601String())
.order('created_at', ascending: false)
.limit(_fetchLimit)
.timeout(
const Duration(seconds: 5),
onTimeout: () {
throw TimeoutException('Database query timed out');
},
);
// Process results
List<Message> messages = [];
if (data.isNotEmpty) {
for (final item in data) {
try {
// Extract user info for Message.fromMap
final senderId = item['sender_id'] as String? ?? '';
final senderEmail =
item['sender_email'] as String? ?? _emailCache[senderId] ?? '';
final senderUsername =
_usernameCache[senderId] ?? senderEmail.split('@')[0];
final avatarUrl = _profilePictureCache[senderId];
final message = Message.fromMap(
item,
senderEmail: senderEmail,
senderUsername: senderUsername,
avatarUrl: avatarUrl,
);
messages.add(message);
} catch (e) {
print('[ERROR] Failed to parse message: $e');
}
}
}
// Update cache without duplicates
final newIds = messages.map((m) => m.id).toSet();
_cachedMessages = [
..._cachedMessages,
...messages.where(
(m) => !_cachedMessages.map((cm) => cm.id).contains(m.id),
),
];
// Return results
return MessageLoadResult(
messages: messages,
hasMore: messages.length >= _fetchLimit,
);
} catch (e) {
print('[ERROR] Failed to load more messages: $e');
return MessageLoadResult(messages: [], hasMore: false);
}
}
// Search messages
Future<List<Message>> searchMessages(String query) async {
if (query.isEmpty) return [];
try {
// Use ilike for case-insensitive search
final response = await _supabase
.from('community_messages')
.select()
.ilike('content', '%$query%')
.order('created_at', ascending: false)
.limit(50);
List<Message> results = [];
for (final item in response) {
try {
// Extract user info for Message.fromMap
final senderId = item['sender_id'] as String? ?? '';
final senderEmail =
item['sender_email'] as String? ?? _emailCache[senderId] ?? '';
final senderUsername =
_usernameCache[senderId] ?? senderEmail.split('@')[0];
final avatarUrl = _profilePictureCache[senderId];
results.add(
Message.fromMap(
item,
senderEmail: senderEmail,
senderUsername: senderUsername,
avatarUrl: avatarUrl,
),
);
} catch (e) {
print('[ERROR] Failed to parse search result: $e');
}
}
return results;
} catch (e) {
print('[ERROR] Failed to search messages: $e');
return [];
}
}
// Send message
Future<MessageSendResult> sendMessage({
String? text,
File? imageFile,
Message? replyToMessage,
String? currentUsername,
String? currentEmail,
required Function(Message) onOptimisticUpdate,
}) async {
final messageText = text?.trim() ?? '';
// Allow empty text when sending an image, but require at least one (text or image)
if (messageText.isEmpty && imageFile == null) {
return MessageSendResult(
success: false,
errorMessage: 'No content to send',
);
}
try {
// Get current user ID
final userId = _supabase.auth.currentUser?.id;
if (userId == null) {
throw Exception('User not logged in');
}
final userEmail = currentEmail ?? _supabase.auth.currentUser?.email ?? '';
// Generate ID
final timestamp = DateTime.now().millisecondsSinceEpoch;
final messageId =
'msg-$timestamp-${userId.substring(0, userId.length.clamp(0, 6))}';
print('[DEBUG] Sending message: $messageId');
print(
'[DEBUG] Message text: "$messageText", has image: ${imageFile != null}',
);
// Upload image if available
String? imageUrl;
if (imageFile != null) {
try {
print('[DEBUG] Uploading image for message: $messageId');
imageUrl = await _uploadImage(imageFile);
if (imageUrl == null) {
throw Exception('Failed to upload image - URL is null');
}
print('[DEBUG] Image uploaded: $imageUrl');
} catch (e) {
print('[ERROR] Image upload failed: $e');
return MessageSendResult(
success: false,
errorMessage: 'Gagal mengunggah gambar: ${e.toString()}',
);
}
}
// Create optimistic message
final optimisticMessage = Message(
id: messageId,
content: messageText,
senderEmail: userEmail,
senderUsername: currentUsername ?? userEmail.split('@')[0],
senderUserId: userId,
imageUrl: imageUrl,
createdAt: DateTime.now(),
replyToId: replyToMessage?.id,
replyToContent: replyToMessage?.content,
replyToSenderEmail: replyToMessage?.senderEmail,
replyToSenderUsername: replyToMessage?.senderUsername,
avatarUrl: _profilePictureCache[userId],
);
// Trigger optimistic update
onOptimisticUpdate(optimisticMessage);
// Prepare message data
final messageData = optimisticMessage.toMap();
// Ensure content is included even if it's empty (to prevent null values)
if (!messageData.containsKey('content')) {
messageData['content'] = messageText;
}
// Insert to database
print('[DEBUG] Saving message to database: ${messageData.toString()}');
bool saveSuccess = false;
try {
// First try with all data including reply fields
await _supabase.from('community_messages').insert(messageData);
print('[DEBUG] Message saved successfully');
saveSuccess = true;
} catch (e) {
print('[ERROR] Failed to save message: $e');
// If the message has reply data, try without it
if (replyToMessage != null) {
print('[DEBUG] Retrying without reply data');
// Remove reply fields
final retryData = Map<String, dynamic>.from(messageData);
retryData.remove('reply_to_id');
retryData.remove('reply_to_content');
retryData.remove('reply_to_sender_email');
retryData.remove('reply_to_sender_username');
try {
await _supabase.from('community_messages').insert(retryData);
print('[DEBUG] Message saved without reply data');
saveSuccess = true;
} catch (retryError) {
print('[ERROR] Retry also failed: $retryError');
return MessageSendResult(
success: false,
errorMessage: 'Gagal menyimpan pesan: ${retryError.toString()}',
);
}
} else {
return MessageSendResult(
success: false,
errorMessage: 'Gagal menyimpan pesan: ${e.toString()}',
);
}
}
return MessageSendResult(
success: saveSuccess,
message: optimisticMessage,
);
} catch (e) {
print('[ERROR] Failed to send message: $e');
return MessageSendResult(success: false, errorMessage: e.toString());
}
}
// Upload image
Future<String?> _uploadImage(File imageFile) async {
try {
print('[DEBUG] Starting image upload process');
// Check if file exists
if (!await imageFile.exists()) {
print('[ERROR] Image file does not exist: ${imageFile.path}');
throw Exception('File does not exist');
}
final userId = _supabase.auth.currentUser?.id;
if (userId == null) {
print('[ERROR] No authenticated user found');
throw Exception('User not authenticated');
}
final timestamp = DateTime.now().millisecondsSinceEpoch;
final randomPart = Random().nextInt(10000).toString().padLeft(4, '0');
final filePath = '$userId-$timestamp-$randomPart.jpg';
print('[DEBUG] Generated file path: $filePath');
// Verify file size
final fileSize = await imageFile.length();
print(
'[DEBUG] File size: ${(fileSize / 1024 / 1024).toStringAsFixed(2)} MB',
);
if (fileSize > 5 * 1024 * 1024) {
// 5MB
print('[ERROR] File too large: ${fileSize / 1024 / 1024} MB');
throw Exception('Ukuran gambar terlalu besar (maksimal 5MB)');
}
// Try hardcoded bucket first to simplify the process
try {
print('[DEBUG] Attempting direct upload to images bucket');
await _supabase.storage
.from('images')
.upload(
filePath,
imageFile,
fileOptions: const FileOptions(
cacheControl: '3600',
upsert: true,
),
);
final imageUrl = _supabase.storage
.from('images')
.getPublicUrl(filePath);
print('[DEBUG] Successfully uploaded to images bucket: $imageUrl');
return imageUrl;
} catch (e) {
print('[DEBUG] Direct upload to images bucket failed: $e');
// Fall back to trying multiple buckets
}
// Daftar bucket yang akan dicoba, dalam urutan prioritas
final bucketOptions = [
'images',
'avatars',
'community',
'chat-images',
'public', // Tambahkan bucket public jika ada
];
String? imageUrl;
Exception? lastError;
// Log semua bucket yang tersedia
try {
final buckets = await _supabase.storage.listBuckets();
print(
'[DEBUG] Available buckets: ${buckets.map((b) => b.name).join(", ")}',
);
// Prioritaskan bucket yang tersedia
final availableBuckets = buckets.map((b) => b.name).toList();
if (availableBuckets.isNotEmpty) {
// Tambahkan bucket yang tersedia di awal list
bucketOptions.insertAll(
0,
availableBuckets.where((name) => !bucketOptions.contains(name)),
);
print('[DEBUG] Bucket order: ${bucketOptions.join(", ")}');
}
} catch (e) {
print('[WARNING] Failed to get bucket list: $e');
}
// Try each bucket until successful with timeout
for (final bucketName in bucketOptions) {
try {
print('[DEBUG] Attempting upload to bucket: $bucketName');
// Add timeout to prevent hanging
await _supabase.storage
.from(bucketName)
.upload(
filePath,
imageFile,
fileOptions: const FileOptions(
cacheControl: '3600',
upsert: true,
),
)
.timeout(
const Duration(seconds: 15),
onTimeout: () {
print('[WARNING] Upload to $bucketName timed out');
throw TimeoutException('Upload timed out');
},
);
// Get public URL if upload succeeds
imageUrl = _supabase.storage.from(bucketName).getPublicUrl(filePath);
if (imageUrl.isEmpty) {
print('[WARNING] Got empty URL from storage');
throw Exception('Empty URL returned');
}
// Add timestamp parameter to prevent caching
final cacheBuster = DateTime.now().millisecondsSinceEpoch;
imageUrl = '$imageUrl?t=$cacheBuster';
print('[SUCCESS] Upload to $bucketName successful: $imageUrl');
return imageUrl;
} catch (e) {
print('[DEBUG] Upload to bucket $bucketName failed: $e');
lastError = e is Exception ? e : Exception(e.toString());
// Continue to next bucket
continue;
}
}
// If all buckets failed
print('[ERROR] All bucket uploads failed');
throw lastError ?? Exception('No available buckets');
} catch (e) {
print('[ERROR] Image upload failed with exception: $e');
throw Exception('Failed to upload image: ${e.toString()}');
}
}
// Delete old messages (automatic cleanup)
Future<int> deleteOldMessages() async {
try {
final cutoffDate = DateTime.now().subtract(const Duration(days: 30));
final cutoffDateStr = cutoffDate.toIso8601String();
print(
'[INFO] Deleting messages older than 30 days (before $cutoffDateStr)',
);
final result = await _supabase
.from('community_messages')
.delete()
.lt('created_at', cutoffDateStr)
.select('id');
print('[INFO] Deleted ${result.length} old messages');
return result.length;
} catch (e) {
print('[ERROR] Failed to delete old messages: $e');
return 0;
}
}
// Filter out expired messages
List<Message> _filterExpiredMessages(List<Message> messages) {
if (messages.isEmpty) return [];
final cutoffDate = DateTime.now().subtract(const Duration(days: 30));
return messages
.where((message) => message.createdAt.isAfter(cutoffDate))
.toList();
}
// Process messages from database response
Future<List<Message>> _processMessagesFromResponse(List<dynamic> data) async {
final List<Message> messages = [];
final cutoffDate = DateTime.now().subtract(const Duration(days: 30));
// Create a map of pending profile fetches to avoid duplicate requests
final Map<String, Future<Map<String, dynamic>?>> pendingProfileFetches = {};
for (var item in data) {
try {
final senderId = item['sender_id'] as String? ?? '';
if (senderId.isEmpty) continue;
// Skip old messages
final messageDate = DateTime.parse(
item['created_at'] ?? DateTime.now().toIso8601String(),
);
if (messageDate.isBefore(cutoffDate)) continue;
// Get cached user info
String senderEmail = _emailCache[senderId] ?? '';
String senderUsername = _usernameCache[senderId] ?? '';
String? avatarUrl = _profilePictureCache[senderId];
// If not in cache, prepare to fetch
if ((senderUsername.isEmpty || avatarUrl == null) &&
!pendingProfileFetches.containsKey(senderId)) {
pendingProfileFetches[senderId] = _fetchUserProfile(senderId);
}
// Use email from message if available
if (senderEmail.isEmpty && item['sender_email'] != null) {
senderEmail = item['sender_email'] as String;
_emailCache[senderId] = senderEmail;
}
// Use derived username if needed
if (senderUsername.isEmpty && senderEmail.isNotEmpty) {
senderUsername = senderEmail.split('@')[0];
_usernameCache[senderId] = senderUsername;
} else if (senderUsername.isEmpty) {
senderUsername =
'user-${senderId.substring(0, senderId.length.clamp(0, 6))}';
}
// Parse reply data
String? replyToId = item['reply_to_id'] as String?;
String? replyToContent = item['reply_to_content'] as String?;
String? replyToSenderEmail = item['reply_to_sender_email'] as String?;
String? replyToSenderUsername =
item['reply_to_sender_username'] as String?;
// If replyToSenderUsername is not available but email is, derive username from email
if (replyToSenderUsername == null &&
replyToSenderEmail != null &&
replyToSenderEmail.isNotEmpty) {
replyToSenderUsername = replyToSenderEmail.split('@')[0];
}
// Check if we have read receipts for this message
final messageId = item['id'] as String? ?? '';
final readers = _messageReadReceipts[messageId] ?? <String>{};
final isRead = readers.isNotEmpty;
final message = Message(
id: messageId,
content: item['content'] as String? ?? '',
senderEmail: senderEmail,
senderUsername: senderUsername,
senderUserId: senderId,
imageUrl: item['image_url'] as String?,
createdAt: messageDate,
replyToId: replyToId,
replyToContent: replyToContent,
replyToSenderEmail: replyToSenderEmail,
replyToSenderUsername: replyToSenderUsername,
avatarUrl: avatarUrl,
isRead: isRead,
);
messages.add(message);
} catch (e) {
print('[ERROR] Failed to process message: $e');
}
}
// Sort messages by creation date (newest first)
messages.sort((a, b) => b.createdAt.compareTo(a.createdAt));
// Start fetching profiles in background
_fetchProfiles(pendingProfileFetches);
return messages;
}
// Fetch user profiles
void _fetchProfiles(
Map<String, Future<Map<String, dynamic>?>> pendingProfileFetches,
) {
for (final entry in pendingProfileFetches.entries) {
final userId = entry.key;
final future = entry.value;
future
.then((profile) {
if (profile != null) {
if (profile['username'] != null) {
_usernameCache[userId] = profile['username'];
}
if (profile['avatar_url'] != null) {
_profilePictureCache[userId] = profile['avatar_url'];
}
}
})
.catchError((_) {});
}
}
// Fetch a single user profile
Future<Map<String, dynamic>?> _fetchUserProfile(String userId) async {
try {
final profile =
await _supabase
.from('profiles')
.select('username, avatar_url')
.eq('user_id', userId)
.maybeSingle();
return profile;
} catch (e) {
print('[ERROR] Failed to fetch user profile: $e');
return null;
}
}
// Mark visible messages as read
void markVisibleMessagesAsRead(List<Message> messages) {
try {
final userId = _supabase.auth.currentUser?.id;
if (userId == null) return;
// Kumpulkan pesan yang belum dibaca
final unreadMessageIds = <String>[];
for (final message in messages) {
// Skip pesan yang sudah dibaca atau pesan milik sendiri
if (message.senderUserId == userId) continue;
// Cek apakah pesan sudah dibaca
if (!_isMessageReadByUser(message, userId)) {
unreadMessageIds.add(message.id);
}
}
// Mark messages as read using the new function
if (unreadMessageIds.isNotEmpty) {
markMessagesAsRead(unreadMessageIds);
}
} catch (e) {
print('[ERROR] Error marking visible messages as read: $e');
}
}
// Check if a message is read by everyone
bool isMessageReadByAll(Message message) {
// Only check read status for own messages
if (message.senderUserId != currentUserId) return false;
final readers = _messageReadReceipts[message.id] ?? <String>{};
// Get all unique user IDs (excluding self)
final Set<String> allUsers = {};
_emailCache.keys.where((id) => id != currentUserId).forEach(allUsers.add);
// If no other users, consider it not read
if (allUsers.isEmpty) return false;
// Check if all users have read the message
return readers.length >= allUsers.length;
}
// Mark messages as read
Future<void> markMessagesAsRead(List<String> messageIds) async {
if (messageIds.isEmpty) return;
try {
final userId = _supabase.auth.currentUser?.id;
if (userId == null) return;
print('[DEBUG] Marking ${messageIds.length} messages as read');
// Process each message ID individually using the safe function
for (final messageId in messageIds) {
try {
await _supabase.rpc(
'add_message_read_receipt',
params: {
'p_message_id': messageId.toString(), // Ensure it's string
'p_user_id': userId.toString(), // Ensure it's string
},
);
} catch (e) {
print('[ERROR] Failed to mark message $messageId as read: $e');
}
}
print('[DEBUG] Successfully marked messages as read');
} catch (e) {
print('[ERROR] Failed to mark messages as read: $e');
}
}
// Mark a message as read
Future<void> markMessageAsRead(String messageId) async {
if (messageId.isEmpty || _supabase.auth.currentUser == null) return;
try {
// Use the new function name for handling read receipts
await _supabase.rpc(
'add_message_read_receipt',
params: {
'p_message_id': messageId.toString(), // Ensure it's string
'p_user_id':
_supabase.auth.currentUser!.id.toString(), // Ensure it's string
},
);
} catch (e) {
// Print error but don't throw - read status is non-critical
print('[ERROR] Failed to mark message $messageId as read: $e');
}
}
// Cek apakah pesan sudah dibaca oleh user tertentu
bool _isMessageReadByUser(Message message, String userId) {
final readers = _messageReadReceipts[message.id] ?? <String>{};
return readers.contains(userId);
}
// Fetch read receipts for messages
Future<void> fetchReadReceipts(List<Message> messages) async {
if (messages.isEmpty) return;
try {
// Get all message IDs
final List<String> messageIds = messages.map((m) => m.id).toList();
// Jika terlalu banyak message ID, batasi untuk menghindari error
const int maxIdsPerQuery = 50;
// Proses dalam batch jika terlalu banyak
for (int i = 0; i < messageIds.length; i += maxIdsPerQuery) {
final int endIndex =
(i + maxIdsPerQuery < messageIds.length)
? i + maxIdsPerQuery
: messageIds.length;
final List<String> batchIds = messageIds.sublist(i, endIndex);
try {
// Fetch read receipts
final response = await _supabase
.from('read_receipts')
.select('message_id, user_id')
.filter('message_id', 'in', batchIds);
// Process read receipts
for (final receipt in response) {
final messageId = receipt['message_id'] as String;
final userId = receipt['user_id'] as String;
// Update local tracking
final readers = _messageReadReceipts[messageId] ?? <String>{};
readers.add(userId);
_messageReadReceipts[messageId] = readers;
}
print(
'[DEBUG] Fetched read receipts for batch ${i ~/ maxIdsPerQuery + 1}',
);
} catch (e) {
print('[ERROR] Failed to fetch read receipts batch: $e');
}
}
} catch (e) {
print('[ERROR] Failed to fetch read receipts: $e');
}
}
}