445 lines
18 KiB
Python
445 lines
18 KiB
Python
import pandas as pd
|
|
import numpy as np
|
|
from sklearn.neighbors import KNeighborsClassifier
|
|
from sklearn.preprocessing import StandardScaler
|
|
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
|
|
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
|
|
import joblib
|
|
import os
|
|
from datetime import datetime
|
|
import config
|
|
|
|
#inisialisasi KNN
|
|
class JurusanKNNClassifier:
    """KNN classifier that predicts a student's ``paket_jurusan`` from subject scores.

    Bundles a scikit-learn ``KNeighborsClassifier`` with the ``StandardScaler``
    used to normalise the features, and adds helpers for training, printing an
    evaluation report, predicting, and saving/loading the fitted model.
    """

    # Subject-score columns used as model features, in the order the model expects.
    FEATURE_COLUMNS = (
        'nilai_informatika', 'nilai_fisika', 'nilai_kimia', 'nilai_biologi',
        'nilai_big_lanjut', 'nilai_ekonomi', 'nilai_mat_lanjut', 'nilai_sej_lanjut',
        'nilai_sosiologi', 'nilai_geografi',
    )
    # Column holding the class label.
    TARGET_COLUMN = 'paket_jurusan'

    def __init__(self, n_neighbors=None, weights=None, metric=None):
        """Create an untrained classifier.

        Any argument left as ``None`` falls back to the project configuration:
        ``config.DEFAULT_K_NEIGHBORS``, ``config.KNN_WEIGHTS`` (``'uniform'``
        when the setting is absent) and ``config.KNN_METRIC`` (``'minkowski'``
        when the setting is absent).
        """
        if n_neighbors is None:
            n_neighbors = config.DEFAULT_K_NEIGHBORS
        if weights is None:
            # Vote weighting of the neighbours.
            weights = getattr(config, 'KNN_WEIGHTS', 'uniform')
        if metric is None:
            # Distance function between samples.
            metric = getattr(config, 'KNN_METRIC', 'minkowski')

        self.knn = KNeighborsClassifier(n_neighbors=n_neighbors, weights=weights, metric=metric)
        # Scaler used to normalise the features before fitting/predicting.
        self.scaler = StandardScaler()
        # Nothing has been fitted yet.
        self.is_trained = False
        self.feature_columns = []
        self.training_accuracy = 0.0
        self.model_last_trained = None

    def check_model_exists(self, model_path):
        """Return True if a file or directory exists at ``model_path``."""
        return os.path.exists(model_path)

    def _get_expected_knn_params(self):
        """Return the KNN hyperparameters the current configuration asks for."""
        return {
            'n_neighbors': config.DEFAULT_K_NEIGHBORS,
            'weights': getattr(config, 'KNN_WEIGHTS', 'uniform'),
            'metric': getattr(config, 'KNN_METRIC', 'minkowski'),
        }

    def _saved_model_matches_config(self, model_data):
        """Return True if the saved model's KNN parameters equal the configured ones.

        ``model_data`` is the object stored by :meth:`save_model`; anything that
        is not a dict, or has no ``'knn'`` entry, counts as a mismatch.

        NOTE(review): when training ran with ``optimize_k`` enabled, the stored
        ``n_neighbors`` is the tuned value, which can differ from
        ``config.DEFAULT_K_NEIGHBORS``; such a model is reported as mismatching
        and is therefore retrained on every start. Confirm this is intended.
        """
        knn = model_data.get('knn') if isinstance(model_data, dict) else None
        if knn is None:
            return False

        expected = self._get_expected_knn_params()
        return all(getattr(knn, name, None) == value for name, value in expected.items())

    def needs_retraining(self, data_path, model_path):
        """Decide whether the model at ``model_path`` must be (re)trained.

        Returns True when the model file is missing, cannot be loaded, was
        built with KNN parameters different from the current configuration, or
        is older than the data file. Returns False otherwise (including when
        the data file does not exist).
        """
        if not self.check_model_exists(model_path):
            return True

        # A model that cannot be read, or whose parameters are outdated, is
        # treated as stale. joblib.load unpickles the file, so only trusted
        # model files must be placed at model_path.
        try:
            model_data = joblib.load(model_path)
            if not self._saved_model_matches_config(model_data):
                return True
        except Exception:
            return True

        # Retrain when the dataset was modified after the model was written.
        if os.path.exists(data_path):
            return os.path.getmtime(data_path) > os.path.getmtime(model_path)

        return False

    def auto_train_if_needed(self, data_path, model_path=None, test_data_path=None):
        """Train and save the model when it is stale, otherwise load it.

        Args:
            data_path: CSV file with the training data.
            model_path: where the model is stored; defaults to ``config.MODEL_PATH``.
            test_data_path: optional CSV forwarded to :meth:`train`.

        Returns:
            ``(trained, accuracy, report)``. ``trained`` is True only when a new
            model was fitted. When an existing model is loaded the result is
            ``(False, stored_accuracy, None)``. When training or loading fails
            the result is ``(False, 0.0, None)`` and the error is logged.
        """
        import logging  # local import: keeps this block independent of the file's import section

        if model_path is None:
            model_path = config.MODEL_PATH

        try:
            if self.needs_retraining(data_path, model_path):
                accuracy, report = self.train(data_path, test_data_path=test_data_path)
                self.save_model(model_path)
                return True, accuracy, report

            self.load_model(model_path)
            return False, self.training_accuracy, None
        except Exception:
            # Best effort: callers still receive the failure tuple, but the
            # cause is no longer discarded silently.
            logging.getLogger(__name__).exception(
                "Automatic training/loading of the KNN model failed "
                "(data_path=%r, model_path=%r)", data_path, model_path
            )
            return False, 0.0, None

    def prepare_data(self, data_path):
        """Read a CSV dataset and split it into features and labels.

        Stores the loaded frame in ``self.df`` and the feature column names in
        ``self.feature_columns``.

        Returns:
            ``(X, y)`` where ``X`` holds the subject-score columns and ``y`` the
            ``paket_jurusan`` column.
        """
        self.df = pd.read_csv(data_path)
        self.feature_columns = list(self.FEATURE_COLUMNS)

        X = self.df[self.feature_columns]
        y = self.df[self.TARGET_COLUMN]
        return X, y

    def _print_testing_report(self, y_test, y_pred, labels, accuracy=None, cv_scores=None):
        """Print a detailed evaluation report for test predictions to stdout.

        The report contains the evaluation configuration, the raw and
        row-normalised confusion matrices, a one-vs-rest TP/FN/FP/TN table, the
        actual vs. predicted label of every test row, and the precision /
        recall / F1 table.

        Args:
            y_test: true labels of the test rows.
            y_pred: predicted labels, aligned with ``y_test``.
            labels: class labels fixing the row/column order of the tables.
            accuracy: optional test accuracy to display.
            cv_scores: optional cross-validation scores to summarise.
        """
        cm = confusion_matrix(y_test, y_pred, labels=labels)
        row_names = [f"Actual {label}" for label in labels]
        col_names = [f"Pred {label}" for label in labels]
        banner = "=" * 80

        print("\n" + banner)
        print("DETAIL HASIL TESTING")
        print(banner)

        # 0) Model and evaluation context
        print("\n0) Konfigurasi Evaluasi")
        print(f"Model KNN : k={self.knn.n_neighbors}, weights={self.knn.weights}, metric={self.knn.metric}")
        print(f"Cross Validation : {config.CROSS_VALIDATION_FOLDS}-fold")
        if accuracy is not None:
            print(f"Test Accuracy : {accuracy:.4f} ({accuracy * 100:.2f}%)")
        if cv_scores is not None and len(cv_scores) > 0:
            cv_mean = float(np.mean(cv_scores))
            cv_std = float(np.std(cv_scores))
            print(f"CV Mean Accuracy : {cv_mean:.4f} ({cv_mean * 100:.2f}%)")
            print(f"CV Std Dev : {cv_std:.4f}")

        # 1) Confusion matrix table
        cm_df = pd.DataFrame(cm, index=row_names, columns=col_names)
        print("\n1) Confusion Matrix")
        print(cm_df.to_string())

        # 1b) Normalised confusion matrix (% per actual class)
        row_totals = cm.sum(axis=1, keepdims=True)
        row_totals[row_totals == 0] = 1  # avoid dividing by zero for classes without test rows
        cm_norm_df = pd.DataFrame((cm / row_totals) * 100.0, index=row_names, columns=col_names)
        print("\n1b) Confusion Matrix Normalized (%)")
        print(cm_norm_df.to_string(float_format=lambda x: f"{x:.2f}"))

        # 2) TP, FN, FP, TN per class (one-vs-rest)
        total = int(cm.sum())
        tf_table = []
        for idx, label in enumerate(labels):
            tp = int(cm[idx, idx])
            fn = int(cm[idx, :].sum() - tp)
            fp = int(cm[:, idx].sum() - tp)
            tf_table.append({
                'kelas': label,
                'TP': tp,
                'FN': fn,
                'FP': fp,
                'TN': int(total - tp - fn - fp),
            })

        print("\n2) Tabel TP/FN/FP/TN per Kelas")
        print(pd.DataFrame(tf_table).to_string(index=False))

        # 3) Actual vs predicted label for every test row
        result_df = pd.DataFrame({
            'no_data_test': np.arange(1, len(y_test) + 1),
            'nilai_sebenarnya': np.array(y_test),
            'nilai_prediksi': np.array(y_pred),
        })
        pairs = list(zip(result_df['nilai_sebenarnya'], result_df['nilai_prediksi']))
        result_df['status'] = [
            "BENAR" if actual == pred else "SALAH"
            for actual, pred in pairs
        ]
        result_df['status_tpfnfp_tn'] = [
            "TP (kelas aktual), TN" if actual == pred
            else f"FN (kelas {actual}), FP (kelas {pred})"
            for actual, pred in pairs
        ]

        print(f"\n3) Nilai Sebenarnya vs Nilai Prediksi ({len(result_df)} data test)")
        print(result_df.to_string(index=False))

        # 4) Classification metrics
        report_dict = classification_report(
            y_test,
            y_pred,
            labels=labels,
            output_dict=True,
            zero_division=0
        )

        # The report dictionary is looked up by the string form of each label.
        metrics_rows = [
            {
                'kelas': label,
                'precision': report_dict[str(label)]['precision'],
                'recall': report_dict[str(label)]['recall'],
                'f1-score': report_dict[str(label)]['f1-score'],
                'support': int(report_dict[str(label)]['support']),
            }
            for label in labels
        ]
        metrics_rows.append({
            'kelas': 'accuracy',
            'precision': np.nan,
            'recall': np.nan,
            'f1-score': report_dict['accuracy'],
            'support': int(sum(report_dict[str(label)]['support'] for label in labels)),
        })
        for avg_name in ('macro avg', 'weighted avg'):
            avg = report_dict[avg_name]
            metrics_rows.append({
                'kelas': avg_name,
                'precision': avg['precision'],
                'recall': avg['recall'],
                'f1-score': avg['f1-score'],
                'support': int(avg['support']),
            })

        metrics_df = pd.DataFrame(metrics_rows)
        print("\n4) Precision, Recall, F1-Score, Support, Accuracy, Macro Avg, Weighted Avg")
        print(metrics_df.to_string(index=False, float_format=lambda x: f"{x:.4f}" if pd.notna(x) else ""))
        print(banner + "\n")

    def train(self, data_path, optimize_k=None, test_data_path=None):
        """Fit the scaler and the KNN model, evaluate them and print a report.

        Args:
            data_path: CSV file with the training data.
            optimize_k: whether to search for the best ``k`` first; defaults to
                ``config.OPTIMIZE_K``.
            test_data_path: optional CSV used as the test set. When it is given
                and exists, all of ``data_path`` is used for training;
                otherwise ``data_path`` is split with a stratified
                train/test split.

        Returns:
            ``(accuracy, report)``: the test accuracy and the text
            classification report.
        """
        if optimize_k is None:
            optimize_k = config.OPTIMIZE_K

        X, y = self.prepare_data(data_path)

        if test_data_path and os.path.exists(test_data_path):
            X_train, y_train = X, y
            # NOTE(review): prepare_data() reassigns self.df, so after this call
            # self.df holds the test dataset rather than the training dataset.
            X_test, y_test = self.prepare_data(test_data_path)
        else:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=config.TEST_SIZE, random_state=config.RANDOM_STATE, stratify=y
            )

        # Normalise: fit the scaler on the training rows only, then reuse it.
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        # Optional hyperparameter search for k; weights and metric are kept.
        if optimize_k:
            best_k = self.find_optimal_k(X_train_scaled, y_train)
            self.knn = KNeighborsClassifier(
                n_neighbors=best_k,
                weights=self.knn.weights,
                metric=self.knn.metric
            )
            print(f"Optimal k value found: {best_k}")

        self.knn.fit(X_train_scaled, y_train)

        # Evaluate on the held-out test rows.
        y_pred = self.knn.predict(X_test_scaled)
        y_pred_proba = self.knn.predict_proba(X_test_scaled)
        accuracy = accuracy_score(y_test, y_pred)
        labels = sorted(set(np.array(y_test)).union(set(np.array(y_pred))))

        # Cross-validation on the training rows. The scaler was already fitted
        # on the whole training set, so the folds share scaling statistics.
        cv_scores = cross_val_score(
            self.knn,
            X_train_scaled,
            y_train,
            cv=config.CROSS_VALIDATION_FOLDS
        )
        print(f"Cross-validation scores: {cv_scores}")
        print(f"Average CV score: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

        self.is_trained = True
        self.training_accuracy = accuracy
        self.model_last_trained = datetime.now()
        self.cv_scores = cv_scores
        self.cv_mean = float(cv_scores.mean())
        self.cv_std = float(cv_scores.std())

        # Keep the evaluation data so evaluate_model()/save_model() can use it.
        self.X_test = X_test
        self.y_test = y_test
        self.y_pred = y_pred
        self.y_pred_proba = y_pred_proba
        self.confusion_matrix = confusion_matrix(y_test, y_pred, labels=labels)

        self._print_testing_report(y_test, y_pred, labels, accuracy=accuracy, cv_scores=cv_scores)

        return accuracy, classification_report(y_test, y_pred, zero_division=0)

    def find_optimal_k(self, X_train, y_train, k_range=None):
        """Return the ``n_neighbors`` value with the best cross-validated accuracy.

        Runs ``GridSearchCV`` over ``k_range`` (``config.K_RANGE`` when not
        given) using the current weights and metric, and keeps the fitted
        search object in ``self.grid_search_results`` for later inspection.
        """
        candidates = list(config.K_RANGE if k_range is None else k_range)

        search = GridSearchCV(
            KNeighborsClassifier(weights=self.knn.weights, metric=self.knn.metric),
            {'n_neighbors': candidates},
            cv=config.CROSS_VALIDATION_FOLDS,
            scoring='accuracy',
            n_jobs=-1,
            refit=True
        )
        search.fit(X_train, y_train)

        self.grid_search_results = search
        return int(search.best_params_['n_neighbors'])

    def evaluate_model(self):
        """Return the stored evaluation results of the trained model.

        Returns:
            A dict with ``accuracy``, ``classification_report`` (as a dict),
            ``confusion_matrix`` and ``training_time``; or ``None`` when the
            model is not trained or no evaluation data is available (for
            example after loading a model file saved without test data).
        """
        required = ('y_test', 'y_pred', 'confusion_matrix')
        if not self.is_trained or not all(hasattr(self, name) for name in required):
            return None

        return {
            'accuracy': accuracy_score(self.y_test, self.y_pred),
            'classification_report': classification_report(
                self.y_test, self.y_pred, output_dict=True, zero_division=0
            ),
            'confusion_matrix': self.confusion_matrix,
            'training_time': self.model_last_trained,
        }

    def predict(self, nilai_siswa):
        """Predict the ``paket_jurusan`` for one student.

        Args:
            nilai_siswa: the student's scores, one value per feature column.

        Returns:
            ``(prediction, top_3)`` where ``top_3`` is a list of up to three
            ``(paket, probability)`` pairs: the predicted class first, followed
            by the remaining classes in descending probability.

        Raises:
            ValueError: if the model has not been trained.
        """
        if not self.is_trained:
            raise ValueError("Model belum dilatih!")

        # Scale the input with the same scaler that was fitted during training.
        input_df = pd.DataFrame([nilai_siswa], columns=self.feature_columns)
        input_scaled = self.scaler.transform(input_df)

        prediction = self.knn.predict(input_scaled)[0]
        probabilities = self.knn.predict_proba(input_scaled)[0]
        classes = self.knn.classes_

        # The predicted class always comes first, whatever its probability rank.
        main_index = np.where(classes == prediction)[0][0]
        top_3 = [(prediction, probabilities[main_index])]

        # Fill the remaining slots with the other classes, most probable first.
        for idx in np.argsort(probabilities)[::-1]:
            if len(top_3) >= 3:
                break
            paket = classes[idx]
            if paket != prediction:
                top_3.append((paket, probabilities[idx]))

        return prediction, top_3

    def save_model(self, path):
        """Persist the model, scaler and metadata to ``path`` with joblib.

        The parent directory is created when ``path`` contains one. Evaluation
        data (test set, predictions, confusion matrix) is included when
        available.
        """
        # dirname is '' for a bare file name, and os.makedirs('') would raise.
        model_dir = os.path.dirname(path)
        if model_dir:
            os.makedirs(model_dir, exist_ok=True)

        model_data = {
            'knn': self.knn,
            'scaler': self.scaler,
            'feature_columns': self.feature_columns,
            'is_trained': self.is_trained,
            'training_accuracy': self.training_accuracy,
            'model_last_trained': self.model_last_trained,
            'n_neighbors': self.knn.n_neighbors if self.is_trained else 5,
        }

        # Save evaluation results if available.
        if hasattr(self, 'X_test') and hasattr(self, 'y_test'):
            model_data.update({
                'X_test': self.X_test,
                'y_test': self.y_test,
                'y_pred': self.y_pred,
                'confusion_matrix': self.confusion_matrix,
            })

        joblib.dump(model_data, path)

    def load_model(self, path):
        """Restore a model previously written by :meth:`save_model`.

        joblib.load unpickles the file, so only trusted model files must be
        loaded.

        Raises:
            ValueError: if the saved KNN parameters do not match the current
                configuration.
        """
        model_data = joblib.load(path)

        if not self._saved_model_matches_config(model_data):
            expected = self._get_expected_knn_params()
            saved_knn = model_data.get('knn') if isinstance(model_data, dict) else None
            saved_params = {
                name: getattr(saved_knn, name, None)
                for name in ('n_neighbors', 'weights', 'metric')
            }
            raise ValueError(
                f"Saved model configuration {saved_params} does not match current config {expected}."
            )

        self.knn = model_data['knn']
        self.scaler = model_data['scaler']
        self.feature_columns = model_data['feature_columns']
        self.is_trained = model_data['is_trained']
        self.training_accuracy = model_data.get('training_accuracy', 0.0)
        self.model_last_trained = model_data.get('model_last_trained', None)

        # Load evaluation results if available.
        if 'X_test' in model_data:
            self.X_test = model_data['X_test']
            self.y_test = model_data['y_test']
            self.y_pred = model_data['y_pred']
            self.confusion_matrix = model_data['confusion_matrix']