""" qg_pipeline_static.py ~~~~~~~~~~~~~~~~~~~~~ Question Generation Encoder‑Decoder LSTM dengan fitur simbolik NER & SRL (pipeline statis). Datasets: – train.jsonl / valid.jsonl (lihat format di fungsi `load_jsonl`) """ import json, random, numpy as np, tensorflow as tf from collections import Counter from pathlib import Path from sklearn.model_selection import train_test_split # ------------------------------------------------------------------------------ # 1. UTILITAS DASAR # ------------------------------------------------------------------------------ SEED = 42 random.seed(SEED) np.random.seed(SEED) tf.random.set_seed(SEED) TRAIN_FILE = "QC/dataset_qc.json" VALID_RATIO = 0.10 MAX_CTX_LEN = 50 MAX_Q_LEN = 30 WORD_EMB_DIM = 128 BATCH = 32 EPOCHS = 15 SPECIALS_WORD = ("", "", "", "") SPECIALS_TAG = ("",) def load_jsonl(path): """Muatt satu file JSON‑Lines. Setiap line = dict.""" records = [] with open(path, encoding="utf-8") as f: for line in f: obj = json.loads(line) records.append(obj) return records def build_vocab(list_of_seq, specials): """Bangun (token->id, id->token) dict dari kumpulan sekuens.""" counter = Counter(tok for seq in list_of_seq for tok in seq) itos = list(specials) + [tok for tok, _ in counter.most_common()] stoi = {tok: i for i, tok in enumerate(itos)} return stoi, itos def encode(seq, tbl, max_len): ids = [tbl.get(tok, tbl[""]) for tok in seq] return (ids + [tbl[""]] * max_len)[:max_len] # ------------------------------------------------------------------------------ # 2. DATA PREP # ------------------------------------------------------------------------------ # def prepare_training_data(file_path): # """Load → build vocab → encode ke numpy array.""" # recs = load_jsonl(file_path) # ctx, ner, srl, ques = [], [], [], [] # for r in recs: # ctx.append(r["context_tokens"]) # ner.append(r["ner_tags"]) # srl.append(r["srl_tags"]) # # tambahkan , # ques.append([""] + r["question_tokens"] + [""]) # # 2.1 vocab # w2i_ctx, i2w_ctx = build_vocab(ctx, SPECIALS_WORD[:2]) # , # w2i_q, i2w_q = build_vocab(ques, SPECIALS_WORD) # 4 specials # t2i_ner, _ = build_vocab(ner, SPECIALS_TAG) # t2i_srl, _ = build_vocab(srl, SPECIALS_TAG) # # 2.2 encode & pad # X_tok = np.array([encode(s, w2i_ctx, MAX_CTX_LEN) for s in ctx]) # X_ner = np.array([encode(s, t2i_ner, MAX_CTX_LEN) for s in ner]) # X_srl = np.array([encode(s, t2i_srl, MAX_CTX_LEN) for s in srl]) # Y_in = np.array([encode(s[:-1], w2i_q, MAX_Q_LEN) for s in ques]) # bos..last-1 # Y_out = np.array([encode(s[1:], w2i_q, MAX_Q_LEN) for s in ques]) # 2..eos # return ( # X_tok, # X_ner, # X_srl, # Y_in, # Y_out, # w2i_ctx, # i2w_ctx, # w2i_q, # i2w_q, # t2i_ner, # t2i_srl, # ) # --- ganti fungsi lama --- def prepare_training_data(file_path): recs = load_jsonl(file_path) ctx, ner, srl, ques, span_st, span_ed = [], [], [], [], [], [] for r in recs: tokens = r["tokens"] ctx.append(tokens) # context_tokens ner.append(r["ner"]) srl.append(r["srl"]) # --- hitung answer_span otomatis --- ans_toks = r["answer"].split() try: start = next( i for i in range(len(tokens)) if tokens[i : i + len(ans_toks)] == ans_toks ) end = start + len(ans_toks) - 1 except StopIteration: raise ValueError( f"Jawaban '{r['answer']}' tidak cocok dengan tokens {tokens}" ) span_st.append(start) span_ed.append(end) # question tokens: tokenisasi sederhana ques.append([""] + r["question"].split() + [""]) # ---------- build vocab sama persis ---------- w2i_ctx, i2w_ctx = build_vocab(ctx, SPECIALS_WORD[:2]) w2i_q, i2w_q = build_vocab(ques, SPECIALS_WORD) t2i_ner, _ = 
# --- replaces the old prepare_training_data above ---
def prepare_training_data(file_path):
    """Load -> locate answer span -> build vocab -> encode into numpy arrays."""
    recs = load_jsonl(file_path)
    ctx, ner, srl, ques, span_st, span_ed = [], [], [], [], [], []

    for r in recs:
        tokens = r["tokens"]
        ctx.append(tokens)  # context tokens
        ner.append(r["ner"])
        srl.append(r["srl"])

        # --- locate the answer span automatically ---
        ans_toks = r["answer"].split()
        try:
            start = next(
                i
                for i in range(len(tokens))
                if tokens[i : i + len(ans_toks)] == ans_toks
            )
            end = start + len(ans_toks) - 1
        except StopIteration:
            raise ValueError(
                f"Answer '{r['answer']}' does not match tokens {tokens}"
            )
        span_st.append(start)
        span_ed.append(end)

        # question tokens: simple whitespace tokenisation
        ques.append(["<bos>"] + r["question"].split() + ["<eos>"])

    # ---------- build vocab (same as before) ----------
    w2i_ctx, i2w_ctx = build_vocab(ctx, SPECIALS_WORD[:2])  # <pad>, <unk>
    w2i_q, i2w_q = build_vocab(ques, SPECIALS_WORD)         # 4 specials
    t2i_ner, _ = build_vocab(ner, SPECIALS_TAG)
    t2i_srl, _ = build_vocab(srl, SPECIALS_TAG)

    # ---------- encode ----------
    X_tok = np.array([encode(s, w2i_ctx, MAX_CTX_LEN) for s in ctx])
    X_ner = np.array([encode(s, t2i_ner, MAX_CTX_LEN) for s in ner])
    X_srl = np.array([encode(s, t2i_srl, MAX_CTX_LEN) for s in srl])
    Y_in = np.array([encode(s[:-1], w2i_q, MAX_Q_LEN) for s in ques])  # <bos>..last-1
    Y_out = np.array([encode(s[1:], w2i_q, MAX_Q_LEN) for s in ques])  # 2nd..<eos>

    # keep the spans around in case a copy mechanism is added later
    spans = np.array(list(zip(span_st, span_ed)))  # (N, 2)

    return (
        X_tok,
        X_ner,
        X_srl,
        Y_in,
        Y_out,
        spans,
        w2i_ctx,
        i2w_ctx,
        w2i_q,
        i2w_q,
        t2i_ner,
        t2i_srl,
    )


print("> Loading dataset …")
(
    X_tok,
    X_ner,
    X_srl,
    Y_in,
    Y_out,
    spans,  # unused for now; reserved for a future copy mechanism
    w2i_ctx,
    i2w_ctx,
    w2i_q,
    i2w_q,
    t2i_ner,
    t2i_srl,
) = prepare_training_data(TRAIN_FILE)

train_idx, valid_idx = train_test_split(
    np.arange(len(X_tok)), test_size=VALID_RATIO, random_state=SEED
)


def pick(arr, idx):
    return arr[idx]


train_data = [pick(a, train_idx) for a in (X_tok, X_ner, X_srl, Y_in, Y_out)]
valid_data = [pick(a, valid_idx) for a in (X_tok, X_ner, X_srl, Y_in, Y_out)]

# ------------------------------------------------------------------------------
# 3. MODEL
# ------------------------------------------------------------------------------
def build_model(vocab_ctx, vocab_q, n_ner, n_srl):
    tok_in = tf.keras.layers.Input((MAX_CTX_LEN,), name="tok")
    ner_in = tf.keras.layers.Input((MAX_CTX_LEN,), name="ner")
    srl_in = tf.keras.layers.Input((MAX_CTX_LEN,), name="srl")
    dec_in = tf.keras.layers.Input((MAX_Q_LEN,), name="dec")

    # token / NER / SRL embeddings, concatenated as the encoder input
    tok_emb = tf.keras.layers.Embedding(vocab_ctx, WORD_EMB_DIM, mask_zero=True)(tok_in)
    ner_emb = tf.keras.layers.Embedding(n_ner, 32, mask_zero=True)(ner_in)
    srl_emb = tf.keras.layers.Embedding(n_srl, 32, mask_zero=True)(srl_in)
    enc_in = tf.keras.layers.Concatenate()([tok_emb, ner_emb, srl_emb])

    enc_out, fwd_h, fwd_c, bwd_h, bwd_c = tf.keras.layers.Bidirectional(
        tf.keras.layers.LSTM(WORD_EMB_DIM, return_sequences=True, return_state=True)
    )(enc_in)
    state_h = tf.keras.layers.Concatenate()([fwd_h, bwd_h])
    state_c = tf.keras.layers.Concatenate()([fwd_c, bwd_c])

    dec_emb = tf.keras.layers.Embedding(vocab_q, WORD_EMB_DIM, mask_zero=True)(dec_in)
    dec_lstm = tf.keras.layers.LSTM(
        WORD_EMB_DIM * 2, return_sequences=True, return_state=True
    )
    dec_out, _, _ = dec_lstm(dec_emb, initial_state=[state_h, state_c])

    # Attention (dot / Luong-style)
    score = tf.keras.layers.Dot(axes=[2, 2])([dec_out, enc_out])
    attn_weights = tf.keras.layers.Activation("softmax")(score)
    context_vec = tf.keras.layers.Dot(axes=[2, 1])([attn_weights, enc_out])

    dec_cat = tf.keras.layers.Concatenate()([dec_out, context_vec])
    outputs = tf.keras.layers.TimeDistributed(
        tf.keras.layers.Dense(vocab_q, activation="softmax")
    )(dec_cat)

    mdl = tf.keras.Model([tok_in, ner_in, srl_in, dec_in], outputs)
    mdl.compile(
        optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"]
    )
    return mdl
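# For orientation, the tensor shapes flowing through the dot attention above
# (with WORD_EMB_DIM = 128, the bidirectional encoder yields 2 * 128 = 256 units):
#   dec_out      : (batch, MAX_Q_LEN,   256)
#   enc_out      : (batch, MAX_CTX_LEN, 256)
#   score        : (batch, MAX_Q_LEN,   MAX_CTX_LEN)  = dec_out . enc_out^T
#   attn_weights : softmax over the last (context) axis
#   context_vec  : (batch, MAX_Q_LEN,   256)          = attn_weights . enc_out
#   dec_cat      : (batch, MAX_Q_LEN,   512)          = concat(dec_out, context_vec)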
print("> Building model …")
model = build_model(len(w2i_ctx), len(w2i_q), len(t2i_ner), len(t2i_srl))
model.summary(line_length=120)

# ------------------------------------------------------------------------------
# 4. DATA GENERATOR
# ------------------------------------------------------------------------------
def generator(data, batch=BATCH):
    """Yield shuffled mini-batches forever (consumed by model.fit)."""
    X_tok, X_ner, X_srl, Y_inp, Y_outp = data
    n = len(X_tok)
    while True:
        idx = np.random.permutation(n)
        for i in range(0, n, batch):
            b = idx[i : i + batch]
            # targets get a trailing axis for sparse_categorical_crossentropy
            yield [X_tok[b], X_ner[b], X_srl[b], Y_inp[b]], Y_outp[b][..., None]


steps_train = len(train_idx) // BATCH
steps_valid = len(valid_idx) // BATCH

# ------------------------------------------------------------------------------
# 5. TRAIN
# ------------------------------------------------------------------------------
print("> Training …")
_ = model.fit(
    generator(train_data),
    steps_per_epoch=steps_train,
    validation_data=generator(valid_data),
    validation_steps=steps_valid,
    epochs=EPOCHS,
)

model.save("qg_lstm_static.h5")
print("✓ Model saved to qg_lstm_static.h5")
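# ------------------------------------------------------------------------------
# 6. (OPTIONAL) GREEDY DECODING SKETCH
# ------------------------------------------------------------------------------
# The graph above is a teacher-forcing model with no separate inference decoder.
# A minimal (and slow) way to generate a question with it is to re-run the full
# model once per output step, feeding back the tokens generated so far. This is
# only an illustrative sketch, not part of the original pipeline;
# `generate_question` and its arguments are hypothetical helpers.
def generate_question(ctx_tokens, ner_tags, srl_tags):
    x_tok = np.array([encode(ctx_tokens, w2i_ctx, MAX_CTX_LEN)])
    x_ner = np.array([encode(ner_tags, t2i_ner, MAX_CTX_LEN)])
    x_srl = np.array([encode(srl_tags, t2i_srl, MAX_CTX_LEN)])

    dec = np.zeros((1, MAX_Q_LEN), dtype="int32")  # all <pad>
    dec[0, 0] = w2i_q["<bos>"]

    out_tokens = []
    for t in range(MAX_Q_LEN - 1):
        preds = model.predict([x_tok, x_ner, x_srl, dec], verbose=0)
        next_id = int(np.argmax(preds[0, t]))  # distribution for token t+1
        if next_id == w2i_q["<eos>"]:
            break
        out_tokens.append(i2w_q[next_id])
        dec[0, t + 1] = next_id
    return " ".join(out_tokens)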