474 lines
15 KiB
Python
474 lines
15 KiB
Python
import numpy as np
|
|
import json
|
|
import random
|
|
import tensorflow as tf
|
|
from tensorflow.keras.preprocessing.text import Tokenizer
|
|
from tensorflow.keras.preprocessing.sequence import pad_sequences
|
|
from tensorflow.keras.models import Model, load_model
|
|
from tensorflow.keras.layers import (
|
|
Input,
|
|
LSTM,
|
|
Dense,
|
|
Embedding,
|
|
Bidirectional,
|
|
Concatenate,
|
|
Dropout,
|
|
)
|
|
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
|
|
from sklearn.model_selection import train_test_split
|
|
import matplotlib.pyplot as plt
|
|
import re
|
|
from rouge_score import rouge_scorer
|
|
from nltk.translate.bleu_score import sentence_bleu
|
|
|
|
# Load the dataset.  The encoding is pinned to UTF-8 so that reading the file
# does not depend on the platform's default locale encoding.
with open("data_converted.json", "r", encoding="utf-8") as f:
    data = json.load(f)
|
|
|
|
|
|
# Text normalisation helper
def preprocess_text(text):
    """Lowercase *text* and collapse each whitespace run into a single space.

    Leading and trailing whitespace is removed from the result.
    """
    lowered = text.lower()
    collapsed = re.sub(r"\s+", " ", lowered)
    return collapsed.strip()
|
|
|
|
|
|
# Build the per-question records used by the question prediction model
def prepare_question_prediction_data(data):
    """Flatten the dataset into parallel lists with one entry per question.

    Every element of ``item["qas"]`` yields one record; the context, tokens,
    NER tags and SRL tags of the enclosing item are repeated for each of its
    questions.  Answers are intentionally not collected as model input.

    Returns:
        A tuple ``(contexts, tokens_list, ner_list, srl_list, questions,
        q_types)`` of equally long lists.  Contexts and questions are passed
        through ``preprocess_text``; the other fields are stored as found.
    """
    records = [
        (
            preprocess_text(item["context"]),
            item["tokens"],
            item["ner"],
            item["srl"],
            preprocess_text(qa["question"]),
            qa["type"],
        )
        for item in data
        for qa in item["qas"]
    ]

    if not records:
        # zip(*[]) yields nothing, so the six empty columns are built by hand.
        return [], [], [], [], [], []

    contexts, tokens_list, ner_list, srl_list, questions, q_types = (
        list(column) for column in zip(*records)
    )
    return contexts, tokens_list, ner_list, srl_list, questions, q_types
|
|
|
|
|
|
# Flatten the dataset into one record per question.
contexts, tokens_list, ner_list, srl_list, questions, q_types = (
    prepare_question_prediction_data(data)
)

# Word-level tokenizer shared by contexts, questions and token lists.
max_vocab_size = 10000
tokenizer = Tokenizer(num_words=max_vocab_size, oov_token="<OOV>")
all_texts = contexts + questions + [" ".join(item) for item in tokens_list]
tokenizer.fit_on_texts(all_texts)
# With ``num_words`` set, texts_to_sequences never emits an index >= num_words
# (rarer words map to the OOV index), so the vocabulary size is capped by it.
vocab_size = min(max_vocab_size, len(tokenizer.word_index) + 1)

# Tag tokenizers.  The default Tokenizer ``filters`` replace characters such
# as "-" and "_" with the split character, which would break a label
# containing them (e.g. a BIO-style "B-PER") into several tokens and destroy
# the one-tag-per-token correspondence.  Filtering is therefore disabled so
# that tags are split on spaces only.
ner_tokenizer = Tokenizer(oov_token="<OOV>", filters="")
ner_tokenizer.fit_on_texts([" ".join(ner) for ner in ner_list])
ner_vocab_size = len(ner_tokenizer.word_index) + 1

srl_tokenizer = Tokenizer(oov_token="<OOV>", filters="")
srl_tokenizer.fit_on_texts([" ".join(srl) for srl in srl_list])
srl_vocab_size = len(srl_tokenizer.word_index) + 1

