# TIF_E41211115_Genso_quiz_ba.../app/controllers/socket_conroller.py
#
# 198 lines
# 6.6 KiB
# Python

from flask_socketio import SocketIO, emit, join_room, leave_room
from flask import request
from app.services import SessionService
import threading
import time
class SocketController:
    """Register and serve the Socket.IO events used by live quiz sessions.

    All handlers are bound to the ``SocketIO`` instance given to the
    constructor. Room membership is keyed by the session code / session id
    supplied by the client payload.
    """

    # Hard-coded demo questions pushed to a room by ``_simulate_quiz_flow``.
    _SIMULATED_QUESTIONS = (
        {
            "question_index": 0,
            "question": "Apa ibu kota Indonesia?",
            "type": "option",
            "options": ["Jakarta", "Bandung", "Surabaya"],
        },
        {
            "question_index": 1,
            "question": "2 + 2 = ?",
            "type": "option",
            "options": ["3", "4", "5"],
        },
        {
            "question_index": 2,
            "question": "Siapa presiden pertama Indonesia?",
            "type": "option",
            "options": ["Sukarno", "Soeharto", "Jokowi"],
        },
        {
            "question_index": 3,
            "question": "Tuliskan nama lengkap presiden pertama Indonesia.",
            "type": "fill_in_the_blank",
            "options": [],
        },
        {
            "question_index": 4,
            "question": "Indonesia merdeka pada tahun 1945.",
            "type": "true_false",
            "options": [],
        },
    )

    def __init__(self, socketio: "SocketIO", session_service: "SessionService"):
        """Store the collaborators and register every event handler.

        Args:
            socketio: Socket.IO server object the handlers are attached to.
            session_service: Service used to join and end quiz sessions.
        """
        self.socketio = socketio
        self.session_service = session_service
        self._register_events()

    @staticmethod
    def _as_payload(data):
        """Return *data* if it is a dict, otherwise an empty dict."""
        return data if isinstance(data, dict) else {}

    @staticmethod
    def _has_value(value):
        """Tell whether a client-supplied field carries a usable value.

        Numbers and booleans always count (``0`` and ``False`` are legitimate
        answers, e.g. for a true/false question); anything else counts when
        it is truthy, so ``None`` and empty strings/containers are rejected.
        """
        return isinstance(value, (int, float)) or bool(value)

    def _register_events(self):
        """Attach all Socket.IO event handlers to ``self.socketio``."""

        @self.socketio.on("connect")
        def on_connect():
            """Acknowledge a new connection and report its socket id."""
            print(f"Client connected: {request.sid}")
            emit("connection_response", {"status": "connected", "sid": request.sid})

        @self.socketio.on("disconnect")
        def on_disconnect():
            """Log the disconnecting socket id."""
            print(f"Client disconnected: {request.sid}")

        @self.socketio.on("join_room")
        def handle_join_room(data=None):
            """Join the caller to a session room and announce it to the room."""
            payload = self._as_payload(data)
            # .get() keeps a missing key on the validation path below instead
            # of raising KeyError before the error event can be sent.
            session_code = payload.get("session_code")
            user_id = payload.get("user_id")
            if not session_code or not user_id:
                emit("error", {"message": "session_code and user_id are required"})
                return

            session = self.session_service.join_session(
                session_code=session_code, user_id=user_id
            )
            # A None result is treated as "could not join".
            if session is None:
                emit("error", {"message": "Failed to join session or session inactive"})
                return

            join_room(session_code)

            if session["is_admin"]:
                emit(
                    "room_message",
                    {
                        "message": "admin has joined the room.",
                        "room": session_code,
                        "argument": "adm_update",
                    },
                    room=session_code,
                )
                return

            emit(
                "room_message",
                {
                    "message": f"user {session['username']} has joined the room.",
                    "room": session_code,
                    "argument": "adm_update",
                    "data": session,
                },
                room=session_code,
            )

        @self.socketio.on("submit_answer")
        def handle_submit_answer(data=None):
            """Validate a submitted answer and broadcast it to the session room."""
            payload = self._as_payload(data)
            session_id = payload.get("session_id")
            user_id = payload.get("user_id")
            question_index = payload.get("question_index")
            answer = payload.get("answer")

            # question_index may be 0 and answer may be False/0, so neither
            # can be validated by plain truthiness.
            is_complete = (
                session_id
                and user_id
                and question_index is not None
                and self._has_value(answer)
            )
            if not is_complete:
                emit(
                    "error",
                    {
                        "message": "session_id, user_id, question_index, and answer are required"
                    },
                )
                return

            print(f"User {user_id} answered question {question_index} with {answer}")

            # TODO: persist the answer to the database here (e.g. through an
            # answer service taking session_id, user_id, question_index, answer).

            # Notify the admin (host) and the other participants in the room.
            emit(
                "answer_submitted",
                {
                    "user_id": user_id,
                    "question_index": question_index,
                    "answer": answer,
                },
                room=session_id,
            )

        @self.socketio.on("leave_room")
        def handle_leave_room(data=None):
            """Remove the caller from a session room and tell the room."""
            payload = self._as_payload(data)
            session_id = payload.get("session_id")
            username = payload.get("username", "anonymous")
            if not session_id:
                emit("error", {"message": "session_id is required"})
                return

            leave_room(session_id)
            emit(
                "room_message",
                {"message": f"{username} has left the room.", "room": session_id},
                room=session_id,
            )

        @self.socketio.on("send_message")
        def on_send_message(data=None):
            """Relay a chat message to everyone in the session room."""
            payload = self._as_payload(data)
            session_id = payload.get("session_id")
            message = payload.get("message")
            username = payload.get("username", "anonymous")
            if not session_id or not self._has_value(message):
                emit("error", {"message": "session_id and message are required"})
                return

            emit(
                "receive_message",
                {"message": message, "from": username},
                room=session_id,
            )

        @self.socketio.on("end_session")
        def handle_end_session(data=None):
            """End a session through the service and notify the room."""
            payload = self._as_payload(data)
            session_id = payload.get("session_id")
            user_id = payload.get("user_id")
            if not session_id or not user_id:
                emit("error", {"message": "session_id and user_id required"})
                return

            self.session_service.end_session(session_id=session_id, user_id=user_id)
            emit(
                "room_closed",
                {"message": "Session has ended.", "room": session_id},
                room=session_id,
            )

        @self.socketio.on("start_quiz")
        def handle_start_quiz(data=None):
            """Announce the quiz start and launch the simulated question feed."""
            payload = self._as_payload(data)
            session_code = payload.get("session_code")
            if not session_code:
                emit("error", {"message": "session_code is required"})
                return

            emit("quiz_started", {"message": "Quiz has started!"}, room=session_code)

            # Run the simulated question feed in the background. Using the
            # server's own task helper (rather than a raw thread) keeps it
            # compatible with whichever async mode the server runs in.
            self.socketio.start_background_task(
                self._simulate_quiz_flow, session_code
            )

    def _simulate_quiz_flow(self, session_code, question_duration=20, review_duration=5):
        """Push the demo questions to a room one by one, then signal the end.

        Args:
            session_code: Room that receives the ``quiz_question`` events.
            question_duration: Seconds to wait after sending each question.
            review_duration: Extra seconds to wait before the next question.
        """
        for question in self._SIMULATED_QUESTIONS:
            print(f"Sending question {question['question_index']} to {session_code}")
            self.socketio.emit("quiz_question", question, room=session_code)
            # socketio.sleep() yields to the server's async model; a plain
            # time.sleep() can stall the whole server under eventlet/gevent.
            self.socketio.sleep(question_duration)

            # TODO: send the correct answer to the room here.
            self.socketio.sleep(review_duration)

        # All questions have been sent: tell the room the quiz is over.
        self.socketio.emit(
            "quiz_done", {"message": "Quiz has ended!"}, room=session_code
        )