feat: add model-based question generation
parent 27fc8245da
commit 3dec36d511
@@ -14,6 +14,14 @@ def create_quiz(controller: QuizController = Provide[Container.quiz_controller])
     return controller.create_quiz(reqBody)
 
 
+@quiz_bp.route("/ai", methods=["POST"])
+@inject
+def create_quiz_auto(controller: QuizController = Provide[Container.quiz_controller]):
+
+    reqBody = request.get_json()
+    return controller.create_quiz_auto(reqBody)
+
+
 @quiz_bp.route("/<quiz_id>", methods=["GET"])
 @inject
 def get_quiz(
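For reference, a minimal sketch of how a client might call the new route. The blueprint's URL prefix (assumed here to be /quiz), the host/port, and the exact response shape are assumptions, not shown in this diff:

import requests  # hypothetical client-side check, not part of the commit

payload = {"sentence": "Budi berangkat ke sekolah pukul 07.00. Ia tiba pukul 07.30."}
resp = requests.post("http://localhost:5000/quiz/ai", json=payload)
print(resp.status_code, resp.json())  # make_response wraps the labeling result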
@@ -2,15 +2,21 @@ import json
 from pydantic import ValidationError
 from app.schemas.requests import QuizCreateSchema, UserAnswerSchema
 from app.schemas.response import QuizCreationResponse
-from app.services import QuizService, AnswerService
+from app.services import QuizService, AnswerService, QuestionGenerationService
 from app.helpers import make_response, make_error_response
 from app.exception import ValidationException, DataNotFoundException
 
 
 class QuizController:
-    def __init__(self, quiz_service: QuizService, answer_service: AnswerService):
+    def __init__(
+        self,
+        quiz_service: QuizService,
+        answer_service: AnswerService,
+        question_generate_service: QuestionGenerationService,
+    ):
         self.quiz_service = quiz_service
         self.answer_service = answer_service
+        self.question_generate_service = question_generate_service
 
     def get_quiz(self, quiz_id):
         try:
@@ -121,3 +127,15 @@ class QuizController:
             )
         except Exception as e:
             return make_error_response(e)
+
+    def create_quiz_auto(
+        self,
+        reqBody,
+    ):
+        try:
+            result = self.question_generate_service.createQuizAutomate(
+                reqBody["sentence"]
+            )
+            return make_response(message="success labeling", data=result)
+        except Exception as e:
+            return make_error_response(e)
@@ -5,6 +5,7 @@ from app.repositories import (
     UserAnswerRepository,
     SubjectRepository,
     SessionRepository,
+    NERSRLRepository,
 )
 
 from app.services import (
@@ -15,6 +16,7 @@ from app.services import (
     HistoryService,
     SubjectService,
     SessionService,
+    QuestionGenerationService,
 )
 
 from app.controllers import (
@@ -41,6 +43,7 @@ class Container(containers.DeclarativeContainer):
     answer_repository = providers.Factory(UserAnswerRepository, mongo.provided.db)
     subject_repository = providers.Factory(SubjectRepository, mongo.provided.db)
     session_repository = providers.Factory(SessionRepository, mongo.provided.db)
+    ner_srl_repository = providers.Factory(NERSRLRepository)
 
     # services
     auth_service = providers.Factory(
@@ -83,10 +86,19 @@ class Container(containers.DeclarativeContainer):
         user_repository,
     )
 
+    question_generation_service = providers.Factory(
+        QuestionGenerationService, ner_srl_repository
+    )
+
     # controllers
     auth_controller = providers.Factory(AuthController, user_service, auth_service)
     user_controller = providers.Factory(UserController, user_service)
-    quiz_controller = providers.Factory(QuizController, quiz_service, answer_service)
+    quiz_controller = providers.Factory(
+        QuizController,
+        quiz_service,
+        answer_service,
+        question_generation_service,
+    )
     history_controller = providers.Factory(HistoryController, history_service)
     subject_controller = providers.Factory(SubjectController, subject_service)
    socket_controller = providers.Factory(
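For intuition, the provider wiring above corresponds roughly to this manual construction. It is a sketch only; dependency-injector resolves these lazily through the container, and quiz_service / answer_service are built by their existing providers:

# Rough manual equivalent of the container wiring (sketch, not part of the commit)
ner_srl_repository = NERSRLRepository()
question_generation_service = QuestionGenerationService(ner_srl_repository)
quiz_controller = QuizController(
    quiz_service,
    answer_service,
    question_generation_service,
)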
Binary file not shown. (repeated for 7 added binary model/artifact files)
File diff suppressed because one or more lines are too long. (repeated for 3 added files)
@@ -3,6 +3,7 @@ from .quiz_repositroy import QuizRepository
 from .answer_repository import UserAnswerRepository
 from .subject_repository import SubjectRepository
 from .session_repostory import SessionRepository
+from .ner_srl_repository import NERSRLRepository
 
 __all__ = [
     "UserRepository",
@@ -10,4 +11,5 @@ __all__ = [
     "UserAnswerRepository",
     "SubjectRepository",
     "SessionRepository",
+    "NERSRLRepository",
 ]
@@ -0,0 +1,70 @@
+import json
+import numpy as np
+import tensorflow as tf
+from tensorflow.keras.models import load_model  # type: ignore
+from tensorflow.keras.preprocessing.sequence import pad_sequences  # type: ignore
+from tensorflow.keras.preprocessing.text import tokenizer_from_json  # type: ignore
+import re
+
+
+class AnswerGenerationRepository:
+    MODEL_PATH = "app/lstm_model/question_generation/qa_lstm_model_final.h5"
+    TOKENIZER_PATH = "app/lstm_model/question_generation/qa_tokenizers.json"
+
+    def __init__(self):
+        with open(self.TOKENIZER_PATH, "r") as f:
+            tokenizer_data = json.load(f)
+
+        self.tokenizer = tokenizer_from_json(tokenizer_data["word_tokenizer"])
+        self.ner_tokenizer = tokenizer_from_json(tokenizer_data["ner_tokenizer"])
+        self.srl_tokenizer = tokenizer_from_json(tokenizer_data["srl_tokenizer"])
+        self.answer_tokenizer = tokenizer_from_json(tokenizer_data["answer_tokenizer"])
+        self.q_type_tokenizer = tokenizer_from_json(tokenizer_data["q_type_tokenizer"])
+
+        self.max_context_len = tokenizer_data["max_context_len"]
+        self.max_question_len = tokenizer_data["max_question_len"]
+        self.max_token_len = tokenizer_data["max_token_len"]
+        self.q_type_vocab_size = len(self.q_type_tokenizer.word_index) + 1
+
+        self.model = load_model(self.MODEL_PATH)
+
+    def preprocess_text(self, text):
+        text = text.lower()
+        text = re.sub(r"\s+", " ", text).strip()
+        return text
+
+    def predict_answer(self, context, question, tokens, ner, srl, q_type):
+        context_seq = self.tokenizer.texts_to_sequences([self.preprocess_text(context)])
+        question_seq = self.tokenizer.texts_to_sequences(
+            [self.preprocess_text(question)]
+        )
+        token_seq = [self.tokenizer.texts_to_sequences([" ".join(tokens)])[0]]
+        ner_seq = [self.ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]]
+        srl_seq = [self.srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]]
+
+        q_type_idx = self.q_type_tokenizer.word_index.get(q_type, 0)
+        q_type_cat = tf.keras.utils.to_categorical(
+            [q_type_idx], num_classes=self.q_type_vocab_size
+        )
+
+        context_pad = pad_sequences(
+            context_seq, maxlen=self.max_context_len, padding="post"
+        )
+        question_pad = pad_sequences(
+            question_seq, maxlen=self.max_question_len, padding="post"
+        )
+        token_pad = pad_sequences(token_seq, maxlen=self.max_token_len, padding="post")
+        ner_pad = pad_sequences(ner_seq, maxlen=self.max_token_len, padding="post")
+        srl_pad = pad_sequences(srl_seq, maxlen=self.max_token_len, padding="post")
+
+        prediction = self.model.predict(
+            [context_pad, question_pad, token_pad, ner_pad, srl_pad, q_type_cat],
+            verbose=0,
+        )
+        answer_idx = np.argmax(prediction[0])
+
+        for word, idx in self.answer_tokenizer.word_index.items():
+            if idx == answer_idx:
+                return word
+
+        return "Unknown"
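A hedged usage sketch for the repository above: the token, NER, and SRL lists must be aligned one-to-one, and the example labels are illustrative rather than taken from the trained tag sets:

repo = AnswerGenerationRepository()
answer = repo.predict_answer(
    context="budi berangkat ke sekolah pukul 07.00",
    question="siapa yang berangkat ke sekolah ___",
    tokens=["budi", "berangkat", "ke", "sekolah", "pukul", "07.00"],
    ner=["B-PER", "O", "O", "B-LOC", "O", "B-TIME"],  # illustrative tags
    srl=["ARG0", "V", "O", "ARG1", "O", "ARGM-TMP"],  # illustrative tags
    q_type="isian",
)
print(answer)  # a single word from the answer tokenizer vocabulary, or "Unknown"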
@@ -0,0 +1,49 @@
+import numpy as np
+import pickle
+from tensorflow.keras.models import load_model  # type: ignore
+from tensorflow.keras.preprocessing.sequence import pad_sequences  # type: ignore
+import re
+
+
+class NERSRLRepository:
+    def __init__(self):
+        # Load model and artifacts
+        self.model = load_model("app/lstm_model/ner_srl/lstm_ner_srl_model.keras")
+
+        with open("app/lstm_model/ner_srl/word2idx.pkl", "rb") as f:
+            self.word2idx = pickle.load(f)
+        with open("app/lstm_model/ner_srl/tag2idx_ner.pkl", "rb") as f:
+            self.tag2idx_ner = pickle.load(f)
+        with open("app/lstm_model/ner_srl/tag2idx_srl.pkl", "rb") as f:
+            self.tag2idx_srl = pickle.load(f)
+
+        self.idx2tag_ner = {i: t for t, i in self.tag2idx_ner.items()}
+        self.idx2tag_srl = {i: t for t, i in self.tag2idx_srl.items()}
+
+        self.PAD_WORD_ID = self.word2idx["PAD"]
+        self.MAXLEN = self.model.input_shape[1]
+
+    def _preprocess_tokens(self, tokens: list[str]) -> np.ndarray:
+        seq = [self.word2idx.get(tok.lower(), self.word2idx["UNK"]) for tok in tokens]
+        return pad_sequences(
+            [seq], maxlen=self.MAXLEN, padding="post", value=self.PAD_WORD_ID
+        )
+
+    def predict_sentence(self, sentence: str) -> dict:
+        tokens = re.findall(r"\d{1,2}\.\d{2}|\w+|[^\w\s]", sentence.lower())
+        print(tokens)
+        seq_padded = self._preprocess_tokens(tokens)
+
+        pred_ner_prob, pred_srl_prob = self.model.predict(seq_padded, verbose=0)
+        pred_ner = pred_ner_prob.argmax(-1)[0][: len(tokens)]
+        pred_srl = pred_srl_prob.argmax(-1)[0][: len(tokens)]
+
+        return {
+            "tokens": tokens,
+            "labels_ner": [self.idx2tag_ner[int(i)] for i in pred_ner],
+            "labels_srl": [self.idx2tag_srl[int(i)] for i in pred_srl],
+        }
+
+    def labeling_token(self, tokens: list[str]) -> dict:
+        sentence = " ".join(tokens)
+        return self.predict_sentence(sentence)
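For orientation, a sketch of what predict_sentence returns; the actual tag strings depend on the pickled tag2idx vocabularies:

repo = NERSRLRepository()
result = repo.predict_sentence("Budi tiba di sekolah pukul 07.00.")
# result has the shape:
# {
#     "tokens": ["budi", "tiba", "di", "sekolah", "pukul", "07.00", "."],
#     "labels_ner": [...],  # one NER tag per token
#     "labels_srl": [...],  # one SRL tag per token
# }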
@@ -0,0 +1,188 @@
+import numpy as np
+import json
+import tensorflow as tf
+from tensorflow.keras.preprocessing.text import tokenizer_from_json  # type: ignore
+from tensorflow.keras.preprocessing.sequence import pad_sequences  # type: ignore
+from tensorflow.keras.models import load_model  # type: ignore
+import re
+
+
+class QuestionGenerationRepository:
+    # Static paths for model and tokenizer
+    MODEL_PATH = "app/lstm_model/question_generation/question_prediction_model_final.h5"
+    TOKENIZER_PATH = "app/lstm_model/question_generation/question_prediction_tokenizers.json"
+
+    def __init__(self):
+        """
+        Initialize question prediction model with pre-trained model and tokenizers
+        using static paths
+        """
+        # Load model
+        self.model = load_model(self.MODEL_PATH)
+
+        # Load tokenizers
+        with open(self.TOKENIZER_PATH, "r") as f:
+            tokenizer_data = json.load(f)
+
+        # Reconstruct tokenizers
+        self.word_tokenizer = tokenizer_from_json(tokenizer_data["word_tokenizer"])
+        self.ner_tokenizer = tokenizer_from_json(tokenizer_data["ner_tokenizer"])
+        self.srl_tokenizer = tokenizer_from_json(tokenizer_data["srl_tokenizer"])
+        self.q_type_tokenizer = tokenizer_from_json(tokenizer_data["q_type_tokenizer"])
+
+        # Get max lengths
+        self.max_context_len = tokenizer_data["max_context_len"]
+        self.max_answer_len = tokenizer_data["max_answer_len"]
+        self.max_question_len = tokenizer_data["max_question_len"]
+        self.max_token_len = tokenizer_data["max_token_len"]
+
+        # Get vocabulary sizes
+        self.vocab_size = len(self.word_tokenizer.word_index) + 1
+        self.q_type_vocab_size = len(self.q_type_tokenizer.word_index) + 1
+
+    def preprocess_text(self, text):
+        """Basic text preprocessing"""
+        text = text.lower()
+        text = re.sub(r"\s+", " ", text).strip()
+        return text
+
+    def predict_question(self, context, answer, tokens, ner, srl, q_type):
+        """
+        Predict a question based on given context, answer, tokens, NER, SRL, and question type
+
+        Args:
+            context (str): The context text
+            answer (str): The answer to generate a question for
+            tokens (list): List of tokens
+            ner (list): List of NER tags corresponding to tokens
+            srl (list): List of SRL tags corresponding to tokens
+            q_type (str): Question type ('isian', 'opsi', or 'true_false')
+
+        Returns:
+            str: The predicted question
+        """
+        # Preprocess inputs
+        context = self.preprocess_text(context)
+        answer = self.preprocess_text(answer)
+
+        # Convert to sequences
+        context_seq = self.word_tokenizer.texts_to_sequences([context])[0]
+        answer_seq = self.word_tokenizer.texts_to_sequences([answer])[0]
+        tokens_seq = self.word_tokenizer.texts_to_sequences([" ".join(tokens)])[0]
+        ner_seq = self.ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]
+        srl_seq = self.srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]
+
+        # Pad sequences
+        context_padded = pad_sequences(
+            [context_seq], maxlen=self.max_context_len, padding="post"
+        )
+        answer_padded = pad_sequences(
+            [answer_seq], maxlen=self.max_answer_len, padding="post"
+        )
+        tokens_padded = pad_sequences(
+            [tokens_seq], maxlen=self.max_token_len, padding="post"
+        )
+        ner_padded = pad_sequences([ner_seq], maxlen=self.max_token_len, padding="post")
+        srl_padded = pad_sequences([srl_seq], maxlen=self.max_token_len, padding="post")
+
+        # One-hot encode question type
+        q_type_idx = self.q_type_tokenizer.word_index.get(q_type, 0)
+        q_type_categorical = tf.keras.utils.to_categorical(
+            [q_type_idx], num_classes=self.q_type_vocab_size
+        )
+
+        # Make prediction
+        predicted_seq = self.model.predict(
+            [
+                context_padded,
+                answer_padded,
+                tokens_padded,
+                ner_padded,
+                srl_padded,
+                q_type_categorical,
+            ]
+        )
+
+        # Convert predictions to tokens (taking the highest probability token at each position)
+        predicted_indices = np.argmax(predicted_seq[0], axis=1)
+
+        # Create reversed word index for converting indices back to words
+        reverse_word_index = {v: k for k, v in self.word_tokenizer.word_index.items()}
+
+        # Convert indices to words
+        predicted_words = []
+        for idx in predicted_indices:
+            if idx != 0:  # Skip padding tokens
+                predicted_words.append(reverse_word_index.get(idx, ""))
+
+        # Form the question
+        predicted_question = " ".join(predicted_words)
+
+        # Add "___" to the end based on question type convention
+        if "___" not in predicted_question:
+            predicted_question += " ___"
+
+        return predicted_question
+
+    def batch_predict_questions(self, data):
+        """
+        Predict questions for a batch of data
+
+        Args:
+            data (list): List of dictionaries with context, tokens, ner, srl, and answers
+
+        Returns:
+            list: List of predicted questions
+        """
+        results = []
+
+        for item in data:
+            context = item["context"]
+            tokens = item["tokens"]
+            ner = item["ner"]
+            srl = item["srl"]
+
+            # If there are Q&A pairs, use them for evaluation
+            if "qas" in item:
+                for qa in item["qas"]:
+                    answer = qa["answer"]
+                    q_type = qa["type"]
+                    ground_truth = qa["question"]
+
+                    predicted_question = self.predict_question(
+                        context, answer, tokens, ner, srl, q_type
+                    )
+
+                    results.append(
+                        {
+                            "context": context,
+                            "answer": answer,
+                            "predicted_question": predicted_question,
+                            "ground_truth": ground_truth,
+                            "question_type": q_type,
+                        }
+                    )
+            else:
+                # If no Q&A pairs, generate questions for all question types
+                for q_type in ["isian", "true_false", "opsi"]:
+                    # For demo purposes, use a placeholder answer (would need actual answers in real use)
+                    # In practice, you might extract potential answers from the context
+                    placeholders = {
+                        "isian": "placeholder",
+                        "true_false": "true",
+                        "opsi": "placeholder",
+                    }
+
+                    predicted_question = self.predict_question(
+                        context, placeholders[q_type], tokens, ner, srl, q_type
+                    )
+
+                    results.append(
+                        {
+                            "context": context,
+                            "predicted_question": predicted_question,
+                            "question_type": q_type,
+                        }
+                    )
+
+        return results
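A usage sketch for batch_predict_questions above; the field values are illustrative, and real calls would pass NER/SRL labels produced by NERSRLRepository rather than hand-written tags:

repo = QuestionGenerationRepository()
batch = [
    {
        "context": "budi berangkat ke sekolah pukul 07.00",
        "tokens": ["budi", "berangkat", "ke", "sekolah", "pukul", "07.00"],
        "ner": ["B-PER", "O", "O", "B-LOC", "O", "B-TIME"],  # illustrative tags
        "srl": ["ARG0", "V", "O", "ARG1", "O", "ARGM-TMP"],  # illustrative tags
        # no "qas" key, so a question is generated for each of the three types
    }
]
for item in repo.batch_predict_questions(batch):
    print(item["question_type"], "->", item["predicted_question"])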
@@ -5,6 +5,7 @@ from .answer_service import AnswerService
 from .history_service import HistoryService
 from .subject_service import SubjectService
 from .session_service import SessionService
+from .question_generation_service import QuestionGenerationService
 
 __all__ = [
     "AuthService",
@@ -14,4 +15,5 @@ __all__ = [
     "HistoryService",
     "SubjectService",
    "SessionService",
+    "QuestionGenerationService",
 ]
@@ -0,0 +1,25 @@
+from app.repositories import NERSRLRepository
+import re
+
+
+class QuestionGenerationService:
+
+    def __init__(self, ner_srl_repository: NERSRLRepository):
+        self._ner_srl_repository = ner_srl_repository
+
+    def createQuizAutomate(self, sentence: str):
+        # Use a regex that splits only on a period followed by whitespace or end of text,
+        # so periods inside numbers (e.g. 19.00) are not split points
+        split_pattern = r"\.(?=\s|$)(?!\d)"
+
+        # Split the text into sentences using the regex
+        sentences = [s.strip() for s in re.split(split_pattern, sentence) if s.strip()]
+
+        results = []
+        for s in sentences:
+            result = self._ner_srl_repository.predict_sentence(s)
+            results.append(result)
+
+
+
+        return results
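To illustrate the splitting rule used in createQuizAutomate (times such as 19.00 are kept intact), a small standalone check:

import re

split_pattern = r"\.(?=\s|$)(?!\d)"
text = "Kereta berangkat pukul 19.00. Budi tiba pukul 19.30."
print([s.strip() for s in re.split(split_pattern, text) if s.strip()])
# ['Kereta berangkat pukul 19.00', 'Budi tiba pukul 19.30']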
@@ -33,7 +33,6 @@ iniconfig==2.0.0
 itsdangerous==2.2.0
 Jinja2==3.1.6
 MarkupSafe==3.0.2
-numpy==2.1.2
 oauthlib==3.2.2
 packaging==24.2
 pluggy==1.5.0
@@ -58,3 +57,4 @@ typing_extensions==4.12.2
 urllib3==2.3.0
 Werkzeug==3.1.3
 wsproto==1.2.0
+tensorflow==2.18.0