import numpy as np
import json
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import (
    Input,
    LSTM,
    Dense,
    Embedding,
    Bidirectional,
    Concatenate,
    Dropout,
)
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import re

with open("data_converted.json", "r") as f:
    data = json.load(f)

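# Schema sanity check (added sketch; the field names are the ones this script
# accesses below, so this only makes the assumption explicit):
assert isinstance(data, list) and data, "expected a non-empty list of items"
for _field in ("context", "tokens", "ner", "srl", "qas"):
    assert _field in data[0], f"missing field: {_field}"
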
# Preprocessing function
def preprocess_text(text):
    """Perform basic text preprocessing: lowercase and collapse whitespace."""
    text = text.lower()
    text = re.sub(r"\s+", " ", text).strip()
    return text

# Prepare the data for the model
def prepare_data(data):
    """Flatten the dataset into parallel lists, one entry per QA pair."""
    contexts = []
    tokens_list = []
    ner_list = []
    srl_list = []
    questions = []
    answers = []
    q_types = []

    for item in data:
        for qa in item["qas"]:
            contexts.append(preprocess_text(item["context"]))
            tokens_list.append(item["tokens"])
            ner_list.append(item["ner"])
            srl_list.append(item["srl"])
            questions.append(preprocess_text(qa["question"]))
            answers.append(qa["answer"])
            q_types.append(qa["type"])

    return contexts, tokens_list, ner_list, srl_list, questions, answers, q_types

contexts, tokens_list, ner_list, srl_list, questions, answers, q_types = prepare_data(
    data
)

max_vocab_size = 10000
tokenizer = Tokenizer(num_words=max_vocab_size, oov_token="<OOV>")
tokenizer.fit_on_texts(contexts + questions + [" ".join(item) for item in tokens_list])
# word_index keeps every word ever seen even when num_words is set, while
# texts_to_sequences only emits indices below num_words, so cap the vocabulary
# size at what the sequences can actually contain
vocab_size = min(max_vocab_size, len(tokenizer.word_index) + 1)

# Encoding for NER tags
ner_tokenizer = Tokenizer(oov_token="<OOV>")
ner_tokenizer.fit_on_texts([" ".join(ner) for ner in ner_list])
ner_vocab_size = len(ner_tokenizer.word_index) + 1

# Encoding for SRL tags
srl_tokenizer = Tokenizer(oov_token="<OOV>")
srl_tokenizer.fit_on_texts([" ".join(srl) for srl in srl_list])
srl_vocab_size = len(srl_tokenizer.word_index) + 1

# Encoding for question types
q_type_tokenizer = Tokenizer()
q_type_tokenizer.fit_on_texts(q_types)
q_type_vocab_size = len(q_type_tokenizer.word_index) + 1

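# Illustrative check (added): print the vocabulary and tag-inventory sizes to
# confirm the corpora and tag schemes were parsed as expected.
print("vocab:", vocab_size, "ner tags:", ner_vocab_size, "srl tags:", srl_vocab_size)
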
# Convert tokens, NER, and SRL tags to integer sequences
def tokens_to_sequences(tokens, ner, srl):
    """Convert tokens, NER tags, and SRL tags to integer sequences."""
    token_seqs = [tokenizer.texts_to_sequences([" ".join(t)])[0] for t in tokens]
    ner_seqs = [ner_tokenizer.texts_to_sequences([" ".join(n)])[0] for n in ner]
    srl_seqs = [srl_tokenizer.texts_to_sequences([" ".join(s)])[0] for s in srl]
    return token_seqs, ner_seqs, srl_seqs

# Determine the maximum lengths for padding
context_seqs = tokenizer.texts_to_sequences(contexts)
question_seqs = tokenizer.texts_to_sequences(questions)
token_seqs, ner_seqs, srl_seqs = tokens_to_sequences(tokens_list, ner_list, srl_list)

max_context_len = max(len(seq) for seq in context_seqs)
max_question_len = max(len(seq) for seq in question_seqs)
max_token_len = max(len(seq) for seq in token_seqs)

# Pad sequences so that all inputs have the same length
def pad_all_sequences(context_seqs, question_seqs, token_seqs, ner_seqs, srl_seqs):
    """Pad all sequence families to their maximum lengths."""
    context_padded = pad_sequences(context_seqs, maxlen=max_context_len, padding="post")
    question_padded = pad_sequences(
        question_seqs, maxlen=max_question_len, padding="post"
    )
    token_padded = pad_sequences(token_seqs, maxlen=max_token_len, padding="post")
    ner_padded = pad_sequences(ner_seqs, maxlen=max_token_len, padding="post")
    srl_padded = pad_sequences(srl_seqs, maxlen=max_token_len, padding="post")
    return context_padded, question_padded, token_padded, ner_padded, srl_padded

# Prepare the encoder for answers
answer_tokenizer = Tokenizer(oov_token="<OOV>")
answer_tokenizer.fit_on_texts(answers)
answer_vocab_size = len(answer_tokenizer.word_index) + 1

# Encode question types - use the tokenizer index directly rather than a sequence
q_type_indices = []
for q_type in q_types:
    # Tokenizer indices start at 1 (it lowercases by default); 0 is reserved,
    # so unknown types map to 0
    q_type_idx = q_type_tokenizer.word_index.get(q_type.lower(), 0)
    q_type_indices.append(q_type_idx)

# Convert to a numpy array
q_type_indices = np.array(q_type_indices)

# One-hot encode the question types
q_type_categorical = tf.keras.utils.to_categorical(
    q_type_indices, num_classes=q_type_vocab_size
)

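# Note (added): q_type_categorical is split into train/test below but never fed
# to the generator model, which takes only context, token, NER, and SRL inputs;
# it is kept in case a question-type-conditioned variant is added later.
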
# Pad the sequences
context_padded, question_padded, token_padded, ner_padded, srl_padded = (
    pad_all_sequences(context_seqs, question_seqs, token_seqs, ner_seqs, srl_seqs)
)

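# Shape check (added sketch): with N QA pairs there should be one
# (N, max_context_len) context array, one (N, max_question_len) question
# array, and three aligned (N, max_token_len) token/NER/SRL arrays.
print("context:", context_padded.shape, "question:", question_padded.shape)
print("token:", token_padded.shape, "ner:", ner_padded.shape, "srl:", srl_padded.shape)
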
# Encode the answers
answer_seqs = answer_tokenizer.texts_to_sequences(answers)
max_answer_len = max(len(seq) for seq in answer_seqs)
answer_padded = pad_sequences(answer_seqs, maxlen=max_answer_len, padding="post")

# Split the data into train and test sets
indices = list(range(len(context_padded)))
train_indices, test_indices = train_test_split(indices, test_size=0.2, random_state=42)

# Helper to take a subset of the data by indices
def get_subset(data, indices):
    return np.array([data[i] for i in indices])

# Train data
train_context = get_subset(context_padded, train_indices)
train_question = get_subset(question_padded, train_indices)
train_token = get_subset(token_padded, train_indices)
train_ner = get_subset(ner_padded, train_indices)
train_srl = get_subset(srl_padded, train_indices)
train_q_type = get_subset(q_type_categorical, train_indices)
train_answer = get_subset(answer_padded, train_indices)

# Test data
test_context = get_subset(context_padded, test_indices)
test_question = get_subset(question_padded, test_indices)
test_token = get_subset(token_padded, test_indices)
test_ner = get_subset(ner_padded, test_indices)
test_srl = get_subset(srl_padded, test_indices)
test_q_type = get_subset(q_type_categorical, test_indices)
test_answer = get_subset(answer_padded, test_indices)

# Hyperparameters
embedding_dim = 100
lstm_units = 128
ner_embedding_dim = 50
srl_embedding_dim = 50
dropout_rate = 0.3

# Build a model with two outputs: a question and an answer
def create_qa_generator_model():
    # Input layers
    context_input = Input(shape=(max_context_len,), name="context_input")
    token_input = Input(shape=(max_token_len,), name="token_input")
    ner_input = Input(shape=(max_token_len,), name="ner_input")
    srl_input = Input(shape=(max_token_len,), name="srl_input")

    # No question_input or q_type_input is needed for generation,
    # since those are what the model produces

    # Shared embedding layer for text
    text_embedding = Embedding(vocab_size, embedding_dim, name="text_embedding")

    # Embeddings for NER and SRL tags
    ner_embedding = Embedding(ner_vocab_size, ner_embedding_dim, name="ner_embedding")(
        ner_input
    )
    srl_embedding = Embedding(srl_vocab_size, srl_embedding_dim, name="srl_embedding")(
        srl_input
    )

    # Apply embeddings
    context_embed = text_embedding(context_input)
    token_embed = text_embedding(token_input)

    # Bidirectional LSTMs for context and token-level features
    context_lstm = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="context_lstm")
    )(context_embed)

    # Concatenate token-level features (tokens, NER, SRL)
    token_features = Concatenate(name="token_features")(
        [token_embed, ner_embedding, srl_embedding]
    )
    token_lstm = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="token_lstm")
    )(token_features)

    # Pool outputs
    context_pool = tf.keras.layers.GlobalMaxPooling1D(name="context_pool")(context_lstm)
    token_pool = tf.keras.layers.GlobalMaxPooling1D(name="token_pool")(token_lstm)

    # Concatenate all features
    all_features = Concatenate(name="all_features")([context_pool, token_pool])

    # Shared layers
    shared = Dense(256, activation="relu", name="shared_dense_1")(all_features)
    shared = Dropout(dropout_rate)(shared)
    shared = Dense(128, activation="relu", name="shared_dense_2")(shared)
    shared = Dropout(dropout_rate)(shared)

    # Question branch
    question_branch = Dense(256, activation="relu", name="question_dense")(shared)
    question_branch = Dropout(dropout_rate)(question_branch)

    # Answer branch
    answer_branch = Dense(256, activation="relu", name="answer_dense")(shared)
    answer_branch = Dropout(dropout_rate)(answer_branch)

    # Output layers
    # For the question, an LSTM decoder generates the word sequence: the pooled
    # feature vector is repeated max_question_len times so the LSTM emits one
    # vocabulary distribution per position
    question_decoder = LSTM(lstm_units, return_sequences=True, name="question_decoder")(
        tf.keras.layers.RepeatVector(max_question_len)(question_branch)
    )
    question_output = Dense(vocab_size, activation="softmax", name="question_output")(
        question_decoder
    )

    # Output layer for the answer
    answer_output = Dense(
        answer_vocab_size, activation="softmax", name="answer_output"
    )(answer_branch)

    # Create model
    model = Model(
        inputs=[
            context_input,
            token_input,
            ner_input,
            srl_input,
        ],
        outputs=[question_output, answer_output],
    )

    # Compile the model with a loss function and metrics for both outputs
    model.compile(
        optimizer="adam",
        loss={
            "question_output": "categorical_crossentropy",
            "answer_output": "sparse_categorical_crossentropy",
        },
        metrics={"question_output": "accuracy", "answer_output": "accuracy"},
        loss_weights={"question_output": 1.0, "answer_output": 1.0},
    )

    return model

# Prepare targets for the question output (one-hot encoded).
# The question targets must be categorical because the model predicts every
# word in the sequence simultaneously.
def prepare_question_target(question_padded):
    # One-hot encode every token of every sequence in a single vectorized call;
    # the result has shape (num_samples, max_question_len, vocab_size)
    return tf.keras.utils.to_categorical(question_padded, num_classes=vocab_size)

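# Caution (added note): these one-hot targets have shape
# (num_samples, max_question_len, vocab_size) and can dominate memory on larger
# datasets; using sparse_categorical_crossentropy with integer targets on the
# question head would avoid materializing them.
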
# Prepare targets for the question output
train_question_target = prepare_question_target(train_question)
test_question_target = prepare_question_target(test_question)

# Convert answers to single labels for sparse categorical crossentropy
# (the answer head predicts only the first answer token, as a classification)
train_answer_labels = train_answer[:, 0]  # take the first token of each answer
test_answer_labels = test_answer[:, 0]

# Build the model
model = create_qa_generator_model()
model.summary()

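# Sanity sketch (added): the question head should emit one vocabulary
# distribution per position and the answer head a single distribution, i.e.
# roughly [(None, max_question_len, vocab_size), (None, answer_vocab_size)].
print("output shapes:", model.output_shape)
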
# Callback to save the best model
checkpoint = ModelCheckpoint(
    "qa_generator_model.h5",
    monitor="val_question_output_accuracy",
    save_best_only=True,
    verbose=1,
    mode="max",
)

early_stop = EarlyStopping(
    monitor="val_question_output_accuracy", patience=5, verbose=1, mode="max"
)

# Training settings
batch_size = 8
epochs = 50

# Train the model
history = model.fit(
    [train_context, train_token, train_ner, train_srl],
    {"question_output": train_question_target, "answer_output": train_answer_labels},
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(
        [test_context, test_token, test_ner, test_srl],
        {"question_output": test_question_target, "answer_output": test_answer_labels},
    ),
    callbacks=[checkpoint, early_stop],
)

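# Optional evaluation sketch (added, not part of the original flow): report
# test-set loss and accuracy for both heads after training.
eval_results = model.evaluate(
    [test_context, test_token, test_ner, test_srl],
    {"question_output": test_question_target, "answer_output": test_answer_labels},
    verbose=0,
)
print(dict(zip(model.metrics_names, eval_results)))
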
model.save("qa_generator_model_final.keras")

# Save the tokenizers
tokenizer_data = {
    "word_tokenizer": tokenizer.to_json(),
    "ner_tokenizer": ner_tokenizer.to_json(),
    "srl_tokenizer": srl_tokenizer.to_json(),
    "answer_tokenizer": answer_tokenizer.to_json(),
    "q_type_tokenizer": q_type_tokenizer.to_json(),
    "max_context_len": max_context_len,
    "max_question_len": max_question_len,
    "max_token_len": max_token_len,
}

with open("qa_generator_tokenizers.json", "w") as f:
    json.dump(tokenizer_data, f)

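# Reload sketch (added; assumes the file written just above): Keras can restore
# each tokenizer from its serialized JSON string.
with open("qa_generator_tokenizers.json", "r") as f:
    _saved = json.load(f)
_reloaded_tokenizer = tf.keras.preprocessing.text.tokenizer_from_json(
    _saved["word_tokenizer"]
)
assert _reloaded_tokenizer.word_index == tokenizer.word_index
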
# Prediction function
def predict_question_and_answer(model, context, tokens, ner, srl):
    """
    Predict a question and an answer from the context, tokens, NER, and SRL tags.
    """
    # Preprocess input
    context_seq = tokenizer.texts_to_sequences([preprocess_text(context)])
    context_padded = pad_sequences(context_seq, maxlen=max_context_len, padding="post")

    token_seq = tokenizer.texts_to_sequences([" ".join(tokens)])
    token_padded = pad_sequences(token_seq, maxlen=max_token_len, padding="post")

    ner_seq = ner_tokenizer.texts_to_sequences([" ".join(ner)])
    ner_padded = pad_sequences(ner_seq, maxlen=max_token_len, padding="post")

    srl_seq = srl_tokenizer.texts_to_sequences([" ".join(srl)])
    srl_padded = pad_sequences(srl_seq, maxlen=max_token_len, padding="post")

    # Predict
    question_pred, answer_pred = model.predict(
        [context_padded, token_padded, ner_padded, srl_padded]
    )

    # Decode the question (take the highest-probability index at each position)
    question_indices = np.argmax(question_pred[0], axis=1)
    question_words = []

    # Invert the word index to map indices back to words
    word_index = tokenizer.word_index
    index_word = {v: k for k, v in word_index.items()}

    # Decode the question
    for idx in question_indices:
        if idx != 0:  # skip padding (index 0)
            word = index_word.get(idx, "<UNK>")
            question_words.append(word)
        else:
            break  # stop at padding

    # Decode the answer
    answer_idx = np.argmax(answer_pred[0])

    # Invert the answer word index
    answer_word_index = answer_tokenizer.word_index
    answer_index_word = {v: k for k, v in answer_word_index.items()}

    answer = answer_index_word.get(answer_idx, "<UNK>")

    # Assemble the question
    question = " ".join(question_words)

    return question, answer

# Example usage
# Note: this is only an example; actual data is needed in a real run
"""
sample_context = "Selamat pagi, sekarang adalah hari Senin."
sample_tokens = ["selamat", "pagi", "sekarang", "adalah", "hari", "senin"]
sample_ner = ["O", "O", "O", "O", "O", "B-TIME"]
sample_srl = ["B-V", "B-ARG1", "B-ARGM-TMP", "B-ARGM-PRD", "I-ARGM-PRD", "I-ARGM-PRD"]

# Load the trained model
loaded_model = load_model("qa_generator_model_final.keras")

# Predict
question, answer = predict_question_and_answer(
    loaded_model, sample_context, sample_tokens, sample_ner, sample_srl
)

print("Context:", sample_context)
print("Generated question:", question)
print("Generated answer:", answer)
"""

sample = {
    "context": "kerajaan majapahit berdiri pada tahun 1293 di trowulan",
    "tokens": [
        "kerajaan",
        "majapahit",
        "berdiri",
        "pada",
        "tahun",
        "1293",
        "di",
        "trowulan",
    ],
    "ner": ["O", "ORG", "O", "O", "O", "DATE", "O", "LOC"],
    "srl": ["ARG1", "ARG1", "V", "O", "O", "ARGM-TMP", "O", "ARGM-LOC"],
}
question, answer = predict_question_and_answer(
    model, sample["context"], sample["tokens"], sample["ner"], sample["srl"]
)

print("Context:", sample["context"])
print("Generated question:", question)
print("Generated answer:", answer)

# Plot training history
# plt.figure(figsize=(12, 8))

# # Plot loss
# plt.subplot(2, 2, 1)
# plt.plot(history.history['loss'])
# plt.plot(history.history['val_loss'])
# plt.title('Model Loss')
# plt.ylabel('Loss')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='upper right')

# # Plot question output accuracy
# plt.subplot(2, 2, 2)
# plt.plot(history.history['question_output_accuracy'])
# plt.plot(history.history['val_question_output_accuracy'])
# plt.title('Question Output Accuracy')
# plt.ylabel('Accuracy')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='lower right')

# # Plot answer output accuracy
# plt.subplot(2, 2, 3)
# plt.plot(history.history['answer_output_accuracy'])
# plt.plot(history.history['val_answer_output_accuracy'])
# plt.title('Answer Output Accuracy')
# plt.ylabel('Accuracy')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='lower right')

# plt.tight_layout()
# plt.savefig("training_history.png")
# plt.show()