# Question-type vocabulary.  Each type is fitted as a single-element token
# list with lowercasing off, so it becomes exactly one vocabulary entry,
# spelled as in the data.  This keeps the ``word_index.get(q_type, 0)``
# lookups elsewhere in this file from silently falling back to 0 for types
# that contain uppercase letters, spaces or punctuation.
q_type_tokenizer = Tokenizer(filters="", lower=False)
q_type_tokenizer.fit_on_texts([[label] for label in q_types])
q_type_vocab_size = len(q_type_tokenizer.word_index) + 1
|
|
|
|
|
|
# Convert tokens, NER tags and SRL tags to integer sequences
def tokens_to_sequences(tokens, ner, srl):
    """Encode token, NER and SRL lists with their respective tokenizers.

    Each inner list is joined with single spaces and encoded by the
    module-level ``tokenizer``, ``ner_tokenizer`` or ``srl_tokenizer``.

    Returns:
        A tuple ``(token_seqs, ner_seqs, srl_seqs)`` of lists of integer
        sequences, one sequence per input list.
    """
    # NOTE(review): ``tokenizer`` is built with the default filters, so
    # punctuation-only tokens are presumably dropped and a token sequence may
    # end up shorter than its tag sequences -- confirm the alignment.

    def _encode(encoder, groups):
        # One batched call gives the same result as encoding item by item.
        return encoder.texts_to_sequences([" ".join(group) for group in groups])

    return (
        _encode(tokenizer, tokens),
        _encode(ner_tokenizer, ner),
        _encode(srl_tokenizer, srl),
    )
|
|
|
|
|
|
# Convert every text field to integer sequences.
context_seqs = tokenizer.texts_to_sequences(contexts)
question_seqs = tokenizer.texts_to_sequences(questions)
token_seqs, ner_seqs, srl_seqs = tokens_to_sequences(tokens_list, ner_list, srl_list)

# Maximum lengths used for padding.
max_context_len = max(len(seq) for seq in context_seqs)
max_question_len = max(len(seq) for seq in question_seqs)
# Token, NER and SRL sequences are all padded to ``max_token_len``, so it has
# to cover the longest sequence of all three.  Otherwise a tag sequence longer
# than every token sequence would be truncated by pad_sequences, which cuts
# from the front by default.
max_token_len = max(
    len(seq) for seqs in (token_seqs, ner_seqs, srl_seqs) for seq in seqs
)
|
|
|
|
|
|
# Pad the sequences so that every model input has a fixed length
def pad_all_sequences(context_seqs, token_seqs, ner_seqs, srl_seqs, question_seqs):
    """Post-pad all sequence groups to their module-level maximum lengths.

    Contexts are padded to ``max_context_len``, token/NER/SRL sequences to
    ``max_token_len`` and questions to ``max_question_len``.

    Returns:
        A tuple ``(context_padded, token_padded, ner_padded, srl_padded,
        question_padded)``.
    """

    def _pad_post(sequences, length):
        return pad_sequences(sequences, maxlen=length, padding="post")

    return (
        _pad_post(context_seqs, max_context_len),
        _pad_post(token_seqs, max_token_len),
        _pad_post(ner_seqs, max_token_len),
        _pad_post(srl_seqs, max_token_len),
        _pad_post(question_seqs, max_question_len),
    )
|
|
|
|
|
|
# Map every question type to its integer id; a type that is missing from the
# tokenizer vocabulary falls back to 0.
q_type_indices = np.array(
    [q_type_tokenizer.word_index.get(label, 0) for label in q_types]
)

# One-hot encode the question types.
q_type_categorical = tf.keras.utils.to_categorical(
    q_type_indices, num_classes=q_type_vocab_size
)

# Pad all integer sequences to fixed lengths.
context_padded, token_padded, ner_padded, srl_padded, question_padded = (
    pad_all_sequences(context_seqs, token_seqs, ner_seqs, srl_seqs, question_seqs)
)

# Split the sample positions into train and test sets (80/20, fixed seed).
indices = list(range(len(context_padded)))
train_indices, test_indices = train_test_split(
    indices, random_state=42, test_size=0.2
)
|
|
|
|
|
|
# Select a subset of the data by position
def get_subset(data, indices):
    """Return the rows of *data* at *indices* as a new NumPy array.

    Args:
        data: Array-like indexed along its first axis.
        indices: Sequence of integer positions; may be empty.

    Returns:
        A NumPy array holding the selected rows in the order given.  The
        dtype and trailing dimensions of *data* are kept even when *indices*
        is empty.
    """
    rows = np.asarray(indices)
    if rows.size == 0:
        # An empty Python list converts to a float array, which cannot be
        # used as an index; give it an integer dtype explicitly.
        rows = rows.astype(np.intp)
    # Integer-array indexing copies the selected rows in one vectorised step.
    return np.asarray(data)[rows]
|
|
|
|
|
|
# Training arrays: the five model inputs followed by the target questions.
(
    train_context,
    train_token,
    train_ner,
    train_srl,
    train_q_type,
    train_question,
) = (
    get_subset(array, train_indices)
    for array in (
        context_padded,
        token_padded,
        ner_padded,
        srl_padded,
        q_type_categorical,
        question_padded,
    )
)

