# TIF_E41211115_lstm-quiz-gen.../question_generation/qa_model.py

import numpy as np
import pandas as pd
import json
import random
import re

import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import (
    Input,
    LSTM,
    Dense,
    Embedding,
    Bidirectional,
    Concatenate,
    Attention,
    Dropout,
)
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

with open("data_converted.json", "r") as f:
    data = json.load(f)
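
# Assumed structure of data_converted.json, inferred from how prepare_data()
# reads it below (illustrative sketch, not a verbatim excerpt):
#
# [
#     {
#         "context": "kerajaan majapahit berdiri pada tahun 1293 di trowulan",
#         "tokens": ["kerajaan", "majapahit", ...],
#         "ner": ["O", "ORG", ...],
#         "srl": ["ARG1", "ARG1", ...],
#         "qas": [{"question": "...", "answer": "...", "type": "..."}]
#     },
#     ...
# ]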


# Preprocessing function
def preprocess_text(text):
    """Perform basic text preprocessing: lowercase and collapse whitespace."""
    text = text.lower()
    text = re.sub(r"\s+", " ", text).strip()
    return text
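
# Example: preprocess_text("  Selamat   Pagi ") -> "selamat pagi"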


# Prepare the data for the model
def prepare_data(data):
    """Flatten the dataset into parallel lists, one entry per QA pair."""
    contexts = []
    tokens_list = []
    ner_list = []
    srl_list = []
    questions = []
    answers = []
    q_types = []
    for item in data:
        for qa in item["qas"]:
            contexts.append(preprocess_text(item["context"]))
            tokens_list.append(item["tokens"])
            ner_list.append(item["ner"])
            srl_list.append(item["srl"])
            questions.append(preprocess_text(qa["question"]))
            answers.append(qa["answer"])
            q_types.append(qa["type"])
    return contexts, tokens_list, ner_list, srl_list, questions, answers, q_types


contexts, tokens_list, ner_list, srl_list, questions, answers, q_types = prepare_data(
    data
)

max_vocab_size = 10000
tokenizer = Tokenizer(num_words=max_vocab_size, oov_token="<OOV>")
tokenizer.fit_on_texts(contexts + questions + [" ".join(item) for item in tokens_list])
vocab_size = len(tokenizer.word_index) + 1

# Encoder for NER tags
ner_tokenizer = Tokenizer(oov_token="<OOV>")
ner_tokenizer.fit_on_texts([" ".join(ner) for ner in ner_list])
ner_vocab_size = len(ner_tokenizer.word_index) + 1

# Encoder for SRL tags
srl_tokenizer = Tokenizer(oov_token="<OOV>")
srl_tokenizer.fit_on_texts([" ".join(srl) for srl in srl_list])
srl_vocab_size = len(srl_tokenizer.word_index) + 1

# Encoder for question types
q_type_tokenizer = Tokenizer()
q_type_tokenizer.fit_on_texts(q_types)
q_type_vocab_size = len(q_type_tokenizer.word_index) + 1
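
# Quick sanity check on the fitted vocabularies; purely illustrative, the
# actual sizes depend entirely on data_converted.json
print("vocab sizes:", vocab_size, ner_vocab_size, srl_vocab_size, q_type_vocab_size)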


# Convert token, NER, and SRL lists to integer sequences
def tokens_to_sequences(tokens, ner, srl):
    """Convert token, NER, and SRL tag lists to integer sequences."""
    token_seqs = [tokenizer.texts_to_sequences([" ".join(t)])[0] for t in tokens]
    ner_seqs = [ner_tokenizer.texts_to_sequences([" ".join(n)])[0] for n in ner]
    srl_seqs = [srl_tokenizer.texts_to_sequences([" ".join(s)])[0] for s in srl]
    return token_seqs, ner_seqs, srl_seqs


# Determine the maximum lengths for padding
context_seqs = tokenizer.texts_to_sequences(contexts)
question_seqs = tokenizer.texts_to_sequences(questions)
token_seqs, ner_seqs, srl_seqs = tokens_to_sequences(tokens_list, ner_list, srl_list)
max_context_len = max(len(seq) for seq in context_seqs)
max_question_len = max(len(seq) for seq in question_seqs)
max_token_len = max(len(seq) for seq in token_seqs)


# Pad the sequences so all inputs share the same length
def pad_all_sequences(context_seqs, question_seqs, token_seqs, ner_seqs, srl_seqs):
    """Pad every sequence group to its fixed maximum length."""
    context_padded = pad_sequences(context_seqs, maxlen=max_context_len, padding="post")
    question_padded = pad_sequences(
        question_seqs, maxlen=max_question_len, padding="post"
    )
    token_padded = pad_sequences(token_seqs, maxlen=max_token_len, padding="post")
    ner_padded = pad_sequences(ner_seqs, maxlen=max_token_len, padding="post")
    srl_padded = pad_sequences(srl_seqs, maxlen=max_token_len, padding="post")
    return context_padded, question_padded, token_padded, ner_padded, srl_padded


# Encoder for the answers
answer_tokenizer = Tokenizer(oov_token="<OOV>")
answer_tokenizer.fit_on_texts(answers)
answer_vocab_size = len(answer_tokenizer.word_index) + 1

# Encode question types - FIX - use the word index directly rather than sequences
# (the Tokenizer lowercases text while fitting, so look up a lowercased key;
# index 0 serves as the fallback for unseen types)
q_type_indices = []
for q_type in q_types:
    q_type_idx = q_type_tokenizer.word_index.get(q_type.lower(), 0)
    q_type_indices.append(q_type_idx)

# Convert to a numpy array
q_type_indices = np.array(q_type_indices)

# One-hot encode the question types
q_type_categorical = tf.keras.utils.to_categorical(
    q_type_indices, num_classes=q_type_vocab_size
)

# Pad all sequence groups
context_padded, question_padded, token_padded, ner_padded, srl_padded = (
    pad_all_sequences(context_seqs, question_seqs, token_seqs, ner_seqs, srl_seqs)
)

# Encode the answers
answer_seqs = answer_tokenizer.texts_to_sequences(answers)
max_answer_len = max(len(seq) for seq in answer_seqs)
answer_padded = pad_sequences(answer_seqs, maxlen=max_answer_len, padding="post")

# Split the data into train and test sets
indices = list(range(len(context_padded)))
train_indices, test_indices = train_test_split(indices, test_size=0.2, random_state=42)


# Return the subset of `data` selected by `indices`
def get_subset(data, indices):
    return np.array([data[i] for i in indices])

# Train data
train_context = get_subset(context_padded, train_indices)
train_question = get_subset(question_padded, train_indices)
train_token = get_subset(token_padded, train_indices)
train_ner = get_subset(ner_padded, train_indices)
train_srl = get_subset(srl_padded, train_indices)
train_q_type = get_subset(q_type_categorical, train_indices)
train_answer = get_subset(answer_padded, train_indices)

# Test data
test_context = get_subset(context_padded, test_indices)
test_question = get_subset(question_padded, test_indices)
test_token = get_subset(token_padded, test_indices)
test_ner = get_subset(ner_padded, test_indices)
test_srl = get_subset(srl_padded, test_indices)
test_q_type = get_subset(q_type_categorical, test_indices)
test_answer = get_subset(answer_padded, test_indices)
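
# Minimal shape sanity check before training; these hold by construction
# of the padding and splitting steps above
assert train_context.shape[0] == train_answer.shape[0] == len(train_indices)
assert test_context.shape[1] == max_context_len
assert train_token.shape[1] == train_ner.shape[1] == train_srl.shape[1] == max_token_len
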
# Hyperparameters
embedding_dim = 100
lstm_units = 128
ner_embedding_dim = 50
srl_embedding_dim = 50
dropout_rate = 0.3


# Build a model with two outputs: a generated question and an answer
def create_qa_generator_model():
    # Input layers
    context_input = Input(shape=(max_context_len,), name="context_input")
    token_input = Input(shape=(max_token_len,), name="token_input")
    ner_input = Input(shape=(max_token_len,), name="ner_input")
    srl_input = Input(shape=(max_token_len,), name="srl_input")

    # No question_input or q_type_input is needed for generation,
    # since the question itself is produced as an output

    # Shared embedding layer for text
    text_embedding = Embedding(vocab_size, embedding_dim, name="text_embedding")

    # Embeddings for NER and SRL tags
    ner_embedding = Embedding(ner_vocab_size, ner_embedding_dim, name="ner_embedding")(
        ner_input
    )
    srl_embedding = Embedding(srl_vocab_size, srl_embedding_dim, name="srl_embedding")(
        srl_input
    )

    # Apply embeddings
    context_embed = text_embedding(context_input)
    token_embed = text_embedding(token_input)

    # Bidirectional LSTMs for context and token-level features
    context_lstm = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="context_lstm")
    )(context_embed)

    # Concatenate token-level features (tokens, NER, SRL)
    token_features = Concatenate(name="token_features")(
        [token_embed, ner_embedding, srl_embedding]
    )
    token_lstm = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="token_lstm")
    )(token_features)

    # Pool the LSTM outputs
    context_pool = tf.keras.layers.GlobalMaxPooling1D(name="context_pool")(context_lstm)
    token_pool = tf.keras.layers.GlobalMaxPooling1D(name="token_pool")(token_lstm)

    # Concatenate all features
    all_features = Concatenate(name="all_features")([context_pool, token_pool])

    # Shared layers
    shared = Dense(256, activation="relu", name="shared_dense_1")(all_features)
    shared = Dropout(dropout_rate)(shared)
    shared = Dense(128, activation="relu", name="shared_dense_2")(shared)
    shared = Dropout(dropout_rate)(shared)

    # Question branch
    question_branch = Dense(256, activation="relu", name="question_dense")(shared)
    question_branch = Dropout(dropout_rate)(question_branch)

    # Answer branch
    answer_branch = Dense(256, activation="relu", name="answer_dense")(shared)
    answer_branch = Dropout(dropout_rate)(answer_branch)

    # Output layers
    # For the question, an LSTM decoder generates the word sequence: the pooled
    # features are repeated max_question_len times and decoded step by step
    question_decoder = LSTM(lstm_units, return_sequences=True, name="question_decoder")(
        tf.keras.layers.RepeatVector(max_question_len)(question_branch)
    )
    question_output = Dense(vocab_size, activation="softmax", name="question_output")(
        question_decoder
    )

    # Output layer for the answer (single-token classification)
    answer_output = Dense(
        answer_vocab_size, activation="softmax", name="answer_output"
    )(answer_branch)

    # Create the model
    model = Model(
        inputs=[
            context_input,
            token_input,
            ner_input,
            srl_input,
        ],
        outputs=[question_output, answer_output],
    )

    # Compile with a loss function and metrics for each output
    model.compile(
        optimizer="adam",
        loss={
            "question_output": "categorical_crossentropy",
            "answer_output": "sparse_categorical_crossentropy",
        },
        metrics={"question_output": "accuracy", "answer_output": "accuracy"},
        loss_weights={"question_output": 1.0, "answer_output": 1.0},
    )
    return model


# Prepare the question targets (one-hot encoded).
# The question output predicts every word of the sequence at once, so each
# target token becomes a categorical vector; note this materializes a dense
# (num_samples, max_question_len, vocab_size) array, which is memory-heavy
def prepare_question_target(question_padded):
    question_target = []
    for question in question_padded:
        # One-hot encode every token in the sequence
        sequence_target = []
        for token in question:
            token_target = tf.keras.utils.to_categorical(token, num_classes=vocab_size)
            sequence_target.append(token_target)
        question_target.append(sequence_target)
    return np.array(question_target)


# Targets for the question output
train_question_target = prepare_question_target(train_question)
test_question_target = prepare_question_target(test_question)

# Reshape the answers for sparse categorical crossentropy:
# only the first token of each answer is used as the class label
train_answer_labels = train_answer[:, 0]
test_answer_labels = test_answer[:, 0]

# Build the model
model = create_qa_generator_model()
model.summary()
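
# Optional architecture diagram; a sketch that assumes pydot and graphviz
# are installed, hence left commented out:
# tf.keras.utils.plot_model(model, "qa_model.png", show_shapes=True)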

# Callbacks: checkpoint the best model and stop early on plateau
checkpoint = ModelCheckpoint(
    "qa_generator_model.h5",
    monitor="val_question_output_accuracy",
    save_best_only=True,
    verbose=1,
    mode="max",
)
early_stop = EarlyStopping(
    monitor="val_question_output_accuracy", patience=5, verbose=1, mode="max"
)

# Training hyperparameters
batch_size = 8
epochs = 50

# Train the model
history = model.fit(
    [train_context, train_token, train_ner, train_srl],
    {"question_output": train_question_target, "answer_output": train_answer_labels},
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(
        [test_context, test_token, test_ner, test_srl],
        {"question_output": test_question_target, "answer_output": test_answer_labels},
    ),
    callbacks=[checkpoint, early_stop],
)

model.save("qa_generator_model_final.keras")

# Save the tokenizers and sequence lengths for later inference
tokenizer_data = {
    "word_tokenizer": tokenizer.to_json(),
    "ner_tokenizer": ner_tokenizer.to_json(),
    "srl_tokenizer": srl_tokenizer.to_json(),
    "answer_tokenizer": answer_tokenizer.to_json(),
    "q_type_tokenizer": q_type_tokenizer.to_json(),
    "max_context_len": max_context_len,
    "max_question_len": max_question_len,
    "max_token_len": max_token_len,
}
with open("qa_generator_tokenizers.json", "w") as f:
    json.dump(tokenizer_data, f)
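
# A minimal sketch of how the saved tokenizers could be restored at inference
# time (tokenizer_from_json is part of tensorflow.keras.preprocessing.text);
# left commented out because it would shadow the objects defined above:
#
# from tensorflow.keras.preprocessing.text import tokenizer_from_json
#
# with open("qa_generator_tokenizers.json") as f:
#     saved = json.load(f)
# tokenizer = tokenizer_from_json(saved["word_tokenizer"])
# answer_tokenizer = tokenizer_from_json(saved["answer_tokenizer"])
# max_context_len = saved["max_context_len"]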


# Prediction helper
def predict_question_and_answer(model, context, tokens, ner, srl):
    """
    Predict a question and an answer from the context, tokens, NER, and SRL tags.
    """
    # Preprocess the inputs
    context_seq = tokenizer.texts_to_sequences([preprocess_text(context)])
    context_padded = pad_sequences(context_seq, maxlen=max_context_len, padding="post")

    token_seq = tokenizer.texts_to_sequences([" ".join(tokens)])
    token_padded = pad_sequences(token_seq, maxlen=max_token_len, padding="post")

    ner_seq = ner_tokenizer.texts_to_sequences([" ".join(ner)])
    ner_padded = pad_sequences(ner_seq, maxlen=max_token_len, padding="post")

    srl_seq = srl_tokenizer.texts_to_sequences([" ".join(srl)])
    srl_padded = pad_sequences(srl_seq, maxlen=max_token_len, padding="post")

    # Predict
    question_pred, answer_pred = model.predict(
        [context_padded, token_padded, ner_padded, srl_padded]
    )

    # Decode the question greedily: take the most probable word at each position
    question_indices = np.argmax(question_pred[0], axis=1)
    question_words = []

    # Reverse the word index to map indices back to words
    index_word = {v: k for k, v in tokenizer.word_index.items()}
    for idx in question_indices:
        if idx != 0:  # index 0 is padding
            question_words.append(index_word.get(idx, "<UNK>"))
        else:
            break  # stop at the first padding position

    # Decode the answer (single most probable class)
    answer_idx = np.argmax(answer_pred[0])
    answer_index_word = {v: k for k, v in answer_tokenizer.word_index.items()}
    answer = answer_index_word.get(answer_idx, "<UNK>")

    question = " ".join(question_words)
    return question, answer


# Example usage
# Note: this is only an illustration; real data is required in practice
"""
sample_context = "Selamat pagi, sekarang adalah hari Senin."
sample_tokens = ["selamat", "pagi", "sekarang", "adalah", "hari", "senin"]
sample_ner = ["O", "O", "O", "O", "O", "B-TIME"]
sample_srl = ["B-V", "B-ARG1", "B-ARGM-TMP", "B-ARGM-PRD", "I-ARGM-PRD", "I-ARGM-PRD"]

# Load the trained model
loaded_model = load_model("qa_generator_model_final.keras")

# Predict
question, answer = predict_question_and_answer(
    loaded_model, sample_context, sample_tokens, sample_ner, sample_srl
)
print("Context:", sample_context)
print("Generated question:", question)
print("Generated answer:", answer)
"""

sample = {
    "context": "kerajaan majapahit berdiri pada tahun 1293 di trowulan",
    "tokens": [
        "kerajaan",
        "majapahit",
        "berdiri",
        "pada",
        "tahun",
        "1293",
        "di",
        "trowulan",
    ],
    "ner": ["O", "ORG", "O", "O", "O", "DATE", "O", "LOC"],
    "srl": ["ARG1", "ARG1", "V", "O", "O", "ARGM-TMP", "O", "ARGM-LOC"],
}

question, answer = predict_question_and_answer(
    model, sample["context"], sample["tokens"], sample["ner"], sample["srl"]
)
print("Context:", sample["context"])
print("Generated question:", question)
print("Generated answer:", answer)

# Plot the training history
# plt.figure(figsize=(12, 8))
#
# # Plot loss
# plt.subplot(2, 2, 1)
# plt.plot(history.history['loss'])
# plt.plot(history.history['val_loss'])
# plt.title('Model Loss')
# plt.ylabel('Loss')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='upper right')
#
# # Plot question output accuracy
# plt.subplot(2, 2, 2)
# plt.plot(history.history['question_output_accuracy'])
# plt.plot(history.history['val_question_output_accuracy'])
# plt.title('Question Output Accuracy')
# plt.ylabel('Accuracy')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='lower right')
#
# # Plot answer output accuracy
# plt.subplot(2, 2, 3)
# plt.plot(history.history['answer_output_accuracy'])
# plt.plot(history.history['val_answer_output_accuracy'])
# plt.title('Answer Output Accuracy')
# plt.ylabel('Accuracy')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='lower right')
#
# plt.tight_layout()
# plt.savefig("training_history.png")
# plt.show()