MIF_E31222656/lib/screens/community/services/message_service.dart

780 lines
24 KiB
Dart
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import 'package:flutter/foundation.dart';
import 'package:supabase_flutter/supabase_flutter.dart';
import 'dart:async';
import 'dart:io';
import 'package:tugas_akhir_supabase/screens/community/models/message.dart';
/// Outcome of a message query: the messages that were produced plus a paging
/// hint for the caller.
class MessageLoadResult {
  /// Bundles the fetched [messages] with the [hasMore] paging hint.
  MessageLoadResult({required this.messages, required this.hasMore});

  /// Messages produced by the query.
  final List<Message> messages;

  /// Whether the producer believes further messages can still be loaded.
  final bool hasMore;
}
/// Outcome of an attempt to send a message.
class MessageSendResult {
  /// Creates a result; only [success] is mandatory.
  MessageSendResult({required this.success, this.errorMessage, this.message});

  /// Whether the message was stored.
  final bool success;

  /// Description of what went wrong, when one is available.
  final String? errorMessage;

  /// The message the result refers to, when one is available.
  final Message? message;
}
class MessageService {
// Shared Supabase client; every auth, database, storage and realtime call in
// this class goes through it.
final _supabase = Supabase.instance.client;
// Locally known read receipts: message id -> ids of users recorded as readers.
final Map<String, Set<String>> _messageReadReceipts = {};
// Message-specific timers, created by the setup*Timer methods and cancelled in
// dispose().
Timer? _refreshTimer;
Timer? _readStatusTimer;
Timer? _cleanupTimer;
// Realtime channel listening for inserts into `community_messages`.
RealtimeChannel? _subscription;
// Maximum number of rows requested per page of messages.
final int _pageSize = 20;
// Caches keyed by sender/user id, filled while messages and profiles are
// processed.
final Map<String, String> _emailCache = {};
final Map<String, String> _usernameCache = {};
final Map<String, String> _profilePictureCache = {};
// Getters
// Id of the signed-in user, or null when nobody is authenticated.
String? get currentUserId => _supabase.auth.currentUser?.id;
// Initialize
/// Starts (or restarts) the periodic refresh tick.
///
/// A previously scheduled refresh timer is cancelled first; afterwards
/// [onRefresh] is invoked once every two minutes.
void setupRefreshTimer({required Function() onRefresh}) {
  final previous = _refreshTimer;
  if (previous != null) {
    previous.cancel();
  }
  _refreshTimer = Timer.periodic(
    const Duration(minutes: 2),
    (Timer _) => onRefresh(),
  );
}
/// Starts (or restarts) the periodic read-status tick.
///
/// A previously scheduled read-status timer is cancelled first; afterwards
/// [onUpdate] is invoked once every thirty seconds.
void setupReadStatusTimer({required Function() onUpdate}) {
  final previous = _readStatusTimer;
  if (previous != null) {
    previous.cancel();
  }
  _readStatusTimer = Timer.periodic(
    const Duration(seconds: 30),
    (Timer _) => onUpdate(),
  );
}
/// Starts (or restarts) the periodic cleanup tick.
///
/// A previously scheduled cleanup timer is cancelled first; afterwards
/// [onCleanup] is invoked once every twenty-four hours.
void setupCleanupTimer({required Function() onCleanup}) {
  final previous = _cleanupTimer;
  if (previous != null) {
    previous.cancel();
  }
  _cleanupTimer = Timer.periodic(
    const Duration(hours: 24),
    (Timer _) => onCleanup(),
  );
}
// Cleanup
/// Realtime channel delivering inserts into `message_read_receipts`.
///
/// Kept in a field so it can be unsubscribed before re-subscribing and in
/// [dispose]; otherwise every reconnect would stack another live channel.
RealtimeChannel? _readReceiptsSubscription;

/// The single pending reconnect attempt, if any.
Timer? _reconnectTimer;

/// Releases both realtime channels, the pending reconnect and all timers.
///
/// Clearing [_subscription] also makes late status callbacks of the old
/// channel no-ops, so nothing can re-arm a reconnect after disposal.
void dispose() {
  _reconnectTimer?.cancel();
  _reconnectTimer = null;
  _subscription?.unsubscribe();
  _subscription = null;
  _readReceiptsSubscription?.unsubscribe();
  _readReceiptsSubscription = null;
  _refreshTimer?.cancel();
  _readStatusTimer?.cancel();
  _cleanupTimer?.cancel();
}

/// Subscribes to realtime inserts of messages and read receipts.
///
/// [onNewMessage] receives every inserted message that has a sender id and is
/// not older than 30 days. [onReadStatusUpdate] receives `(messageId, userId)`
/// for every inserted read receipt. Existing channels are unsubscribed first.
/// When the subscription reports an error, or setting it up throws, one
/// retry is scheduled five seconds later.
void setupMessagesSubscription(
  Function(Message) onNewMessage,
  Function(String, String) onReadStatusUpdate,
) {
  // A fresh setup supersedes any reconnect that is still pending.
  _reconnectTimer?.cancel();
  _reconnectTimer = null;
  try {
    // Unsubscribe if already subscribed
    _subscription?.unsubscribe();
    _subscription = null;
    print('[DEBUG] Setting up real-time subscription');
    // Subscribe to read receipts if available
    _subscribeToReadReceipts(onReadStatusUpdate);
    // Subscribe to messages
    final channel = _supabase
        .channel('public:community_messages')
        .onPostgresChanges(
          event: PostgresChangeEvent.insert,
          schema: 'public',
          table: 'community_messages',
          callback: (payload) {
            print('[DEBUG] Received real-time update: ${payload.eventType}');
            _handleInsertedMessage(payload.newRecord, onNewMessage);
          },
        );
    // Store the channel before subscribing so the status callback below can
    // tell whether it still belongs to the active subscription.
    _subscription = channel;
    channel.subscribe((status, error) {
      // Ignore status changes of a channel that was replaced or disposed.
      if (!identical(_subscription, channel)) return;
      if (error != null) {
        print('[ERROR] Subscription error: $error');
        // Try to reconnect after a delay
        _scheduleReconnect(onNewMessage, onReadStatusUpdate);
      } else {
        print('[DEBUG] Subscription status: $status');
      }
    });
  } catch (e) {
    print('[ERROR] Failed to setup subscription: $e');
    // Try to reconnect after a delay
    _scheduleReconnect(onNewMessage, onReadStatusUpdate);
  }
}

/// Schedules exactly one reconnect attempt five seconds from now.
///
/// Repeated errors restart the countdown instead of queueing additional
/// attempts, and [dispose] can cancel the pending attempt.
void _scheduleReconnect(
  Function(Message) onNewMessage,
  Function(String, String) onReadStatusUpdate,
) {
  _reconnectTimer?.cancel();
  _reconnectTimer = Timer(const Duration(seconds: 5), () {
    _reconnectTimer = null;
    setupMessagesSubscription(onNewMessage, onReadStatusUpdate);
  });
}

/// Converts one inserted `community_messages` row into a [Message] and hands
/// it to [onNewMessage].
///
/// Rows older than 30 days or without a sender id are dropped. Any failure
/// while processing the row is logged and swallowed so that one bad payload
/// cannot break the realtime callback.
void _handleInsertedMessage(
  Map<String, dynamic> data,
  Function(Message) onNewMessage,
) {
  try {
    // Skip messages older than 30 days
    final messageDate = DateTime.parse(
      data['created_at'] ?? DateTime.now().toIso8601String(),
    );
    final cutoff = DateTime.now().subtract(const Duration(days: 30));
    if (messageDate.isBefore(cutoff)) return;

    final senderId = data['sender_id'] as String? ?? '';
    if (senderId.isEmpty) return;

    // Get cached user info if available
    var senderEmail = _emailCache[senderId] ?? '';
    var senderUsername = _usernameCache[senderId] ?? '';
    final avatarUrl = _profilePictureCache[senderId];

    // Use email from message if available
    if (senderEmail.isEmpty && data['sender_email'] != null) {
      senderEmail = data['sender_email'] as String;
      _emailCache[senderId] = senderEmail;
    }

    // Derive username if needed
    if (senderUsername.isEmpty) {
      senderUsername = senderEmail.isNotEmpty
          ? senderEmail.split('@')[0]
          : 'user-${senderId.substring(0, senderId.length.clamp(0, 6))}';
    }

    final message = Message(
      id: data['id'] as String? ??
          'msg-${DateTime.now().millisecondsSinceEpoch}',
      content: data['content'] as String? ?? '',
      senderEmail: senderEmail,
      senderUsername: senderUsername,
      senderUserId: senderId,
      imageUrl: data['image_url'] as String?,
      createdAt: messageDate,
      replyToId: data['reply_to_id'] as String?,
      replyToContent: data['reply_to_content'] as String?,
      replyToSenderEmail: data['reply_to_sender_email'] as String?,
      avatarUrl: avatarUrl,
    );

    // Notify callback
    onNewMessage(message);

    // Fetch profile in background if needed. The message above has already
    // been delivered; the fetch only warms the caches for later messages.
    if (avatarUrl == null) {
      _fetchUserProfile(senderId).then((profile) {
        if (profile == null) return;
        final fetchedAvatar = profile['avatar_url'];
        if (fetchedAvatar is String) {
          _profilePictureCache[senderId] = fetchedAvatar;
        }
        // Cache the username even when the profile has no avatar.
        final fetchedUsername = profile['username'];
        if (fetchedUsername is String) {
          _usernameCache[senderId] = fetchedUsername;
        }
      }).catchError((Object e) {
        print('[ERROR] Error fetching profile in background: $e');
      });
    }
  } catch (e) {
    print('[ERROR] Error processing real-time message: $e');
  }
}

/// Subscribes to inserts into `message_read_receipts`.
///
/// Each receipt carrying both a message id and a user id is recorded in
/// [_messageReadReceipts] and forwarded to [onReadStatusUpdate]. A previously
/// opened receipts channel is unsubscribed first. Failures are logged only,
/// because read receipts are optional.
void _subscribeToReadReceipts(Function(String, String) onReadStatusUpdate) {
  try {
    _readReceiptsSubscription?.unsubscribe();
    _readReceiptsSubscription = null;
    _readReceiptsSubscription = _supabase
        .channel('public:message_read_receipts')
        .onPostgresChanges(
          event: PostgresChangeEvent.insert,
          schema: 'public',
          table: 'message_read_receipts',
          callback: (payload) {
            try {
              final data = payload.newRecord;
              final messageId = data['message_id'] as String?;
              final userId = data['user_id'] as String?;
              if (messageId != null && userId != null) {
                // Update local read status
                _messageReadReceipts
                    .putIfAbsent(messageId, () => <String>{})
                    .add(userId);
                // Notify callback
                onReadStatusUpdate(messageId, userId);
              }
            } catch (e) {
              print('[ERROR] Error processing read receipt: $e');
            }
          },
        )
        .subscribe();
  } catch (e) {
    print('[INFO] Could not subscribe to read receipts: $e');
  }
}
// Delete message
/// Deletes [message] if it was sent by the signed-in user.
///
/// Returns `true` when a row was actually removed and `false` when the
/// request succeeded but matched no row (for example because the message was
/// already gone). Throws when nobody is signed in, when the message belongs
/// to another user, or when the request itself fails.
Future<bool> deleteMessage(Message message) async {
  try {
    final userId = _supabase.auth.currentUser?.id;
    if (userId == null || message.senderUserId != userId) {
      throw Exception('Not authorized to delete this message');
    }
    // Ask for the deleted ids back: a bare delete() completes without error
    // even when it matched nothing, which would be reported as success.
    final deletedRows = await _supabase
        .from('community_messages')
        .delete()
        .eq('id', message.id)
        .eq('sender_id', userId)
        .select('id');
    if (deletedRows.isEmpty) {
      print('[ERROR] Failed to delete message: no matching row was removed');
      return false;
    }
    return true;
  } catch (e) {
    print('[ERROR] Failed to delete message: $e');
    rethrow;
  }
}
// Load messages
/// Loads messages from `community_messages`.
///
/// With [loadNew] set and at least one non-expired entry in
/// [existingMessages], fetches every message created at or after the first
/// (newest) of those entries and reports `hasMore: true`. Otherwise fetches
/// the newest page of [_pageSize] messages; `hasMore` is then true when a
/// full page came back. [forceRefresh] is only logged. Errors are logged and
/// rethrown.
Future<MessageLoadResult> loadMessages({
  bool forceRefresh = false,
  bool loadNew = false,
  required List<Message> existingMessages,
}) async {
  print(
    '[DEBUG] Loading messages (forceRefresh: $forceRefresh, loadNew: $loadNew)',
  );
  try {
    // Filter out expired messages
    final filteredMessages = _filterExpiredMessages(existingMessages);

    // If loading new messages and we have existing messages
    if (loadNew && filteredMessages.isNotEmpty) {
      // toIso8601String() on a local DateTime carries no offset, so a
      // locally created message would be compared as if its wall-clock time
      // were UTC. Normalising to UTC keeps the 'Z' suffix.
      // NOTE(review): assumes `created_at` is a timestamptz column - confirm.
      final newestTimestamp =
          filteredMessages.first.createdAt.toUtc().toIso8601String();
      final response = await _supabase
          .from('community_messages')
          .select('*')
          .gte('created_at', newestTimestamp)
          .order('created_at', ascending: false);
      print('[DEBUG] Got ${response.length} new messages');
      final newMessages = await _processMessagesFromResponse(response);
      return MessageLoadResult(messages: newMessages, hasMore: true);
    }

    // Initial load or refresh, get the first page
    final response = await _supabase
        .from('community_messages')
        .select('*')
        .order('created_at', ascending: false)
        .limit(_pageSize);
    print('[DEBUG] Got ${response.length} messages');
    final messages = await _processMessagesFromResponse(response);
    return MessageLoadResult(
      messages: messages,
      hasMore: messages.length >= _pageSize,
    );
  } catch (e) {
    print('[ERROR] Failed to load messages: $e');
    rethrow;
  }
}
// Load more messages (pagination)
/// Loads the next page of messages older than the last entry of
/// [existingMessages].
///
/// Returns an empty result with `hasMore: false` when [existingMessages] is
/// empty. Otherwise requests up to [_pageSize] older messages; `hasMore` is
/// true when a full page came back. Errors are logged and rethrown.
Future<MessageLoadResult> loadMoreMessages(
  List<Message> existingMessages,
) async {
  if (existingMessages.isEmpty) {
    return MessageLoadResult(messages: [], hasMore: false);
  }
  try {
    // Get the oldest message timestamp. A local DateTime serialises without
    // an offset, so convert to UTC to keep the 'Z' suffix in the filter.
    // NOTE(review): assumes `created_at` is a timestamptz column - confirm.
    final oldestMessageDate =
        existingMessages.last.createdAt.toUtc().toIso8601String();

    // Query messages older than the oldest message we have
    final response = await _supabase
        .from('community_messages')
        .select('*')
        .lt('created_at', oldestMessageDate)
        .order('created_at', ascending: false)
        .limit(_pageSize);
    final olderMessages = await _processMessagesFromResponse(response);

    // A full page suggests there may be more to load.
    return MessageLoadResult(
      messages: olderMessages,
      hasMore: olderMessages.length >= _pageSize,
    );
  } catch (e) {
    print('[ERROR] Failed to load more messages: $e');
    rethrow;
  }
}
// Search messages
/// Escapes the LIKE metacharacters `\`, `%` and `_` in [input] with a
/// backslash so they are matched literally.
static String _escapeLikePattern(String input) => input.replaceAllMapped(
      RegExp(r'[\\%_]'),
      (match) => '\\${match[0]}',
    );

/// Searches message content case-insensitively for [query].
///
/// Returns at most 50 matches, newest first. A blank [query] returns an empty
/// list without contacting the server, because the pattern `%%` would match
/// every row. Errors are logged and rethrown.
Future<List<Message>> searchMessages(String query) async {
  if (query.trim().isEmpty) return [];
  try {
    // Escape user-typed wildcards so "50%" or "snake_case" are searched
    // literally instead of acting as patterns.
    // NOTE(review): a literal `*` may still be treated as a wildcard by the
    // backend - confirm.
    final pattern = '%${_escapeLikePattern(query)}%';
    // Use case-insensitive search
    final response = await _supabase
        .from('community_messages')
        .select('*')
        .ilike('content', pattern)
        .order('created_at', ascending: false)
        .limit(50);
    print('[DEBUG] Found ${response.length} search results');
    // Await inside the try so a processing failure is logged here as well.
    return await _processMessagesFromResponse(response);
  } catch (e) {
    print('[ERROR] Failed to search messages: $e');
    rethrow;
  }
}
// Send message
/// Sends a message consisting of [text], [imageFile], or both.
///
/// Steps: upload the image (if any), build the message, pass it to
/// [onOptimisticUpdate], then insert it into `community_messages`. When the
/// insert fails and [replyToMessage] is set, the insert is retried once
/// without the reply fields.
///
/// Never throws; failures are reported through the result. On success the
/// result carries the stored message. On failure it carries the error text
/// and, if [onOptimisticUpdate] had already been called, the message that was
/// handed to it, so the caller can roll that entry back.
Future<MessageSendResult> sendMessage({
  required String? text,
  required File? imageFile,
  required Message? replyToMessage,
  required String? currentUsername,
  required String? currentEmail,
  required Function(Message) onOptimisticUpdate,
}) async {
  final messageText = text?.trim() ?? '';
  if (messageText.isEmpty && imageFile == null) {
    return MessageSendResult(
      success: false,
      errorMessage: 'No content to send',
    );
  }

  // Set once the optimistic callback has run, so the failure path can report
  // which message needs to be rolled back.
  Message? optimisticMessage;
  try {
    // Get current user ID
    final userId = _supabase.auth.currentUser?.id;
    if (userId == null) {
      throw Exception('User not logged in');
    }
    final userEmail = currentEmail ?? _supabase.auth.currentUser?.email ?? '';

    // Generate ID
    final timestamp = DateTime.now().millisecondsSinceEpoch;
    final idPrefix = userId.substring(0, userId.length.clamp(0, 6));
    final messageId = 'msg-$timestamp-$idPrefix';
    print('[DEBUG] Sending message: $messageId');

    // Upload image if available
    String? imageUrl;
    if (imageFile != null) {
      print('[DEBUG] Uploading image for message: $messageId');
      imageUrl = await _uploadImage(imageFile);
      print('[DEBUG] Image uploaded: $imageUrl');
    }

    // Same fallback chain as the message-processing code in this class:
    // explicit username, then e-mail local part, then a short id-based name.
    var senderUsername = currentUsername ?? '';
    if (senderUsername.isEmpty) senderUsername = userEmail.split('@')[0];
    if (senderUsername.isEmpty) senderUsername = 'user-$idPrefix';

    // Create optimistic message
    final pending = Message(
      id: messageId,
      content: messageText,
      senderEmail: userEmail,
      senderUsername: senderUsername,
      senderUserId: userId,
      imageUrl: imageUrl,
      createdAt: DateTime.now(),
      replyToId: replyToMessage?.id,
      replyToContent: replyToMessage?.content,
      replyToSenderEmail: replyToMessage?.senderEmail,
      avatarUrl: _profilePictureCache[userId],
    );

    // Trigger optimistic update
    optimisticMessage = pending;
    onOptimisticUpdate(pending);

    // Prepare message data
    final messageData = pending.toMap();

    // Insert to database
    print('[DEBUG] Saving message to database');
    try {
      // First try with all data including reply fields
      await _supabase.from('community_messages').insert(messageData);
      print('[DEBUG] Message saved successfully');
    } catch (e) {
      print('[ERROR] Failed to save message: $e');
      // Without reply data there is nothing to strip for a retry.
      if (replyToMessage == null) rethrow;

      print('[DEBUG] Retrying without reply data');
      // Remove reply fields
      final retryData = Map<String, dynamic>.from(messageData)
        ..remove('reply_to_id')
        ..remove('reply_to_content')
        ..remove('reply_to_sender_email');
      try {
        await _supabase.from('community_messages').insert(retryData);
        print('[DEBUG] Message saved without reply data');
      } catch (retryError) {
        print('[ERROR] Retry also failed: $retryError');
        rethrow;
      }
    }

    return MessageSendResult(success: true, message: pending);
  } catch (e) {
    print('[ERROR] Failed to send message: $e');
    return MessageSendResult(
      success: false,
      errorMessage: e.toString(),
      message: optimisticMessage,
    );
  }
}
// Upload image
/// Uploads [imageFile] to the `chat-images` bucket and returns its public URL.
///
/// The object is named `<userId>-<epochMillis>.jpg`. Any failure, including a
/// missing signed-in user, is logged and rethrown.
Future<String?> _uploadImage(File imageFile) async {
  try {
    final ownerId = _supabase.auth.currentUser!.id;
    final uploadedAt = DateTime.now().millisecondsSinceEpoch;
    final objectName = '$ownerId-$uploadedAt.jpg';
    final bucket = _supabase.storage.from('chat-images');
    // Upload to 'chat-images' bucket, then resolve the public URL.
    await bucket.upload(objectName, imageFile);
    return bucket.getPublicUrl(objectName);
  } catch (error) {
    print('[ERROR] Failed to upload image: $error');
    rethrow;
  }
}
// Delete old messages (automatic cleanup)
/// Deletes every message older than 30 days and returns how many rows were
/// removed.
///
/// Best effort: failures are logged and reported as `0`.
Future<int> deleteOldMessages() async {
  try {
    // Build the cutoff in UTC. A local DateTime serialises without an offset,
    // which would shift the cutoff by the device's UTC offset.
    final cutoffDateStr = DateTime.now()
        .toUtc()
        .subtract(const Duration(days: 30))
        .toIso8601String();
    print(
      '[INFO] Deleting messages older than 30 days (before $cutoffDateStr)',
    );
    // select('id') makes the request return the deleted rows so they can be
    // counted.
    final deleted = await _supabase
        .from('community_messages')
        .delete()
        .lt('created_at', cutoffDateStr)
        .select('id');
    print('[INFO] Deleted ${deleted.length} old messages');
    return deleted.length;
  } catch (e) {
    print('[ERROR] Failed to delete old messages: $e');
    return 0;
  }
}
// Filter out expired messages
/// Returns a new list holding only the [messages] created within the last
/// 30 days.
List<Message> _filterExpiredMessages(List<Message> messages) {
  final kept = <Message>[];
  if (messages.isEmpty) return kept;
  final oldestAllowed = DateTime.now().subtract(const Duration(days: 30));
  for (final candidate in messages) {
    if (candidate.createdAt.isAfter(oldestAllowed)) {
      kept.add(candidate);
    }
  }
  return kept;
}
// Process messages from database response
/// Turns raw `community_messages` rows into [Message] objects, newest first.
///
/// Rows without a sender id, rows older than 30 days, and rows that fail to
/// parse (logged) are skipped. Sender e-mail, username and avatar come from
/// the in-memory caches; a missing username is derived from the e-mail or the
/// sender id. Profiles that are not fully cached are requested once per
/// sender and applied to the caches in the background, so they do not affect
/// the list returned by this call.
Future<List<Message>> _processMessagesFromResponse(List<dynamic> data) async {
  final expiryThreshold = DateTime.now().subtract(const Duration(days: 30));
  // One in-flight profile request per sender, to avoid duplicate requests.
  final profileLookups = <String, Future<Map<String, dynamic>?>>{};
  final parsed = <Message>[];

  for (final row in data) {
    try {
      final senderId = row['sender_id'] as String? ?? '';
      if (senderId.isEmpty) continue;

      // Skip old messages
      final createdAt = DateTime.parse(
        row['created_at'] ?? DateTime.now().toIso8601String(),
      );
      if (createdAt.isBefore(expiryThreshold)) continue;

      // Get cached user info
      var email = _emailCache[senderId] ?? '';
      var username = _usernameCache[senderId] ?? '';
      final avatar = _profilePictureCache[senderId];

      // If not in cache, prepare to fetch
      final needsProfile = username.isEmpty || avatar == null;
      if (needsProfile) {
        profileLookups.putIfAbsent(
          senderId,
          () => _fetchUserProfile(senderId),
        );
      }

      // Use email from message if available
      if (email.isEmpty && row['sender_email'] != null) {
        email = row['sender_email'] as String;
        _emailCache[senderId] = email;
      }

      // Use derived username if needed
      if (username.isEmpty) {
        if (email.isNotEmpty) {
          username = email.split('@')[0];
          _usernameCache[senderId] = username;
        } else {
          username =
              'user-${senderId.substring(0, senderId.length.clamp(0, 6))}';
        }
      }

      // Parse reply data
      final replyToId = row['reply_to_id'] as String?;
      final replyToContent = row['reply_to_content'] as String?;
      final replyToSenderEmail = row['reply_to_sender_email'] as String?;

      // A message counts as read once any reader is recorded for it.
      final messageId = row['id'] as String? ?? '';
      final knownReaders = _messageReadReceipts[messageId] ?? <String>{};

      parsed.add(
        Message(
          id: messageId,
          content: row['content'] as String? ?? '',
          senderEmail: email,
          senderUsername: username,
          senderUserId: senderId,
          imageUrl: row['image_url'] as String?,
          createdAt: createdAt,
          replyToId: replyToId,
          replyToContent: replyToContent,
          replyToSenderEmail: replyToSenderEmail,
          avatarUrl: avatar,
          isRead: knownReaders.isNotEmpty,
        ),
      );
    } catch (error) {
      print('[ERROR] Failed to process message: $error');
    }
  }

  // Sort messages by creation date (newest first)
  parsed.sort((left, right) => right.createdAt.compareTo(left.createdAt));

  // Start fetching profiles in background
  _fetchProfiles(profileLookups);
  return parsed;
}
// Fetch user profiles
/// Applies the results of [pendingProfileFetches] to the username and avatar
/// caches as each lookup completes.
///
/// Fire-and-forget: the method returns immediately and never throws.
void _fetchProfiles(
  Map<String, Future<Map<String, dynamic>?>> pendingProfileFetches,
) {
  for (final entry in pendingProfileFetches.entries) {
    unawaited(_cacheProfileWhenReady(entry.key, entry.value));
  }
}

/// Waits for [lookup] and stores the username and avatar URL for [userId].
///
/// Only string values are cached, so a profile row with an unexpected shape
/// cannot corrupt the caches. Failures are logged instead of being dropped
/// silently.
Future<void> _cacheProfileWhenReady(
  String userId,
  Future<Map<String, dynamic>?> lookup,
) async {
  try {
    final profile = await lookup;
    if (profile == null) return;
    final username = profile['username'];
    if (username is String) {
      _usernameCache[userId] = username;
    }
    final avatarUrl = profile['avatar_url'];
    if (avatarUrl is String) {
      _profilePictureCache[userId] = avatarUrl;
    }
  } catch (e) {
    print('[ERROR] Failed to cache profile for $userId: $e');
  }
}
// Fetch a single user profile
/// Looks up `username` and `avatar_url` in `profiles` for [userId].
///
/// Returns the matching row, or `null` when there is none or the request
/// fails (the failure is logged).
Future<Map<String, dynamic>?> _fetchUserProfile(String userId) async {
  try {
    final query = _supabase
        .from('profiles')
        .select('username, avatar_url')
        .eq('user_id', userId);
    final row = await query.maybeSingle();
    return row;
  } catch (error) {
    print('[ERROR] Failed to fetch user profile: $error');
    return null;
  }
}
// Read status handling
/// Records the signed-in user as a reader of every message in
/// [visibleMessages] that came from someone else and is not yet marked.
///
/// The local receipt map is updated immediately; the newly marked ids are
/// then sent to the database without waiting for the result. Does nothing
/// when nobody is signed in or the list is empty.
void markVisibleMessagesAsRead(List<Message> visibleMessages) {
  final readerId = currentUserId;
  if (readerId == null || visibleMessages.isEmpty) return;

  final newlyReadIds = <String>[];
  for (final candidate in visibleMessages) {
    // Skip own messages
    if (candidate.senderUserId == readerId) continue;

    // Set.add reports whether the reader was missing, which is exactly the
    // set of messages that still need a receipt.
    final readers = _messageReadReceipts.putIfAbsent(
      candidate.id,
      () => <String>{},
    );
    if (readers.add(readerId)) {
      newlyReadIds.add(candidate.id);
    }
  }

  // Update read status in database
  if (newlyReadIds.isEmpty) return;
  _updateReadStatusInDatabase(newlyReadIds, readerId);
}
/// Upserts one read receipt per entry of [messageIds] for [userId] into
/// `message_read_receipts`.
///
/// Best effort: any failure, including a missing receipts table, is logged
/// and swallowed. No separate existence probe is sent first, because a
/// failing upsert is handled the same way and the probe only added a round
/// trip per call.
Future<void> _updateReadStatusInDatabase(
  List<String> messageIds,
  String userId,
) async {
  if (messageIds.isEmpty) return;
  try {
    // One UTC timestamp for the whole batch. A local DateTime serialises
    // without an offset and would be stored shifted by the device's UTC
    // offset.
    final readAt = DateTime.now().toUtc().toIso8601String();
    final readReceipts = <Map<String, dynamic>>[
      for (final messageId in messageIds)
        {'message_id': messageId, 'user_id': userId, 'read_at': readAt},
    ];

    // Insert read receipts; existing (message, user) pairs are overwritten.
    await _supabase
        .from('message_read_receipts')
        .upsert(readReceipts, onConflict: 'message_id,user_id');
    print('[DEBUG] Updated read status for ${messageIds.length} messages');
  } catch (e) {
    print('[ERROR] Failed to update read status: $e');
  }
}
/// Loads the stored read receipts for [messages] into the local receipt map.
///
/// Best effort: any failure, including a missing receipts table, is logged
/// and swallowed. Rows lacking a string message id or user id are skipped
/// instead of aborting the whole batch.
Future<void> fetchReadReceipts(List<Message> messages) async {
  if (messages.isEmpty) return;
  try {
    // De-duplicate ids to keep the request filter as short as possible.
    final messageIds = messages.map((m) => m.id).toSet().toList();

    // Fetch read receipts. `inFilter` is the client's dedicated `in`
    // operator and takes care of formatting the value list.
    final response = await _supabase
        .from('message_read_receipts')
        .select('message_id, user_id')
        .inFilter('message_id', messageIds);

    // Process read receipts
    for (final receipt in response) {
      final messageId = receipt['message_id'];
      final userId = receipt['user_id'];
      if (messageId is! String || userId is! String) continue;
      // Update local tracking
      _messageReadReceipts.putIfAbsent(messageId, () => <String>{}).add(userId);
    }
  } catch (e) {
    print('[ERROR] Failed to fetch read receipts: $e');
  }
}
// Check if a message is read by everyone
/// Whether every other known user has read [message].
///
/// Only meaningful for the signed-in user's own messages; for anyone else's
/// message the answer is `false`. "Known users" are the ids present in the
/// e-mail cache, excluding the signed-in user. With no other known user the
/// answer is `false`.
bool isMessageReadByAll(Message message) {
  final selfId = currentUserId;
  // Only check read status for own messages
  if (message.senderUserId != selfId) return false;

  final readers = _messageReadReceipts[message.id] ?? const <String>{};
  final otherUsers = _emailCache.keys.where((id) => id != selfId);

  // If no other users, consider it not read
  if (otherUsers.isEmpty) return false;

  // Check membership rather than comparing counts: receipts from users
  // outside the known set must not stand in for known users who have not
  // read the message.
  return otherUsers.every(readers.contains);
}
}