import numpy as np
import json
import re

import tensorflow as tf
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.layers import (
    Input,
    LSTM,
    Dense,
    Embedding,
    Bidirectional,
    Concatenate,
    Dropout,
)
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt

with open("data_converted.json", "r") as f:
    data = json.load(f)


# Preprocessing function
def preprocess_text(text):
    """Apply basic text preprocessing: lowercase and collapse whitespace."""
    text = text.lower()
    text = re.sub(r"\s+", " ", text).strip()
    return text


# Prepare the data for the model
def prepare_data(data):
    """Flatten the dataset into parallel lists, one entry per QA pair."""
    contexts = []
    tokens_list = []
    ner_list = []
    srl_list = []
    questions = []
    answers = []
    q_types = []

    for item in data:
        for qa in item["qas"]:
            contexts.append(preprocess_text(item["context"]))
            tokens_list.append(item["tokens"])
            ner_list.append(item["ner"])
            srl_list.append(item["srl"])
            questions.append(preprocess_text(qa["question"]))
            answers.append(qa["answer"])
            q_types.append(qa["type"])

    return contexts, tokens_list, ner_list, srl_list, questions, answers, q_types


contexts, tokens_list, ner_list, srl_list, questions, answers, q_types = prepare_data(
    data
)
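# A minimal sketch (an assumption, not part of the original script) of the
# record layout prepare_data() expects in data_converted.json, inferred from
# the fields read above and the sample record used near the end of this file:
#
# {
#     "context": "kerajaan majapahit berdiri pada tahun 1293 di trowulan",
#     "tokens": ["kerajaan", "majapahit", "berdiri", ...],
#     "ner":    ["O", "ORG", "O", ...],
#     "srl":    ["ARG1", "ARG1", "V", ...],
#     "qas": [
#         {"question": "...", "answer": "...", "type": "..."}
#     ]
# }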
max_vocab_size = 10000
tokenizer = Tokenizer(num_words=max_vocab_size, oov_token="<OOV>")
tokenizer.fit_on_texts(contexts + questions + [" ".join(item) for item in tokens_list])
vocab_size = len(tokenizer.word_index) + 1

# Encoding for NER tags
ner_tokenizer = Tokenizer(oov_token="<OOV>")
ner_tokenizer.fit_on_texts([" ".join(ner) for ner in ner_list])
ner_vocab_size = len(ner_tokenizer.word_index) + 1

# Encoding for SRL tags
srl_tokenizer = Tokenizer(oov_token="<OOV>")
srl_tokenizer.fit_on_texts([" ".join(srl) for srl in srl_list])
srl_vocab_size = len(srl_tokenizer.word_index) + 1

# Encoding for question types
q_type_tokenizer = Tokenizer()
q_type_tokenizer.fit_on_texts(q_types)
q_type_vocab_size = len(q_type_tokenizer.word_index) + 1


# Convert tokens, NER, and SRL to sequences
def tokens_to_sequences(tokens, ner, srl):
    """Convert token, NER, and SRL tag lists to integer sequences."""
    token_seqs = [tokenizer.texts_to_sequences([" ".join(t)])[0] for t in tokens]
    ner_seqs = [ner_tokenizer.texts_to_sequences([" ".join(n)])[0] for n in ner]
    srl_seqs = [srl_tokenizer.texts_to_sequences([" ".join(s)])[0] for s in srl]
    return token_seqs, ner_seqs, srl_seqs


# Determine the maximum lengths for padding
context_seqs = tokenizer.texts_to_sequences(contexts)
question_seqs = tokenizer.texts_to_sequences(questions)
token_seqs, ner_seqs, srl_seqs = tokens_to_sequences(tokens_list, ner_list, srl_list)

max_context_len = max(len(seq) for seq in context_seqs)
max_question_len = max(len(seq) for seq in question_seqs)
max_token_len = max(len(seq) for seq in token_seqs)


# Pad sequences so that all inputs have the same length
def pad_all_sequences(context_seqs, question_seqs, token_seqs, ner_seqs, srl_seqs):
    """Pad all sequences to their respective maximum lengths."""
    context_padded = pad_sequences(context_seqs, maxlen=max_context_len, padding="post")
    question_padded = pad_sequences(
        question_seqs, maxlen=max_question_len, padding="post"
    )
    token_padded = pad_sequences(token_seqs, maxlen=max_token_len, padding="post")
    ner_padded = pad_sequences(ner_seqs, maxlen=max_token_len, padding="post")
    srl_padded = pad_sequences(srl_seqs, maxlen=max_token_len, padding="post")
    return context_padded, question_padded, token_padded, ner_padded, srl_padded


# Prepare the encoder for answers
answer_tokenizer = Tokenizer(oov_token="<OOV>")
answer_tokenizer.fit_on_texts(answers)
answer_vocab_size = len(answer_tokenizer.word_index) + 1

# Encode question types - FIX - use the index directly rather than a sequence
q_type_indices = []
for q_type in q_types:
    # word_index starts at 1; 0 is reserved for unknown/out-of-vocabulary types
    q_type_idx = q_type_tokenizer.word_index.get(q_type, 0)
    q_type_indices.append(q_type_idx)

# Convert to a numpy array
q_type_indices = np.array(q_type_indices)

# One-hot encode the question types
q_type_categorical = tf.keras.utils.to_categorical(
    q_type_indices, num_classes=q_type_vocab_size
)

# Pad sequences
context_padded, question_padded, token_padded, ner_padded, srl_padded = (
    pad_all_sequences(context_seqs, question_seqs, token_seqs, ner_seqs, srl_seqs)
)

# Encode answers
answer_seqs = answer_tokenizer.texts_to_sequences(answers)
max_answer_len = max(len(seq) for seq in answer_seqs)
answer_padded = pad_sequences(answer_seqs, maxlen=max_answer_len, padding="post")

# Split the data into train and test sets
indices = list(range(len(context_padded)))
train_indices, test_indices = train_test_split(indices, test_size=0.2, random_state=42)


# Helper to take a subset of an array by indices
def get_subset(arr, indices):
    return np.array([arr[i] for i in indices])


# Train data
train_context = get_subset(context_padded, train_indices)
train_question = get_subset(question_padded, train_indices)
train_token = get_subset(token_padded, train_indices)
train_ner = get_subset(ner_padded, train_indices)
train_srl = get_subset(srl_padded, train_indices)
train_q_type = get_subset(q_type_categorical, train_indices)  # kept for reference; not a model input
train_answer = get_subset(answer_padded, train_indices)

# Test data
test_context = get_subset(context_padded, test_indices)
test_question = get_subset(question_padded, test_indices)
test_token = get_subset(token_padded, test_indices)
test_ner = get_subset(ner_padded, test_indices)
test_srl = get_subset(srl_padded, test_indices)
test_q_type = get_subset(q_type_categorical, test_indices)  # kept for reference; not a model input
test_answer = get_subset(answer_padded, test_indices)

# Hyperparameters
embedding_dim = 100
lstm_units = 128
ner_embedding_dim = 50
srl_embedding_dim = 50
dropout_rate = 0.3
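# Optional sanity check (a sketch, not part of the original script): confirm
# that every padded input and target shares the same first dimension before
# building the model.
for name, arr in [
    ("train_context", train_context),
    ("train_token", train_token),
    ("train_ner", train_ner),
    ("train_srl", train_srl),
    ("train_answer", train_answer),
]:
    print(f"{name}: {arr.shape}")
assert len(train_context) == len(train_question) == len(train_answer)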
# Build a model with two outputs: a question and an answer
def create_qa_generator_model():
    # Input layers
    context_input = Input(shape=(max_context_len,), name="context_input")
    token_input = Input(shape=(max_token_len,), name="token_input")
    ner_input = Input(shape=(max_token_len,), name="ner_input")
    srl_input = Input(shape=(max_token_len,), name="srl_input")

    # No question_input or q_type_input here: the question (and answer) are
    # what the model generates, so they appear only as outputs.

    # Shared embedding layer for text
    text_embedding = Embedding(vocab_size, embedding_dim, name="text_embedding")

    # Embeddings for NER and SRL tags
    ner_embedding = Embedding(ner_vocab_size, ner_embedding_dim, name="ner_embedding")(
        ner_input
    )
    srl_embedding = Embedding(srl_vocab_size, srl_embedding_dim, name="srl_embedding")(
        srl_input
    )

    # Apply the embeddings
    context_embed = text_embedding(context_input)
    token_embed = text_embedding(token_input)

    # Bidirectional LSTMs for context and token-level features
    context_lstm = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="context_lstm")
    )(context_embed)

    # Concatenate the token features (tokens, NER, SRL)
    token_features = Concatenate(name="token_features")(
        [token_embed, ner_embedding, srl_embedding]
    )
    token_lstm = Bidirectional(
        LSTM(lstm_units, return_sequences=True, name="token_lstm")
    )(token_features)

    # Pool the sequence outputs
    context_pool = tf.keras.layers.GlobalMaxPooling1D(name="context_pool")(context_lstm)
    token_pool = tf.keras.layers.GlobalMaxPooling1D(name="token_pool")(token_lstm)

    # Concatenate all features
    all_features = Concatenate(name="all_features")([context_pool, token_pool])

    # Shared layers
    shared = Dense(256, activation="relu", name="shared_dense_1")(all_features)
    shared = Dropout(dropout_rate)(shared)
    shared = Dense(128, activation="relu", name="shared_dense_2")(shared)
    shared = Dropout(dropout_rate)(shared)

    # Question branch
    question_branch = Dense(256, activation="relu", name="question_dense")(shared)
    question_branch = Dropout(dropout_rate)(question_branch)

    # Answer branch
    answer_branch = Dense(256, activation="relu", name="answer_dense")(shared)
    answer_branch = Dropout(dropout_rate)(answer_branch)

    # Output layers.
    # For the question, an LSTM-based decoder generates a word sequence: the
    # branch vector is repeated max_question_len times and decoded in one shot
    # (non-autoregressive), with a softmax over the vocabulary at each position.
    question_decoder = LSTM(
        lstm_units, return_sequences=True, name="question_decoder"
    )(tf.keras.layers.RepeatVector(max_question_len)(question_branch))
    question_output = Dense(vocab_size, activation="softmax", name="question_output")(
        question_decoder
    )

    # Output layer for the answer (a single class over the answer vocabulary)
    answer_output = Dense(
        answer_vocab_size, activation="softmax", name="answer_output"
    )(answer_branch)

    # Create the model
    model = Model(
        inputs=[
            context_input,
            token_input,
            ner_input,
            srl_input,
        ],
        outputs=[question_output, answer_output],
    )

    # Compile with a loss function and metrics for each output
    model.compile(
        optimizer="adam",
        loss={
            "question_output": "categorical_crossentropy",
            "answer_output": "sparse_categorical_crossentropy",
        },
        metrics={"question_output": "accuracy", "answer_output": "accuracy"},
        loss_weights={"question_output": 1.0, "answer_output": 1.0},
    )

    return model


# Prepare the question targets (one-hot encoded).
# Every position in the question sequence is predicted simultaneously, so the
# target is a (num_samples, max_question_len, vocab_size) one-hot tensor.
# Note this can be memory-hungry for large vocabularies.
def prepare_question_target(question_padded):
    return tf.keras.utils.to_categorical(question_padded, num_classes=vocab_size)


# Targets for the question output
train_question_target = prepare_question_target(train_question)
test_question_target = prepare_question_target(test_question)

# Reshape the answers for sparse categorical crossentropy
train_answer_labels = train_answer[:, 0]  # take the first token of each answer
test_answer_labels = test_answer[:, 0]

# Build the model
model = create_qa_generator_model()
model.summary()

# Callback to keep the best model
checkpoint = ModelCheckpoint(
    "qa_generator_model.h5",
    monitor="val_question_output_accuracy",
    save_best_only=True,
    verbose=1,
    mode="max",
)

early_stop = EarlyStopping(
    monitor="val_question_output_accuracy", patience=5, verbose=1, mode="max"
)

# Training
batch_size = 8
epochs = 50

# Train the model
history = model.fit(
    [train_context, train_token, train_ner, train_srl],
    {"question_output": train_question_target, "answer_output": train_answer_labels},
    batch_size=batch_size,
    epochs=epochs,
    validation_data=(
        [test_context, test_token, test_ner, test_srl],
        {"question_output": test_question_target, "answer_output": test_answer_labels},
    ),
    callbacks=[checkpoint, early_stop],
)
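# Optional (a sketch, not part of the original script): EarlyStopping above
# keeps the weights from the last epoch it ran, so to score the checkpoint
# that did best on val_question_output_accuracy, reload it and evaluate.
best_model = load_model("qa_generator_model.h5")
best_model.evaluate(
    [test_context, test_token, test_ner, test_srl],
    {"question_output": test_question_target, "answer_output": test_answer_labels},
    batch_size=batch_size,
)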
model.save("qa_generator_model_final.keras")

# Save the tokenizers
tokenizer_data = {
    "word_tokenizer": tokenizer.to_json(),
    "ner_tokenizer": ner_tokenizer.to_json(),
    "srl_tokenizer": srl_tokenizer.to_json(),
    "answer_tokenizer": answer_tokenizer.to_json(),
    "q_type_tokenizer": q_type_tokenizer.to_json(),
    "max_context_len": max_context_len,
    "max_question_len": max_question_len,
    "max_token_len": max_token_len,
}

with open("qa_generator_tokenizers.json", "w") as f:
    json.dump(tokenizer_data, f)
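# Reloading sketch (an assumption, not part of the original script): in a
# separate inference process, the tokenizers saved above can be restored with
# keras' tokenizer_from_json. Kept commented out so it does not shadow the
# live objects in this script.
#
# from tensorflow.keras.preprocessing.text import tokenizer_from_json
#
# with open("qa_generator_tokenizers.json") as f:
#     saved = json.load(f)
# tokenizer = tokenizer_from_json(saved["word_tokenizer"])
# ner_tokenizer = tokenizer_from_json(saved["ner_tokenizer"])
# srl_tokenizer = tokenizer_from_json(saved["srl_tokenizer"])
# answer_tokenizer = tokenizer_from_json(saved["answer_tokenizer"])
# max_context_len = saved["max_context_len"]
# max_question_len = saved["max_question_len"]
# max_token_len = saved["max_token_len"]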
# Prediction function
def predict_question_and_answer(model, context, tokens, ner, srl):
    """
    Predict a question and an answer from a context plus its tokens, NER, and SRL tags.
    """
    # Preprocess the input
    context_seq = tokenizer.texts_to_sequences([preprocess_text(context)])
    context_padded = pad_sequences(context_seq, maxlen=max_context_len, padding="post")

    token_seq = tokenizer.texts_to_sequences([" ".join(tokens)])
    token_padded = pad_sequences(token_seq, maxlen=max_token_len, padding="post")

    ner_seq = ner_tokenizer.texts_to_sequences([" ".join(ner)])
    ner_padded = pad_sequences(ner_seq, maxlen=max_token_len, padding="post")

    srl_seq = srl_tokenizer.texts_to_sequences([" ".join(srl)])
    srl_padded = pad_sequences(srl_seq, maxlen=max_token_len, padding="post")

    # Predict
    question_pred, answer_pred = model.predict(
        [context_padded, token_padded, ner_padded, srl_padded]
    )

    # Decode the question (take the highest-probability index at each position)
    question_indices = np.argmax(question_pred[0], axis=-1)
    question_words = []

    # Reverse the word index to map indices back to words
    word_index = tokenizer.word_index
    index_word = {v: k for k, v in word_index.items()}

    # Decode the question
    for idx in question_indices:
        if idx != 0:  # skip padding (index 0)
            word = index_word.get(idx, "")
            question_words.append(word)
        else:
            break  # stop at the first padding index

    # Decode the answer
    answer_idx = np.argmax(answer_pred[0])

    # Reverse the word index for answers
    answer_word_index = answer_tokenizer.word_index
    answer_index_word = {v: k for k, v in answer_word_index.items()}
    answer = answer_index_word.get(answer_idx, "")

    # Assemble the question
    question = " ".join(question_words)

    return question, answer


# Example usage
# Note: this is only an example; actual data is needed for a real run
"""
sample_context = "Selamat pagi, sekarang adalah hari Senin."
sample_tokens = ["selamat", "pagi", "sekarang", "adalah", "hari", "senin"]
sample_ner = ["O", "O", "O", "O", "O", "B-TIME"]
sample_srl = ["B-V", "B-ARG1", "B-ARGM-TMP", "B-ARGM-PRD", "I-ARGM-PRD", "I-ARGM-PRD"]

# Load the trained model
loaded_model = load_model("qa_generator_model_final.keras")

# Predict
question, answer = predict_question_and_answer(
    loaded_model, sample_context, sample_tokens, sample_ner, sample_srl
)

print("Context:", sample_context)
print("Generated question:", question)
print("Generated answer:", answer)
"""

sample = {
    "context": "kerajaan majapahit berdiri pada tahun 1293 di trowulan",
    "tokens": [
        "kerajaan",
        "majapahit",
        "berdiri",
        "pada",
        "tahun",
        "1293",
        "di",
        "trowulan",
    ],
    "ner": ["O", "ORG", "O", "O", "O", "DATE", "O", "LOC"],
    "srl": ["ARG1", "ARG1", "V", "O", "O", "ARGM-TMP", "O", "ARGM-LOC"],
}

question, answer = predict_question_and_answer(
    model, sample["context"], sample["tokens"], sample["ner"], sample["srl"]
)

print("Context:", sample["context"])
print("Generated question:", question)
print("Generated answer:", answer)

# Plot the training history
# plt.figure(figsize=(12, 8))

# # Plot loss
# plt.subplot(2, 2, 1)
# plt.plot(history.history['loss'])
# plt.plot(history.history['val_loss'])
# plt.title('Model Loss')
# plt.ylabel('Loss')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='upper right')

# # Plot question output accuracy
# plt.subplot(2, 2, 2)
# plt.plot(history.history['question_output_accuracy'])
# plt.plot(history.history['val_question_output_accuracy'])
# plt.title('Question Output Accuracy')
# plt.ylabel('Accuracy')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='lower right')

# # Plot answer output accuracy
# plt.subplot(2, 2, 3)
# plt.plot(history.history['answer_output_accuracy'])
# plt.plot(history.history['val_answer_output_accuracy'])
# plt.title('Answer Output Accuracy')
# plt.ylabel('Accuracy')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='lower right')

# plt.tight_layout()
# plt.savefig("training_history.png")
# plt.show()
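# Optional spot-check (a sketch, not part of the original script): run the
# generator over a few raw records to eyeball output quality.
for item in data[:3]:
    q, a = predict_question_and_answer(
        model, item["context"], item["tokens"], item["ner"], item["srl"]
    )
    print("Context:", item["context"])
    print("Generated question:", q)
    print("Generated answer:", a)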