# Test arrays, in the same order.
(
    test_context,
    test_token,
    test_ner,
    test_srl,
    test_q_type,
    test_question,
) = (
    get_subset(array, test_indices)
    for array in (
        context_padded,
        token_padded,
        ner_padded,
        srl_padded,
        q_type_categorical,
        question_padded,
    )
)

# Hyperparameters
embedding_dim, lstm_units = 100, 128
ner_embedding_dim, srl_embedding_dim = 50, 50
dropout_rate = 0.3
|
|
|
|
|
|
# Factory for the question prediction model
def create_question_prediction_model():
    """Build and compile the question prediction network.

    Encoder: the context and token inputs share one word embedding.  The
    context goes through a bidirectional LSTM, an attention layer that uses
    the LSTM output as both query and value, and global max pooling.  Token,
    NER and SRL embeddings are concatenated, passed through a second
    bidirectional LSTM and max-pooled.  Both pooled vectors are joined with
    the one-hot question type and fed through two dense + dropout layers.

    Decoder: the encoded vector is projected, repeated ``max_question_len``
    times and run through an LSTM whose per-step outputs are mapped to a
    softmax over ``vocab_size`` words.

    All sizes come from module-level values (sequence lengths, vocabulary
    sizes and the hyperparameters).

    Returns:
        The compiled Keras ``Model`` (Adam optimizer, sparse categorical
        cross-entropy loss, accuracy metric).
    """
    keras_layers = tf.keras.layers

    # Input tensors, one per feature.
    context_input = Input(shape=(max_context_len,), name="context_input")
    token_input = Input(shape=(max_token_len,), name="token_input")
    ner_input = Input(shape=(max_token_len,), name="ner_input")
    srl_input = Input(shape=(max_token_len,), name="srl_input")
    q_type_input = Input(shape=(q_type_vocab_size,), name="q_type_input")
    model_inputs = [context_input, token_input, ner_input, srl_input, q_type_input]

    # One word embedding shared by the context and token inputs.
    shared_text_embedding = Embedding(vocab_size, embedding_dim, name="text_embedding")

    # Separate embeddings for the NER and SRL tag ids.
    ner_embedding_layer = Embedding(
        ner_vocab_size, ner_embedding_dim, name="ner_embedding"
    )
    ner_embedded = ner_embedding_layer(ner_input)
    srl_embedding_layer = Embedding(
        srl_vocab_size, srl_embedding_dim, name="srl_embedding"
    )
    srl_embedded = srl_embedding_layer(srl_input)

    context_embedded = shared_text_embedding(context_input)
    token_embedded = shared_text_embedding(token_input)

    # Bidirectional LSTM over the context.
    context_encoder = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="context_lstm")
    )
    context_states = context_encoder(context_embedded)

    # Token-level features: word, NER and SRL embeddings side by side.
    token_features = Concatenate(name="token_features")(
        [token_embedded, ner_embedded, srl_embedded]
    )
    token_encoder = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="token_lstm")
    )
    token_states = token_encoder(token_features)

    # Attention over the context states (query and value are the same tensor).
    attended_context = keras_layers.Attention(name="context_attention")(
        [context_states, context_states]
    )

    # Collapse both sequences to fixed-size vectors.
    pooled_context = keras_layers.GlobalMaxPooling1D(name="context_att_pool")(
        attended_context
    )
    pooled_tokens = keras_layers.GlobalMaxPooling1D(name="token_pool")(token_states)

    # Join every feature; no answer feature is used.
    merged = Concatenate(name="all_features")(
        [pooled_context, pooled_tokens, q_type_input]
    )

    # Two dense + dropout stages.
    hidden = merged
    for units, layer_name in ((512, "dense_1"), (256, "dense_2")):
        hidden = Dense(units, activation="relu", name=layer_name)(hidden)
        hidden = Dropout(dropout_rate)(hidden)

    # Decoder layers are created before being wired up, in this order.
    vocab_projection = Dense(vocab_size, activation="softmax", name="decoder_dense")
    decoder_lstm = LSTM(lstm_units * 2, return_sequences=True, name="decoder_lstm")

    # Project the encoded vector and feed the same vector to every decoder
    # step; the target question is not fed back into the decoder.
    decoder_seed = Dense(lstm_units * 2, activation="relu", name="decoder_input")(
        hidden
    )
    repeated_seed = keras_layers.RepeatVector(max_question_len)(decoder_seed)
    decoder_states = decoder_lstm(repeated_seed)

    # Word distribution for each output position.
    question_output_seq = keras_layers.TimeDistributed(vocab_projection)(
        decoder_states
    )

    model = Model(inputs=model_inputs, outputs=question_output_seq)

    # Sparse categorical cross-entropy: targets are integer word ids.
    model.compile(
        loss="sparse_categorical_crossentropy",
        optimizer="adam",
        metrics=["accuracy"],
    )
    return model
