from datetime import datetime
from typing import List, Optional

from bson import ObjectId
from pymongo.collection import Collection
from pymongo.database import Database

from models import QuizEntity


class QuizRepository:
    """Data-access layer for quiz documents stored in the ``quiz`` collection.

    Every method that takes a ``quiz_id`` converts it with ``ObjectId(...)``
    and lets any conversion error propagate to the caller.
    """

    def __init__(self, db: Database):
        """Bind the repository to the ``quiz`` collection of *db*."""
        self.collection: Collection = db.quiz

    def create(self, quiz: QuizEntity) -> str:
        """Insert *quiz* and return the new document id as a string.

        The entity is serialised by alias with ``None`` fields omitted.
        """
        quiz_dict = quiz.dict(by_alias=True, exclude_none=True)
        result = self.collection.insert_one(quiz_dict)
        return str(result.inserted_id)

    def get_by_id(self, quiz_id: str) -> Optional[QuizEntity]:
        """Return the quiz whose ``_id`` equals *quiz_id*, or ``None``.

        NOTE(review): ``ObjectId(quiz_id)`` is expected to raise
        ``bson.errors.InvalidId`` for a malformed id; that error is not
        handled here.
        """
        data = self.collection.find_one({"_id": ObjectId(quiz_id)})
        if data:
            return QuizEntity(**data)
        return None

    def get_by_ids(self, quiz_ids: List[str]) -> Optional[List[QuizEntity]]:
        """Return the quizzes whose ids appear in *quiz_ids*.

        Returns:
            A list of matching entities, or ``None`` (not an empty list) when
            nothing matches or *quiz_ids* is empty. Ids with no matching
            document are simply absent from the result.
        """
        # An empty ``$in`` can never match, so skip the round-trip entirely.
        if not quiz_ids:
            return None

        object_ids = [ObjectId(qid) for qid in quiz_ids]
        cursor = self.collection.find({"_id": {"$in": object_ids}})
        entities = [QuizEntity(**doc) for doc in cursor]
        # Callers rely on ``None`` rather than ``[]`` for "nothing found".
        return entities or None

    def get_by_user_id(
        self, user_id: str, page: int = 1, page_size: int = 10
    ) -> List[QuizEntity]:
        """Return one page of quizzes whose ``author_id`` equals *user_id*.

        Args:
            user_id: Value matched against the ``author_id`` field.
            page: 1-based page number.
            page_size: Maximum number of quizzes per page.

        NOTE(review): no sort is applied, so page contents follow whatever
        order the server returns; a *page* below 1 produces a negative skip,
        which the driver is expected to reject.
        """
        skip = (page - 1) * page_size
        cursor = (
            self.collection.find({"author_id": user_id})
            .skip(skip)
            .limit(page_size)
        )
        return [QuizEntity(**self._normalize_date(doc)) for doc in cursor]

    def get_all(self, skip: int = 0, limit: int = 10) -> List[QuizEntity]:
        """Return up to *limit* quizzes after skipping the first *skip*."""
        cursor = self.collection.find().skip(skip).limit(limit)
        return [QuizEntity(**doc) for doc in cursor]

    def update(self, quiz_id: str, update_data: dict) -> bool:
        """Apply *update_data* to the quiz via ``$set``.

        Returns:
            ``True`` when the driver reports at least one modified document.
            An empty *update_data* returns ``False`` without contacting the
            database.
        """
        # An empty ``$set`` changes nothing and is rejected by some server
        # versions, so report "not modified" up front.
        if not update_data:
            return False

        result = self.collection.update_one(
            {"_id": ObjectId(quiz_id)}, {"$set": update_data}
        )
        return result.modified_count > 0

    def update_user_playing(self, quiz_id: str, total_user: int) -> bool:
        """Set ``total_user_playing`` to *total_user* on the given quiz.

        Returns ``True`` when the driver reports a modified document.
        """
        result = self.collection.update_one(
            {"_id": ObjectId(quiz_id)},
            {"$set": {"total_user_playing": total_user}},
        )
        return result.modified_count > 0

    def delete(self, quiz_id: str) -> bool:
        """Delete the quiz with *quiz_id*; return ``True`` if one was removed."""
        result = self.collection.delete_one({"_id": ObjectId(quiz_id)})
        return result.deleted_count > 0

    def count_by_user_id(self, user_id: str) -> int:
        """Return how many quizzes have ``author_id`` equal to *user_id*."""
        return self.collection.count_documents({"author_id": user_id})

    @staticmethod
    def _normalize_date(doc: dict) -> dict:
        """Convert a string ``date`` field in *doc* to ``datetime``, in place.

        Strings are parsed as ``DD-MM-YYYY``; a string that does not match is
        replaced with ``None``. Non-string or missing values are untouched.
        Returns the same *doc* for convenient chaining.
        """
        raw = doc.get("date")
        if isinstance(raw, str):
            try:
                doc["date"] = datetime.strptime(raw, "%d-%m-%Y")
            except ValueError:
                doc["date"] = None
        return doc