import logging
import os
from datetime import datetime

import joblib
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import GridSearchCV, cross_val_score, train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler

import config

logger = logging.getLogger(__name__)


class JurusanKNNClassifier:
    """KNN classifier that predicts the `paket_jurusan` label from subject grades.

    The class wraps a `KNeighborsClassifier` and a `StandardScaler`, and adds
    training, evaluation reporting, prediction and joblib persistence.
    Hyperparameters come from the project `config` module unless given
    explicitly to the constructor.
    """

    # Grade columns used as model features, in the order fed to the scaler.
    FEATURE_COLUMNS = (
        'nilai_informatika', 'nilai_fisika', 'nilai_kimia', 'nilai_biologi',
        'nilai_big_lanjut', 'nilai_ekonomi', 'nilai_mat_lanjut',
        'nilai_sej_lanjut', 'nilai_sosiologi', 'nilai_geografi',
    )
    # Column holding the class label.
    TARGET_COLUMN = 'paket_jurusan'

    def __init__(self, n_neighbors=None, weights=None, metric=None):
        """Create an untrained classifier.

        Args:
            n_neighbors: Number of neighbours; defaults to
                `config.DEFAULT_K_NEIGHBORS`.
            weights: Neighbour voting weights; defaults to `config.KNN_WEIGHTS`
                or 'uniform' when the config attribute is absent.
            metric: Distance metric; defaults to `config.KNN_METRIC` or
                'minkowski' when the config attribute is absent.
        """
        # Config is only consulted for the parameters that were not supplied.
        if n_neighbors is None:
            n_neighbors = config.DEFAULT_K_NEIGHBORS
        if weights is None:
            weights = getattr(config, 'KNN_WEIGHTS', 'uniform')
        if metric is None:
            metric = getattr(config, 'KNN_METRIC', 'minkowski')

        self.knn = KNeighborsClassifier(n_neighbors=n_neighbors, weights=weights, metric=metric)
        # Scaler used to standardise features before they reach the KNN.
        self.scaler = StandardScaler()
        # The model starts untrained.
        self.is_trained = False
        self.feature_columns = []
        self.training_accuracy = 0.0
        self.model_last_trained = None
        # True once `train` has replaced k with a grid-search result.
        self.k_optimized = False

    def check_model_exists(self, model_path):
        """Return True if a file exists at `model_path`."""
        return os.path.exists(model_path)

    def _get_expected_knn_params(self):
        """Return the KNN parameters currently requested by `config`."""
        return {
            'n_neighbors': config.DEFAULT_K_NEIGHBORS,
            'weights': getattr(config, 'KNN_WEIGHTS', 'uniform'),
            'metric': getattr(config, 'KNN_METRIC', 'minkowski'),
        }

    def _saved_model_matches_config(self, model_data):
        """Return True if the saved KNN's parameters match the current config.

        Args:
            model_data: Object loaded from a model file; anything that is not a
                dict containing a 'knn' entry is treated as a mismatch.
        """
        if not isinstance(model_data, dict):
            return False
        saved_knn = model_data.get('knn')
        if saved_knn is None:
            return False

        expected = self._get_expected_knn_params()
        # When k was chosen by grid search and optimisation is still enabled in
        # config, k legitimately differs from DEFAULT_K_NEIGHBORS. Comparing it
        # would force a retrain on every call and make load_model always fail.
        if model_data.get('k_optimized', False) and getattr(config, 'OPTIMIZE_K', False):
            expected.pop('n_neighbors')

        return all(
            getattr(saved_knn, name, None) == value
            for name, value in expected.items()
        )

    def needs_retraining(self, data_path, model_path):
        """Decide whether the model at `model_path` must be (re)trained.

        Returns True when the model file is missing, cannot be loaded, does not
        match the current config, or is older than the data file. Returns False
        otherwise, including when the data file does not exist.
        """
        if not self.check_model_exists(model_path):
            return True

        # Retrain when saved model parameters differ from the current
        # configuration. Any failure here (corrupt file, incompatible pickle,
        # missing config attribute) is deliberately treated as "retrain".
        try:
            model_data = joblib.load(model_path)
            if not self._saved_model_matches_config(model_data):
                return True
        except Exception:
            return True

        # Retrain when the data file is newer than the model file.
        if os.path.exists(data_path):
            return os.path.getmtime(data_path) > os.path.getmtime(model_path)

        return False

    def auto_train_if_needed(self, data_path, model_path=None, test_data_path=None):
        """Train and save the model if needed, otherwise load the saved one.

        Args:
            data_path: CSV file with the training data.
            model_path: Model file; defaults to `config.MODEL_PATH`.
            test_data_path: Optional CSV forwarded to `train`.

        Returns:
            A tuple `(trained, accuracy, report)`. `trained` is True only when a
            new model was trained. On any error the tuple is
            `(False, 0.0, None)` and the exception is logged.
        """
        if model_path is None:
            model_path = config.MODEL_PATH

        try:
            if self.needs_retraining(data_path, model_path):
                accuracy, report = self.train(data_path, test_data_path=test_data_path)
                self.save_model(model_path)
                return True, accuracy, report

            self.load_model(model_path)
            return False, self.training_accuracy, None
        except Exception:
            # Best-effort by design: callers get a failure tuple rather than an
            # exception, but the cause is logged instead of being discarded.
            logger.exception("Automatic training/loading failed for model %s", model_path)
            return False, 0.0, None

    def prepare_data(self, data_path):
        """Read a CSV dataset and split it into features and labels.

        Stores the loaded frame in `self.df` and the feature column names in
        `self.feature_columns`.

        Args:
            data_path: Path of the CSV file.

        Returns:
            `(X, y)`: the feature columns and the `paket_jurusan` column.

        Raises:
            KeyError: If a required column is missing from the CSV.
        """
        self.df = pd.read_csv(data_path)
        self.feature_columns = list(self.FEATURE_COLUMNS)

        required = self.feature_columns + [self.TARGET_COLUMN]
        missing = [column for column in required if column not in self.df.columns]
        if missing:
            raise KeyError(f"Dataset '{data_path}' is missing required columns: {missing}")

        X = self.df[self.feature_columns]
        y = self.df[self.TARGET_COLUMN]
        return X, y

    def _print_testing_report(self, y_test, y_pred, labels, accuracy=None, cv_scores=None):
        """Print a detailed evaluation report to stdout.

        Sections: evaluation configuration, confusion matrix (raw and
        row-normalised), one-vs-rest TP/FN/FP/TN per class, per-row actual vs
        predicted values, and precision/recall/F1 metrics.

        Args:
            y_test: True labels.
            y_pred: Predicted labels.
            labels: Class labels fixing the row/column order of the tables.
            accuracy: Optional test accuracy to print.
            cv_scores: Optional cross-validation scores to summarise.
        """
        cm = confusion_matrix(y_test, y_pred, labels=labels)
        separator = "=" * 80

        print("\n" + separator)
        print("DETAIL HASIL TESTING")
        print(separator)

        # 0) Model and evaluation context
        print("\n0) Konfigurasi Evaluasi")
        print(f"Model KNN : k={self.knn.n_neighbors}, weights={self.knn.weights}, metric={self.knn.metric}")
        print(f"Cross Validation : {config.CROSS_VALIDATION_FOLDS}-fold")
        if accuracy is not None:
            print(f"Test Accuracy : {accuracy:.4f} ({accuracy * 100:.2f}%)")
        if cv_scores is not None and len(cv_scores) > 0:
            cv_mean = float(np.mean(cv_scores))
            cv_std = float(np.std(cv_scores))
            print(f"CV Mean Accuracy : {cv_mean:.4f} ({cv_mean * 100:.2f}%)")
            print(f"CV Std Dev : {cv_std:.4f}")

        row_names = [f"Actual {label}" for label in labels]
        col_names = [f"Pred {label}" for label in labels]

        # 1) Confusion matrix table
        cm_df = pd.DataFrame(cm, index=row_names, columns=col_names)
        print("\n1) Confusion Matrix")
        print(cm_df.to_string())

        # 1b) Normalized confusion matrix (% per actual class)
        row_totals = cm.sum(axis=1, keepdims=True)
        row_totals[row_totals == 0] = 1  # avoid division by zero for absent classes
        cm_norm_df = pd.DataFrame((cm / row_totals) * 100.0, index=row_names, columns=col_names)
        print("\n1b) Confusion Matrix Normalized (%)")
        print(cm_norm_df.to_string(float_format=lambda x: f"{x:.2f}"))

        # 2) TP, FN, FP, TN table per class (one-vs-rest)
        total = int(cm.sum())
        true_pos = np.diag(cm)
        false_neg = cm.sum(axis=1) - true_pos
        false_pos = cm.sum(axis=0) - true_pos
        tf_df = pd.DataFrame([
            {
                'kelas': label,
                'TP': int(true_pos[idx]),
                'FN': int(false_neg[idx]),
                'FP': int(false_pos[idx]),
                'TN': int(total - true_pos[idx] - false_neg[idx] - false_pos[idx]),
            }
            for idx, label in enumerate(labels)
        ])
        print("\n2) Tabel TP/FN/FP/TN per Kelas")
        print(tf_df.to_string(index=False))

        # 3) Actual vs predicted for all test rows
        result_df = pd.DataFrame({
            'no_data_test': np.arange(1, len(y_test) + 1),
            'nilai_sebenarnya': np.array(y_test),
            'nilai_prediksi': np.array(y_pred),
        })
        pairs = list(zip(result_df['nilai_sebenarnya'], result_df['nilai_prediksi']))
        result_df['status'] = [
            "BENAR" if actual == pred else "SALAH"
            for actual, pred in pairs
        ]
        result_df['status_tpfnfp_tn'] = [
            "TP (kelas aktual), TN" if actual == pred
            else f"FN (kelas {actual}), FP (kelas {pred})"
            for actual, pred in pairs
        ]
        print(f"\n3) Nilai Sebenarnya vs Nilai Prediksi ({len(result_df)} data test)")
        print(result_df.to_string(index=False))

        # 4) Classification metrics
        report_dict = classification_report(
            y_test,
            y_pred,
            labels=labels,
            output_dict=True,
            zero_division=0,
        )
        metrics_rows = [
            {
                'kelas': label,
                'precision': report_dict[str(label)]['precision'],
                'recall': report_dict[str(label)]['recall'],
                'f1-score': report_dict[str(label)]['f1-score'],
                'support': int(report_dict[str(label)]['support']),
            }
            for label in labels
        ]
        metrics_rows.append({
            'kelas': 'accuracy',
            'precision': np.nan,
            'recall': np.nan,
            # The report omits 'accuracy' when `labels` does not cover every
            # observed class, so fall back to computing it directly.
            'f1-score': report_dict.get('accuracy', accuracy_score(y_test, y_pred)),
            'support': int(sum(report_dict[str(label)]['support'] for label in labels)),
        })
        for average in ('macro avg', 'weighted avg'):
            metrics_rows.append({
                'kelas': average,
                'precision': report_dict[average]['precision'],
                'recall': report_dict[average]['recall'],
                'f1-score': report_dict[average]['f1-score'],
                'support': int(report_dict[average]['support']),
            })

        metrics_df = pd.DataFrame(metrics_rows)
        print("\n4) Precision, Recall, F1-Score, Support, Accuracy, Macro Avg, Weighted Avg")
        print(metrics_df.to_string(index=False, float_format=lambda x: f"{x:.4f}" if pd.notna(x) else ""))
        print(separator + "\n")

    def train(self, data_path, optimize_k=None, test_data_path=None):
        """Train the KNN model, optionally optimising k first.

        Args:
            data_path: CSV file with the training data.
            optimize_k: Whether to pick k with `find_optimal_k`; defaults to
                `config.OPTIMIZE_K`.
            test_data_path: Optional CSV used as the test set. When it is given
                and exists, all of `data_path` is used for training; otherwise
                `data_path` is split with a stratified train/test split.

        Returns:
            `(accuracy, report)`: test accuracy and the text classification
            report for the test set.
        """
        if optimize_k is None:
            optimize_k = config.OPTIMIZE_K

        X, y = self.prepare_data(data_path)

        if test_data_path and os.path.exists(test_data_path):
            training_frame = self.df
            X_train, y_train = X, y
            X_test, y_test = self.prepare_data(test_data_path)
            # prepare_data overwrote self.df with the test frame; keep self.df
            # pointing at the training data, as in the split branch.
            self.df = training_frame
        else:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y,
                test_size=config.TEST_SIZE,
                random_state=config.RANDOM_STATE,
                stratify=y,
            )

        # Standardise features; the scaler is fit on the training split only.
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Optionally replace k with the grid-search optimum.
        if optimize_k:
            best_k = self.find_optimal_k(X_train_scaled, y_train)
            self.knn = KNeighborsClassifier(
                n_neighbors=best_k,
                weights=self.knn.weights,
                metric=self.knn.metric,
            )
            print(f"Optimal k value found: {best_k}")

        self.knn.fit(X_train_scaled, y_train)

        # Evaluate on the held-out test data.
        y_pred = self.knn.predict(X_test_scaled)
        y_pred_proba = self.knn.predict_proba(X_test_scaled)
        accuracy = accuracy_score(y_test, y_pred)
        labels = sorted(set(np.array(y_test)).union(set(np.array(y_pred))))

        # NOTE(review): the scaler was fit on the whole training split before
        # cross-validation, so each fold's validation rows influenced scaling.
        # Wrap scaler + KNN in a Pipeline if unbiased CV scores are required.
        cv_scores = cross_val_score(
            self.knn, X_train_scaled, y_train, cv=config.CROSS_VALIDATION_FOLDS
        )
        print(f"Cross-validation scores: {cv_scores}")
        print(f"Average CV score: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

        self.is_trained = True
        self.k_optimized = bool(optimize_k)
        self.training_accuracy = accuracy
        self.model_last_trained = datetime.now()
        self.cv_scores = cv_scores
        self.cv_mean = float(cv_scores.mean())
        self.cv_std = float(cv_scores.std())

        # Keep the test data for later performance evaluation.
        self.X_test = X_test
        self.y_test = y_test
        self.y_pred = y_pred
        self.y_pred_proba = y_pred_proba
        self.confusion_matrix = confusion_matrix(y_test, y_pred, labels=labels)

        self._print_testing_report(y_test, y_pred, labels, accuracy=accuracy, cv_scores=cv_scores)

        return accuracy, classification_report(y_test, y_pred, zero_division=0)

    def find_optimal_k(self, X_train, y_train, k_range=None):
        """Find the best k with `GridSearchCV` scored by accuracy.

        Args:
            X_train: Training features.
            y_train: Training labels.
            k_range: Candidate k values; defaults to `config.K_RANGE`.

        Returns:
            The best k as an int. The fitted grid object is kept in
            `self.grid_search_results` for inspection or display.
        """
        candidates = list(config.K_RANGE) if k_range is None else list(k_range)

        grid = GridSearchCV(
            KNeighborsClassifier(weights=self.knn.weights, metric=self.knn.metric),
            {'n_neighbors': candidates},
            cv=config.CROSS_VALIDATION_FOLDS,
            scoring='accuracy',
            n_jobs=-1,
            refit=True,
        )
        grid.fit(X_train, y_train)

        self.grid_search_results = grid
        return int(grid.best_params_['n_neighbors'])

    def evaluate_model(self):
        """Return stored evaluation results for the trained model.

        Returns:
            A dict with 'accuracy', 'classification_report' (as a dict),
            'confusion_matrix' and 'training_time'. Returns None if the model is
            not trained or no evaluation data is available, for example after
            loading a model file that was saved without it.
        """
        if not self.is_trained:
            return None
        if not all(hasattr(self, name) for name in ('y_test', 'y_pred', 'confusion_matrix')):
            return None

        return {
            'accuracy': accuracy_score(self.y_test, self.y_pred),
            'classification_report': classification_report(
                self.y_test, self.y_pred, output_dict=True, zero_division=0
            ),
            'confusion_matrix': self.confusion_matrix,
            'training_time': self.model_last_trained,
        }

    def predict(self, nilai_siswa):
        """Predict the `paket_jurusan` for one student.

        Args:
            nilai_siswa: The student's grades, used to build a one-row
                DataFrame with `self.feature_columns` as columns.

        Returns:
            `(prediction, top_3)` where `top_3` is a list of up to three
            `(paket, probability)` tuples. The predicted class comes first,
            followed by the remaining classes in descending probability.

        Raises:
            ValueError: If the model has not been trained.
        """
        if not self.is_trained:
            raise ValueError("Model belum dilatih!")

        input_df = pd.DataFrame([nilai_siswa], columns=self.feature_columns)
        # Apply the same scaling that was fit during training.
        input_scaled = self.scaler.transform(input_df)

        prediction = self.knn.predict(input_scaled)[0]
        probabilities = self.knn.predict_proba(input_scaled)[0]
        classes = self.knn.classes_

        # The predicted class always leads, even if another class ties with it
        # on probability and would sort ahead of it.
        predicted_index = np.where(classes == prediction)[0][0]
        top_3 = [(prediction, probabilities[predicted_index])]

        # Fill the remaining slots with the other classes, most probable first.
        for index in np.argsort(probabilities)[::-1]:
            if len(top_3) >= 3:
                break
            if classes[index] != prediction:
                top_3.append((classes[index], probabilities[index]))

        return prediction, top_3

    def save_model(self, path):
        """Save the model, scaler and metadata to `path` with joblib.

        Evaluation data (test set, predictions, confusion matrix) is included
        when it is present on the instance.
        """
        directory = os.path.dirname(path)
        # A bare filename has no directory part, and os.makedirs('') raises.
        if directory:
            os.makedirs(directory, exist_ok=True)

        model_data = {
            'knn': self.knn,
            'scaler': self.scaler,
            'feature_columns': self.feature_columns,
            'is_trained': self.is_trained,
            'training_accuracy': self.training_accuracy,
            'model_last_trained': self.model_last_trained,
            'n_neighbors': self.knn.n_neighbors if self.is_trained else 5,
            # Lets the config check know that k came from grid search.
            'k_optimized': getattr(self, 'k_optimized', False),
        }

        if hasattr(self, 'X_test') and hasattr(self, 'y_test'):
            model_data.update({
                'X_test': self.X_test,
                'y_test': self.y_test,
                'y_pred': self.y_pred,
                'confusion_matrix': self.confusion_matrix,
            })

        joblib.dump(model_data, path)

    def load_model(self, path):
        """Load a model previously written by `save_model`.

        Raises:
            ValueError: If the saved KNN parameters do not match the current
                configuration.
        """
        # SECURITY: joblib.load unpickles the file, which can execute arbitrary
        # code. Only load model files from trusted locations.
        model_data = joblib.load(path)

        if not self._saved_model_matches_config(model_data):
            expected = self._get_expected_knn_params()
            saved_knn = model_data.get('knn') if isinstance(model_data, dict) else None
            saved_params = {
                'n_neighbors': getattr(saved_knn, 'n_neighbors', None),
                'weights': getattr(saved_knn, 'weights', None),
                'metric': getattr(saved_knn, 'metric', None),
            }
            raise ValueError(
                f"Saved model configuration {saved_params} does not match current config {expected}."
            )

        self.knn = model_data['knn']
        self.scaler = model_data['scaler']
        self.feature_columns = model_data['feature_columns']
        self.is_trained = model_data['is_trained']
        self.training_accuracy = model_data.get('training_accuracy', 0.0)
        self.model_last_trained = model_data.get('model_last_trained', None)
        self.k_optimized = model_data.get('k_optimized', False)

        # Restore evaluation results when the file contains them.
        if 'X_test' in model_data:
            self.X_test = model_data['X_test']
            self.y_test = model_data['y_test']
            self.y_pred = model_data['y_pred']
            self.confusion_matrix = model_data['confusion_matrix']