|
|
|
|
|
|
# Build the model and print its layer summary.
model = create_question_prediction_model()
model.summary()

# Callbacks: write the model to disk whenever validation accuracy improves,
# and stop once it has not improved for 10 epochs.
checkpoint = ModelCheckpoint(
    filepath="question_prediction_model.h5",
    monitor="val_accuracy",
    verbose=1,
    save_best_only=True,
)
early_stop = EarlyStopping(patience=10, monitor="val_accuracy", verbose=1)

# Sparse categorical cross-entropy takes integer targets with a trailing axis
# of size 1: (samples, max_question_len, 1).
train_question_target = train_question[..., np.newaxis]
test_question_target = test_question[..., np.newaxis]

# Training parameters
batch_size = 8
epochs = 50

# Fit the model; the held-out test split is used as validation data.
history = model.fit(
    x=[train_context, train_token, train_ner, train_srl, train_q_type],
    y=train_question_target,
    validation_data=(
        [test_context, test_token, test_ner, test_srl, test_q_type],
        test_question_target,
    ),
    epochs=epochs,
    batch_size=batch_size,
    callbacks=[checkpoint, early_stop],
)
|
|
|
|
# Plot the training history: accuracy in the left panel, loss in the right.
plt.figure(figsize=(12, 4))
panels = (
    (1, "accuracy", "val_accuracy", "Model Accuracy", "Accuracy"),
    (2, "loss", "val_loss", "Model Loss", "Loss"),
)
for position, train_key, val_key, title, y_label in panels:
    plt.subplot(1, 2, position)
    plt.plot(history.history[train_key])
    plt.plot(history.history[val_key])
    plt.title(title)
    plt.ylabel(y_label)
    plt.xlabel("Epoch")
    plt.legend(["Train", "Validation"], loc="upper left")

plt.tight_layout()
plt.savefig("question_prediction_training_history.png")
plt.show()
|
|
|
|
# Save the model as it stands at the end of training.
model.save("question_prediction_model_final.h5")

# Collect the four tokenizers (serialised as JSON strings) together with the
# padding lengths needed to rebuild the model inputs.
tokenizer_data = {
    name: encoder.to_json()
    for name, encoder in (
        ("word_tokenizer", tokenizer),
        ("ner_tokenizer", ner_tokenizer),
        ("srl_tokenizer", srl_tokenizer),
        ("q_type_tokenizer", q_type_tokenizer),
    )
}
tokenizer_data.update(
    max_context_len=max_context_len,
    max_question_len=max_question_len,
    max_token_len=max_token_len,
)

with open("question_prediction_tokenizers.json", "w") as f:
    f.write(json.dumps(tokenizer_data))

print("Model dan tokenizer untuk prediksi pertanyaan berhasil disimpan!")
|
|
|
|
|
|
# Predict a question for a single sample
def predict_question(context, tokens, ner, srl, q_type):
    """Generate a question for one sample with the module-level ``model``.

    Args:
        context: Raw context text; normalised with ``preprocess_text``.
        tokens: List of token strings.
        ner: List of NER tag strings.
        srl: List of SRL tag strings.
        q_type: Question type label; a label that is missing from the
            question-type vocabulary is encoded as index 0.

    Returns:
        The predicted words joined by single spaces.  Positions predicted as
        index 0 (padding) are skipped.
    """
    # Encode each field as a batch of one sequence.
    context_seq = tokenizer.texts_to_sequences([preprocess_text(context)])
    token_seq = tokenizer.texts_to_sequences([" ".join(tokens)])
    ner_seq = ner_tokenizer.texts_to_sequences([" ".join(ner)])
    srl_seq = srl_tokenizer.texts_to_sequences([" ".join(srl)])

    # Pad to the lengths the model was built with.
    context_padded = pad_sequences(context_seq, maxlen=max_context_len, padding="post")
    token_padded = pad_sequences(token_seq, maxlen=max_token_len, padding="post")
    ner_padded = pad_sequences(ner_seq, maxlen=max_token_len, padding="post")
    srl_padded = pad_sequences(srl_seq, maxlen=max_token_len, padding="post")

    # One-hot encode the question type.
    q_type_idx = q_type_tokenizer.word_index.get(q_type, 0)
    q_type_one_hot = tf.keras.utils.to_categorical(
        [q_type_idx], num_classes=q_type_vocab_size
    )

    pred = model.predict(
        [context_padded, token_padded, ner_padded, srl_padded, q_type_one_hot],
        verbose=1,
    )

    # Most likely word id at each output position.
    pred_seq = np.argmax(pred[0], axis=1)

    # The tokenizer already keeps an index -> word mapping, so the reverse
    # vocabulary does not have to be rebuilt on every call.
    index_to_word = tokenizer.index_word
    words = (index_to_word.get(int(i), "") for i in pred_seq if i != 0)

    # Drop empty lookups so they cannot produce doubled spaces.
    return " ".join(word for word in words if word)
