import numpy as np
import json
import random
import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import (
    Input,
    LSTM,
    Dense,
    Embedding,
    Bidirectional,
    Concatenate,
    Dropout,
)
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
import re
from rouge_score import rouge_scorer
from nltk.translate.bleu_score import sentence_bleu

# Load data. Each item is expected to look like (inferred from the field
# accesses in prepare_question_prediction_data below):
#   {"context": str, "tokens": [str], "ner": [str], "srl": [str],
#    "qas": [{"question": str, "type": str}, ...]}
with open("data_converted.json", "r") as f:
    data = json.load(f)


# Preprocessing function
def preprocess_text(text):
    """Basic text preprocessing: lowercase and collapse whitespace."""
    text = text.lower()
    text = re.sub(r"\s+", " ", text).strip()
    return text


# Prepare data for the question prediction model
def prepare_question_prediction_data(data):
    """Collect per-QA-pair inputs; answers are intentionally not used."""
    contexts = []
    tokens_list = []
    ner_list = []
    srl_list = []
    questions = []
    q_types = []

    for item in data:
        for qa in item["qas"]:
            contexts.append(preprocess_text(item["context"]))
            tokens_list.append(item["tokens"])
            ner_list.append(item["ner"])
            srl_list.append(item["srl"])
            questions.append(preprocess_text(qa["question"]))
            q_types.append(qa["type"])
            # The answer field is deliberately not taken as input

    return contexts, tokens_list, ner_list, srl_list, questions, q_types


# Prepare the data
contexts, tokens_list, ner_list, srl_list, questions, q_types = (
    prepare_question_prediction_data(data)
)

# Tokenizer for text (context, question)
max_vocab_size = 10000
tokenizer = Tokenizer(num_words=max_vocab_size, oov_token="<OOV>")
all_texts = contexts + questions + [" ".join(item) for item in tokens_list]
tokenizer.fit_on_texts(all_texts)
vocab_size = len(tokenizer.word_index) + 1

# Encoding for NER tags
ner_tokenizer = Tokenizer(oov_token="<OOV>")
ner_tokenizer.fit_on_texts([" ".join(ner) for ner in ner_list])
ner_vocab_size = len(ner_tokenizer.word_index) + 1

# Encoding for SRL tags
srl_tokenizer = Tokenizer(oov_token="<OOV>")
srl_tokenizer.fit_on_texts([" ".join(srl) for srl in srl_list])
srl_vocab_size = len(srl_tokenizer.word_index) + 1

# Encoding for question types
q_type_tokenizer = Tokenizer()
q_type_tokenizer.fit_on_texts(q_types)
q_type_vocab_size = len(q_type_tokenizer.word_index) + 1


# Convert tokens, NER, and SRL tags to integer sequences
def tokens_to_sequences(tokens, ner, srl):
    """Convert token, NER, and SRL lists to integer sequences."""
    token_seqs = [tokenizer.texts_to_sequences([" ".join(t)])[0] for t in tokens]
    ner_seqs = [ner_tokenizer.texts_to_sequences([" ".join(n)])[0] for n in ner]
    srl_seqs = [srl_tokenizer.texts_to_sequences([" ".join(s)])[0] for s in srl]
    return token_seqs, ner_seqs, srl_seqs


# Sequences
context_seqs = tokenizer.texts_to_sequences(contexts)
question_seqs = tokenizer.texts_to_sequences(questions)
token_seqs, ner_seqs, srl_seqs = tokens_to_sequences(tokens_list, ner_list, srl_list)

# Maximum lengths for padding
max_context_len = max(len(seq) for seq in context_seqs)
max_question_len = max(len(seq) for seq in question_seqs)
max_token_len = max(len(seq) for seq in token_seqs)
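
# Optional sanity check: print the derived vocabulary sizes and maximum
# lengths, so a mismatch with the data file shows up before padding. All
# names below are defined above; nothing here changes program state.
print(
    f"vocab_size={vocab_size}, ner_vocab_size={ner_vocab_size}, "
    f"srl_vocab_size={srl_vocab_size}, q_type_vocab_size={q_type_vocab_size}"
)
print(
    f"max_context_len={max_context_len}, max_question_len={max_question_len}, "
    f"max_token_len={max_token_len}"
)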

# Pad sequences so every input has a uniform length
def pad_all_sequences(context_seqs, token_seqs, ner_seqs, srl_seqs, question_seqs):
    """Pad all sequences to their respective maximum lengths."""
    context_padded = pad_sequences(
        context_seqs, maxlen=max_context_len, padding="post"
    )
    token_padded = pad_sequences(token_seqs, maxlen=max_token_len, padding="post")
    ner_padded = pad_sequences(ner_seqs, maxlen=max_token_len, padding="post")
    srl_padded = pad_sequences(srl_seqs, maxlen=max_token_len, padding="post")
    question_padded = pad_sequences(
        question_seqs, maxlen=max_question_len, padding="post"
    )
    return (
        context_padded,
        token_padded,
        ner_padded,
        srl_padded,
        question_padded,
    )


# Encode question types
q_type_indices = []
for q_type in q_types:
    q_type_idx = q_type_tokenizer.word_index.get(q_type, 0)
    q_type_indices.append(q_type_idx)

# Convert to a numpy array
q_type_indices = np.array(q_type_indices)

# One-hot encode question types
q_type_categorical = tf.keras.utils.to_categorical(
    q_type_indices, num_classes=q_type_vocab_size
)

# Pad sequences
context_padded, token_padded, ner_padded, srl_padded, question_padded = (
    pad_all_sequences(context_seqs, token_seqs, ner_seqs, srl_seqs, question_seqs)
)

# Split the data into train and test sets
indices = list(range(len(context_padded)))
train_indices, test_indices = train_test_split(indices, test_size=0.2, random_state=42)


# Select a subset of the data by index
def get_subset(data, indices):
    return np.array([data[i] for i in indices])


# Train data
train_context = get_subset(context_padded, train_indices)
train_token = get_subset(token_padded, train_indices)
train_ner = get_subset(ner_padded, train_indices)
train_srl = get_subset(srl_padded, train_indices)
train_q_type = get_subset(q_type_categorical, train_indices)
train_question = get_subset(question_padded, train_indices)

# Test data
test_context = get_subset(context_padded, test_indices)
test_token = get_subset(token_padded, test_indices)
test_ner = get_subset(ner_padded, test_indices)
test_srl = get_subset(srl_padded, test_indices)
test_q_type = get_subset(q_type_categorical, test_indices)
test_question = get_subset(question_padded, test_indices)

# Hyperparameters
embedding_dim = 100
lstm_units = 128
ner_embedding_dim = 50
srl_embedding_dim = 50
dropout_rate = 0.3
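
# Optional shape check: the model inputs defined below assume these layouts
# (rows = samples). The expected shapes in the comments follow from the
# padding and one-hot encoding above.
for name, arr in [
    ("train_context", train_context),    # (n_train, max_context_len)
    ("train_token", train_token),        # (n_train, max_token_len)
    ("train_ner", train_ner),            # (n_train, max_token_len)
    ("train_srl", train_srl),            # (n_train, max_token_len)
    ("train_q_type", train_q_type),      # (n_train, q_type_vocab_size)
    ("train_question", train_question),  # (n_train, max_question_len)
]:
    print(name, arr.shape)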

# Build the question prediction model
def create_question_prediction_model():
    # Input layers
    context_input = Input(shape=(max_context_len,), name="context_input")
    token_input = Input(shape=(max_token_len,), name="token_input")
    ner_input = Input(shape=(max_token_len,), name="ner_input")
    srl_input = Input(shape=(max_token_len,), name="srl_input")
    q_type_input = Input(shape=(q_type_vocab_size,), name="q_type_input")

    # Shared embedding layer for text
    text_embedding = Embedding(vocab_size, embedding_dim, name="text_embedding")

    # Embeddings for NER and SRL tags
    ner_embedding = Embedding(ner_vocab_size, ner_embedding_dim, name="ner_embedding")(
        ner_input
    )
    srl_embedding = Embedding(srl_vocab_size, srl_embedding_dim, name="srl_embedding")(
        srl_input
    )

    # Apply embeddings
    context_embed = text_embedding(context_input)
    token_embed = text_embedding(token_input)

    # Bidirectional LSTMs for context and token-level features
    context_lstm = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="context_lstm")
    )(context_embed)

    # Concatenate token-level features (tokens, NER, SRL)
    token_features = Concatenate(name="token_features")(
        [token_embed, ner_embedding, srl_embedding]
    )
    token_lstm = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="token_lstm")
    )(token_features)

    # Self-attention over the context LSTM outputs
    context_attention = tf.keras.layers.Attention(name="context_attention")(
        [context_lstm, context_lstm]
    )

    # Pool the attention and token outputs
    context_att_pool = tf.keras.layers.GlobalMaxPooling1D(name="context_att_pool")(
        context_attention
    )
    token_pool = tf.keras.layers.GlobalMaxPooling1D(name="token_pool")(token_lstm)

    # Concatenate all features (no answer feature)
    all_features = Concatenate(name="all_features")(
        [context_att_pool, token_pool, q_type_input]
    )

    # Dense layers with expanded capacity for sequence generation
    x = Dense(512, activation="relu", name="dense_1")(all_features)
    x = Dropout(dropout_rate)(x)
    x = Dense(256, activation="relu", name="dense_2")(x)
    x = Dropout(dropout_rate)(x)

    # Per-timestep projection onto the vocabulary
    decoder_dense = Dense(vocab_size, activation="softmax", name="decoder_dense")

    # Decoder LSTM for many-to-many sequence generation
    decoder_lstm = LSTM(lstm_units * 2, return_sequences=True, name="decoder_lstm")

    # Project the features to the decoder's input dimensionality
    decoder_input = Dense(lstm_units * 2, activation="relu", name="decoder_input")(x)

    # Repeat the conditioning vector once per output timestep. Note there is
    # no teacher forcing here: the decoder sees the same vector at every step.
    repeated_vector = tf.keras.layers.RepeatVector(max_question_len)(decoder_input)

    # Run the decoder LSTM
    decoder_outputs = decoder_lstm(repeated_vector)

    # Apply the dense projection at each timestep
    question_output_seq = tf.keras.layers.TimeDistributed(decoder_dense)(
        decoder_outputs
    )

    # Create the model
    model = Model(
        inputs=[
            context_input,
            token_input,
            ner_input,
            srl_input,
            q_type_input,
        ],
        outputs=question_output_seq,
    )

    # Compile with sparse categorical crossentropy for sequence prediction
    model.compile(
        optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
    )

    return model


# Build the model
model = create_question_prediction_model()
model.summary()

# Callbacks: save the best model and stop early on plateau
checkpoint = ModelCheckpoint(
    "question_prediction_model.h5",
    monitor="val_accuracy",
    save_best_only=True,
    verbose=1,
)
early_stop = EarlyStopping(monitor="val_accuracy", patience=10, verbose=1)

# Reshape the question data for sequence-to-sequence training:
# sparse categorical crossentropy expects targets of shape
# (samples, max_question_len, 1)
train_question_target = np.expand_dims(train_question, -1)
test_question_target = np.expand_dims(test_question, -1)

# Training parameters
batch_size = 8
epochs = 50

# Train the model
history = model.fit(
    [train_context, train_token, train_ner, train_srl, train_q_type],
    train_question_target,
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(
        [test_context, test_token, test_ner, test_srl, test_q_type],
        test_question_target,
    ),
    callbacks=[checkpoint, early_stop],
)

# Plot the training history
plt.figure(figsize=(12, 4))
plt.subplot(1, 2, 1)
plt.plot(history.history["accuracy"])
plt.plot(history.history["val_accuracy"])
plt.title("Model Accuracy")
plt.ylabel("Accuracy")
plt.xlabel("Epoch")
plt.legend(["Train", "Validation"], loc="upper left")

plt.subplot(1, 2, 2)
plt.plot(history.history["loss"])
plt.plot(history.history["val_loss"])
plt.title("Model Loss")
plt.ylabel("Loss")
plt.xlabel("Epoch")
plt.legend(["Train", "Validation"], loc="upper left")

plt.tight_layout()
plt.savefig("question_prediction_training_history.png")
plt.show()

# Save the model and tokenizers
model.save("question_prediction_model_final.h5")

tokenizer_data = {
    "word_tokenizer": tokenizer.to_json(),
    "ner_tokenizer": ner_tokenizer.to_json(),
    "srl_tokenizer": srl_tokenizer.to_json(),
    "q_type_tokenizer": q_type_tokenizer.to_json(),
    "max_context_len": max_context_len,
    "max_question_len": max_question_len,
    "max_token_len": max_token_len,
}

with open("question_prediction_tokenizers.json", "w") as f:
    json.dump(tokenizer_data, f)

print("Question prediction model and tokenizers saved successfully!")
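
# Sketch: restoring the saved tokenizers in a fresh process, using Keras'
# tokenizer_from_json helper. The `restored_` names are illustrative; the
# rest of this script keeps using the in-memory objects.
from tensorflow.keras.preprocessing.text import tokenizer_from_json

with open("question_prediction_tokenizers.json", "r") as f:
    saved = json.load(f)

restored_word_tokenizer = tokenizer_from_json(saved["word_tokenizer"])
restored_ner_tokenizer = tokenizer_from_json(saved["ner_tokenizer"])
restored_srl_tokenizer = tokenizer_from_json(saved["srl_tokenizer"])
restored_q_type_tokenizer = tokenizer_from_json(saved["q_type_tokenizer"])
restored_max_question_len = saved["max_question_len"]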

# Predict a question for a given context
def predict_question(context, tokens, ner, srl, q_type):
    context = preprocess_text(context)
    context_seq = tokenizer.texts_to_sequences([context])[0]
    token_seq = tokenizer.texts_to_sequences([" ".join(tokens)])[0]
    ner_seq = ner_tokenizer.texts_to_sequences([" ".join(ner)])[0]
    srl_seq = srl_tokenizer.texts_to_sequences([" ".join(srl)])[0]

    context_padded = pad_sequences(
        [context_seq], maxlen=max_context_len, padding="post"
    )
    token_padded = pad_sequences([token_seq], maxlen=max_token_len, padding="post")
    ner_padded = pad_sequences([ner_seq], maxlen=max_token_len, padding="post")
    srl_padded = pad_sequences([srl_seq], maxlen=max_token_len, padding="post")

    # One-hot encode the question type
    q_type_idx = q_type_tokenizer.word_index.get(q_type, 0)
    q_type_one_hot = tf.keras.utils.to_categorical(
        [q_type_idx], num_classes=q_type_vocab_size
    )

    # Predict
    pred = model.predict(
        [context_padded, token_padded, ner_padded, srl_padded, q_type_one_hot],
        verbose=1,
    )

    # Greedy decoding: take the most likely word at each timestep
    pred_seq = np.argmax(pred[0], axis=1)

    # Convert indices back to words, skipping padding (index 0)
    reverse_word_map = {v: k for k, v in tokenizer.word_index.items()}
    pred_words = [reverse_word_map.get(i, "") for i in pred_seq if i != 0]

    return " ".join(pred_words)


def evaluate_model_performance(test_data):
    # Initialize the ROUGE scorer
    scorer = rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True)

    # Lists to store scores
    bleu_scores = []
    rouge1_scores = []
    rouge2_scores = []
    rougel_scores = []

    # Iterate through the test data
    for i in range(len(test_data)):
        # Get a test sample
        sample_context = contexts[test_data[i]]
        sample_tokens = tokens_list[test_data[i]]
        sample_ner = ner_list[test_data[i]]
        sample_srl = srl_list[test_data[i]]
        sample_q_type = q_types[test_data[i]]
        actual_question = questions[test_data[i]]

        # Predict a question
        pred_question = predict_question(
            sample_context, sample_tokens, sample_ner, sample_srl, sample_q_type
        )

        # Tokenize for the BLEU score
        actual_tokens = actual_question.split()
        pred_tokens = pred_question.split()

        # Calculate the BLEU score (default weights: unigram through 4-gram)
        print("actual question:", actual_tokens)
        print("predicted question:", pred_tokens)
        bleu_score = sentence_bleu([actual_tokens], pred_tokens)
        bleu_scores.append(bleu_score)

        try:
            rouge_scores = scorer.score(actual_question, pred_question)
            # Extract F1 scores
            rouge1_scores.append(rouge_scores["rouge1"].fmeasure)
            rouge2_scores.append(rouge_scores["rouge2"].fmeasure)
            rougel_scores.append(rouge_scores["rougeL"].fmeasure)
        except Exception as e:
            print(f"Error calculating ROUGE score: {e}")

    # Calculate average scores
    results = {
        "avg_bleu_score": np.mean(bleu_scores),
        "avg_rouge1": np.mean(rouge1_scores),
        "avg_rouge2": np.mean(rouge2_scores),
        "avg_rougel": np.mean(rougel_scores),
    }

    return results


# Verify the saved artifacts load; prediction below still uses the in-memory
# `model` and tokenizers
loaded_model = load_model("question_prediction_model_final.h5")
with open("question_prediction_tokenizers.json", "r") as f:
    tokenizer_data = json.load(f)

# Take a random sample from the test set
sample_idx = random.randint(0, len(test_indices) - 1)
sample_context = contexts[test_indices[sample_idx]]
sample_tokens = tokens_list[test_indices[sample_idx]]
sample_ner = ner_list[test_indices[sample_idx]]
sample_srl = srl_list[test_indices[sample_idx]]
sample_q_type = q_types[test_indices[sample_idx]]
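
# Illustrative single-sample check: predict a question for the random test
# sample drawn above before computing the corpus-level metrics. Only reuses
# names already defined in this script.
sample_prediction = predict_question(
    sample_context, sample_tokens, sample_ner, sample_srl, sample_q_type
)
print("sample question type:", sample_q_type)
print("sample predicted question:", sample_prediction)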

performance_metrics = evaluate_model_performance(test_indices)

print("\nModel Performance Metrics:")
print(f"Average BLEU Score: {performance_metrics['avg_bleu_score']:.4f}")
print(f"Average ROUGE-1 Score: {performance_metrics['avg_rouge1']:.4f}")
print(f"Average ROUGE-2 Score: {performance_metrics['avg_rouge2']:.4f}")
print(f"Average ROUGE-L Score: {performance_metrics['avg_rougel']:.4f}")