|
|
|
|
|
|
def _mean_or_nan(scores):
    """Return the arithmetic mean of *scores*, or NaN when the list is empty."""
    # np.mean([]) also yields NaN but emits a RuntimeWarning first.
    return float(np.mean(scores)) if scores else float("nan")


def evaluate_model_performance(test_data):
    """Score predicted questions against the reference questions.

    For every position in *test_data* the sample is looked up in the
    module-level lists (``contexts``, ``tokens_list``, ``ner_list``,
    ``srl_list``, ``q_types``, ``questions``), a question is generated with
    ``predict_question`` and compared with the reference using sentence-level
    BLEU and ROUGE-1/2/L F-measures.

    Args:
        test_data: Iterable of integer positions into the module-level lists.

    Returns:
        A dict with the keys ``avg_bleu_score``, ``avg_rouge1``,
        ``avg_rouge2`` and ``avg_rougel``.  Each value is the mean of the
        collected scores, or NaN when no score was collected.
    """
    # NOTE(review): use_stemmer presumably applies an English (Porter)
    # stemmer -- confirm that this is intended for this dataset's language.
    scorer = rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True)

    bleu_scores = []
    rouge1_scores = []
    rouge2_scores = []
    rougel_scores = []

    for sample_idx in test_data:
        actual_question = questions[sample_idx]
        pred_question = predict_question(
            contexts[sample_idx],
            tokens_list[sample_idx],
            ner_list[sample_idx],
            srl_list[sample_idx],
            q_types[sample_idx],
        )

        # Whitespace tokenisation for BLEU.
        actual_tokens = actual_question.split()
        pred_tokens = pred_question.split()

        print("kaliamt aktual", actual_tokens)
        print("kaliamt prediksi", pred_tokens)

        # sentence_bleu is called with its default n-gram weights.
        bleu_scores.append(sentence_bleu([actual_tokens], pred_tokens))

        # ROUGE is best-effort: a failing sample is reported and skipped.
        try:
            rouge_scores = scorer.score(actual_question, pred_question)
        except Exception as e:
            print(f"Error calculating ROUGE score: {e}")
        else:
            rouge1_scores.append(rouge_scores["rouge1"].fmeasure)
            rouge2_scores.append(rouge_scores["rouge2"].fmeasure)
            rougel_scores.append(rouge_scores["rougeL"].fmeasure)

    return {
        "avg_bleu_score": _mean_or_nan(bleu_scores),
        "avg_rouge1": _mean_or_nan(rouge1_scores),
        "avg_rouge2": _mean_or_nan(rouge2_scores),
        "avg_rougel": _mean_or_nan(rougel_scores),
    }
|
|
|
|
|
|
# Reload the saved model and tokenizer metadata from disk.
# NOTE(review): neither object is referenced again in the visible part of the
# file; predict_question reads the module-level ``model``.
loaded_model = load_model("question_prediction_model_final.h5")
with open("question_prediction_tokenizers.json", "r") as f:
    tokenizer_data = json.loads(f.read())

# Pick one random sample from the test split.
sample_idx = random.randrange(len(test_indices))
picked_position = test_indices[sample_idx]
sample_context, sample_tokens, sample_ner, sample_srl, sample_q_type = (
    contexts[picked_position],
    tokens_list[picked_position],
    ner_list[picked_position],
    srl_list[picked_position],
    q_types[picked_position],
)

# Evaluate on the whole test split and report the averaged metrics.
performance_metrics = evaluate_model_performance(test_indices)

print("\nModel Performance Metrics:")
print(f"Average BLEU Score: {performance_metrics['avg_bleu_score']:.4f}")
print(f"Average ROUGE-1 Score: {performance_metrics['avg_rouge1']:.4f}")
print(f"Average ROUGE-2 Score: {performance_metrics['avg_rouge2']:.4f}")
print(f"Average ROUGE-L Score: {performance_metrics['avg_rougel']:.4f}")
|