# E41222753_NinikYuniarsih_Ju.../models/knn_classifier.py
#
# 445 lines
# 18 KiB
# Python

import logging
import os
from datetime import datetime

import joblib
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix
from sklearn.model_selection import train_test_split, cross_val_score, GridSearchCV
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import StandardScaler

import config
# KNN classifier initialisation
class JurusanKNNClassifier:
    """K-nearest-neighbours classifier that predicts ``paket_jurusan``.

    The class bundles a scikit-learn ``KNeighborsClassifier`` with a
    ``StandardScaler`` and adds helpers for training, printing a detailed
    test report, predicting for a single student and persisting the model
    with joblib. Default hyper-parameters are read from the project
    ``config`` module.
    """

    # Subject-score columns used as model features. The order is the order
    # expected by predict() when a plain sequence of scores is passed.
    FEATURE_COLUMNS = (
        'nilai_informatika', 'nilai_fisika', 'nilai_kimia', 'nilai_biologi',
        'nilai_big_lanjut', 'nilai_ekonomi', 'nilai_mat_lanjut', 'nilai_sej_lanjut',
        'nilai_sosiologi', 'nilai_geografi',
    )
    # Label column holding the class to predict.
    TARGET_COLUMN = 'paket_jurusan'

    def __init__(self, n_neighbors=None, weights=None, metric=None):
        """Create an untrained classifier.

        Args:
            n_neighbors: Number of neighbours; defaults to
                ``config.DEFAULT_K_NEIGHBORS``.
            weights: Neighbour weighting; defaults to ``config.KNN_WEIGHTS``
                or ``'uniform'`` when the config does not define it.
            metric: Distance metric; defaults to ``config.KNN_METRIC`` or
                ``'minkowski'`` when the config does not define it.
        """
        if n_neighbors is None:
            n_neighbors = config.DEFAULT_K_NEIGHBORS
        if weights is None:
            weights = getattr(config, 'KNN_WEIGHTS', 'uniform')
        if metric is None:
            metric = getattr(config, 'KNN_METRIC', 'minkowski')

        self.knn = KNeighborsClassifier(n_neighbors=n_neighbors, weights=weights, metric=metric)
        # Scaler used to standardise the features before fitting/predicting.
        self.scaler = StandardScaler()
        # The model starts untrained; train() or load_model() flips this.
        self.is_trained = False
        self.feature_columns = []
        self.training_accuracy = 0.0
        self.model_last_trained = None
        # True once k has been chosen by grid search rather than taken from config.
        self.k_optimized = False

    def check_model_exists(self, model_path):
        """Return True when a file or directory exists at ``model_path``."""
        return os.path.exists(model_path)

    def _get_expected_knn_params(self):
        """Return the KNN parameters the current config asks for."""
        return {
            'n_neighbors': config.DEFAULT_K_NEIGHBORS,
            'weights': getattr(config, 'KNN_WEIGHTS', 'uniform'),
            'metric': getattr(config, 'KNN_METRIC', 'minkowski'),
        }

    def _saved_model_matches_config(self, model_data):
        """Check whether a saved model's KNN parameters match the config.

        ``weights`` and ``metric`` must always match. ``n_neighbors`` must
        match ``config.DEFAULT_K_NEIGHBORS`` unless the saved payload records
        that k was chosen by grid search (``'k_optimized'``) and the config
        still enables ``OPTIMIZE_K``; without this exception an optimised
        model would be rejected, and retrained, on every start.

        Args:
            model_data: Object returned by ``joblib.load``; anything that is
                not a dict with a ``'knn'`` entry counts as a mismatch.
        """
        if not isinstance(model_data, dict):
            return False
        saved_knn = model_data.get('knn')
        if saved_knn is None:
            return False

        expected = self._get_expected_knn_params()
        k_from_grid_search = bool(model_data.get('k_optimized')) and bool(
            getattr(config, 'OPTIMIZE_K', False)
        )
        k_matches = (
            k_from_grid_search
            or getattr(saved_knn, 'n_neighbors', None) == expected['n_neighbors']
        )
        return (
            k_matches
            and getattr(saved_knn, 'weights', None) == expected['weights']
            and getattr(saved_knn, 'metric', None) == expected['metric']
        )

    def needs_retraining(self, data_path, model_path):
        """Decide whether the model must be (re)trained.

        Retraining is needed when no model file exists, when the file cannot
        be loaded, when its KNN parameters differ from the config, or when the
        data file is newer than the model file.
        """
        if not self.check_model_exists(model_path):
            return True

        try:
            # NOTE(security): joblib.load unpickles the file; only load model
            # files that come from a trusted location.
            model_data = joblib.load(model_path)
            if not self._saved_model_matches_config(model_data):
                return True
        except Exception:
            # An unreadable or incompatible model file is treated as stale.
            return True

        if not os.path.exists(data_path):
            return False
        # Retrain when the dataset was modified after the model was written.
        return os.path.getmtime(data_path) > os.path.getmtime(model_path)

    def auto_train_if_needed(self, data_path, model_path=None, test_data_path=None):
        """Train and save the model when needed, otherwise load it.

        Args:
            data_path: CSV file with the training data.
            model_path: Model file; defaults to ``config.MODEL_PATH``.
            test_data_path: Optional CSV file used as a separate test set.

        Returns:
            ``(trained, accuracy, report)``: ``trained`` is True only when a
            new model was fitted. On any failure the method stays best-effort
            and returns ``(False, 0.0, None)`` after logging the error.
        """
        if model_path is None:
            model_path = config.MODEL_PATH

        try:
            if self.needs_retraining(data_path, model_path):
                accuracy, report = self.train(data_path, test_data_path=test_data_path)
                self.save_model(model_path)
                return True, accuracy, report

            self.load_model(model_path)
            return False, self.training_accuracy, None
        except Exception:
            # Keep the original best-effort contract, but leave a trace of
            # what went wrong instead of discarding the error silently.
            logging.getLogger(__name__).exception(
                "Automatic training/loading of the KNN model failed"
            )
            return False, 0.0, None

    @classmethod
    def _split_features_target(cls, df):
        """Return ``(X, y)`` selected from ``df`` using the class column names."""
        return df[list(cls.FEATURE_COLUMNS)], df[cls.TARGET_COLUMN]

    def prepare_data(self, data_path):
        """Read the dataset CSV and split it into features and labels.

        Side effects: stores the loaded frame in ``self.df`` and the feature
        column names in ``self.feature_columns``.

        Returns:
            ``(X, y)`` where ``X`` holds the subject-score columns and ``y``
            the ``paket_jurusan`` labels.
        """
        self.df = pd.read_csv(data_path)
        self.feature_columns = list(self.FEATURE_COLUMNS)
        return self._split_features_target(self.df)

    @staticmethod
    def _one_vs_rest_counts(cm, labels):
        """Return per-class TP/FN/FP/TN rows derived from a confusion matrix.

        Args:
            cm: Square confusion matrix (rows = actual, columns = predicted).
            labels: Class labels in the same order as the matrix axes.
        """
        total = int(cm.sum())
        rows = []
        for idx, label in enumerate(labels):
            tp = int(cm[idx, idx])
            fn = int(cm[idx, :].sum()) - tp
            fp = int(cm[:, idx].sum()) - tp
            rows.append({
                'kelas': label,
                'TP': tp,
                'FN': fn,
                'FP': fp,
                'TN': total - tp - fn - fp,
            })
        return rows

    def _print_testing_report(self, y_test, y_pred, labels, accuracy=None, cv_scores=None):
        """Print a detailed evaluation report for a test set to stdout.

        The report contains the evaluation configuration, the raw and
        row-normalised confusion matrices, a TP/FN/FP/TN table per class,
        the actual-vs-predicted value for every test row, and the
        precision/recall/F1 table.

        Args:
            y_test: True labels of the test rows.
            y_pred: Predicted labels of the test rows.
            labels: Class labels, in the order used for all tables.
            accuracy: Optional test accuracy to display.
            cv_scores: Optional cross-validation scores to summarise.
        """
        cm = confusion_matrix(y_test, y_pred, labels=labels)
        row_names = [f"Actual {label}" for label in labels]
        col_names = [f"Pred {label}" for label in labels]
        banner = "=" * 80

        print("\n" + banner)
        print("DETAIL HASIL TESTING")
        print(banner)

        # 0) Model and evaluation context
        print("\n0) Konfigurasi Evaluasi")
        print(f"Model KNN : k={self.knn.n_neighbors}, weights={self.knn.weights}, metric={self.knn.metric}")
        print(f"Cross Validation : {config.CROSS_VALIDATION_FOLDS}-fold")
        if accuracy is not None:
            print(f"Test Accuracy : {accuracy:.4f} ({accuracy * 100:.2f}%)")
        if cv_scores is not None and len(cv_scores) > 0:
            cv_mean = float(np.mean(cv_scores))
            cv_std = float(np.std(cv_scores))
            print(f"CV Mean Accuracy : {cv_mean:.4f} ({cv_mean * 100:.2f}%)")
            print(f"CV Std Dev : {cv_std:.4f}")

        # 1) Confusion matrix table
        cm_df = pd.DataFrame(cm, index=row_names, columns=col_names)
        print("\n1) Confusion Matrix")
        print(cm_df.to_string())

        # 1b) Confusion matrix as a percentage of each actual class
        row_totals = cm.sum(axis=1, keepdims=True)
        # Classes without any actual sample would divide by zero; use 1 so
        # their row stays at 0 %.
        row_totals[row_totals == 0] = 1
        cm_norm_df = pd.DataFrame((cm / row_totals) * 100.0, index=row_names, columns=col_names)
        print("\n1b) Confusion Matrix Normalized (%)")
        print(cm_norm_df.to_string(float_format=lambda x: f"{x:.2f}"))

        # 2) TP, FN, FP, TN per class (one-vs-rest)
        tf_df = pd.DataFrame(self._one_vs_rest_counts(cm, labels))
        print("\n2) Tabel TP/FN/FP/TN per Kelas")
        print(tf_df.to_string(index=False))

        # 3) Actual vs predicted value for every test row
        result_df = pd.DataFrame({
            'no_data_test': np.arange(1, len(y_test) + 1),
            'nilai_sebenarnya': np.array(y_test),
            'nilai_prediksi': np.array(y_pred),
        })
        pairs = list(zip(result_df['nilai_sebenarnya'], result_df['nilai_prediksi']))
        result_df['status'] = [
            "BENAR" if actual == pred else "SALAH"
            for actual, pred in pairs
        ]
        result_df['status_tpfnfp_tn'] = [
            "TP (kelas aktual), TN" if actual == pred
            else f"FN (kelas {actual}), FP (kelas {pred})"
            for actual, pred in pairs
        ]
        print(f"\n3) Nilai Sebenarnya vs Nilai Prediksi ({len(result_df)} data test)")
        print(result_df.to_string(index=False))

        # 4) Classification metrics
        report_dict = classification_report(
            y_test,
            y_pred,
            labels=labels,
            output_dict=True,
            zero_division=0,
        )

        def metrics_row(name, entry):
            return {
                'kelas': name,
                'precision': entry['precision'],
                'recall': entry['recall'],
                'f1-score': entry['f1-score'],
                'support': int(entry['support']),
            }

        # classification_report keys per-class entries by the label's string form.
        metrics_rows = [metrics_row(label, report_dict[str(label)]) for label in labels]
        metrics_rows.append({
            'kelas': 'accuracy',
            'precision': np.nan,
            'recall': np.nan,
            'f1-score': report_dict['accuracy'],
            'support': int(sum(report_dict[str(label)]['support'] for label in labels)),
        })
        metrics_rows.append(metrics_row('macro avg', report_dict['macro avg']))
        metrics_rows.append(metrics_row('weighted avg', report_dict['weighted avg']))

        metrics_df = pd.DataFrame(metrics_rows)
        print("\n4) Precision, Recall, F1-Score, Support, Accuracy, Macro Avg, Weighted Avg")
        print(metrics_df.to_string(index=False, float_format=lambda x: f"{x:.4f}" if pd.notna(x) else ""))
        print(banner + "\n")

    def train(self, data_path, optimize_k=None, test_data_path=None):
        """Train the KNN model, optionally tuning ``k`` first.

        Args:
            data_path: CSV file with the training data.
            optimize_k: Run a grid search for the best ``k`` before fitting;
                defaults to ``config.OPTIMIZE_K``.
            test_data_path: Optional CSV file used as the test set. When it
                is missing or does not exist, a stratified train/test split
                of ``data_path`` is used instead.

        Returns:
            ``(accuracy, report)``: test accuracy and the text classification
            report for the test set.

        Side effects: fits the scaler and model, prints the evaluation
        report, and stores the evaluation artefacts on the instance.
        """
        if optimize_k is None:
            optimize_k = config.OPTIMIZE_K

        X, y = self.prepare_data(data_path)

        if test_data_path and os.path.exists(test_data_path):
            X_train, y_train = X, y
            # Read the test set without going through prepare_data(), which
            # would replace self.df (the training frame) with the test frame.
            X_test, y_test = self._split_features_target(pd.read_csv(test_data_path))
        else:
            X_train, X_test, y_train, y_test = train_test_split(
                X, y, test_size=config.TEST_SIZE, random_state=config.RANDOM_STATE, stratify=y
            )

        # Standardise features; the scaler is fitted on the training rows only.
        X_train_scaled = self.scaler.fit_transform(X_train)
        X_test_scaled = self.scaler.transform(X_test)

        if optimize_k:
            best_k = self.find_optimal_k(X_train_scaled, y_train)
            self.knn = KNeighborsClassifier(
                n_neighbors=best_k,
                weights=self.knn.weights,
                metric=self.knn.metric,
            )
            self.k_optimized = True
            print(f"Optimal k value found: {best_k}")

        self.knn.fit(X_train_scaled, y_train)

        # Evaluate on the held-out test rows.
        y_pred = self.knn.predict(X_test_scaled)
        y_pred_proba = self.knn.predict_proba(X_test_scaled)
        accuracy = accuracy_score(y_test, y_pred)
        labels = sorted(set(np.array(y_test)).union(set(np.array(y_pred))))

        # NOTE(review): cross-validation runs on data scaled with statistics
        # from the whole training set, so fold scores are slightly optimistic.
        cv_scores = cross_val_score(
            self.knn,
            X_train_scaled,
            y_train,
            cv=config.CROSS_VALIDATION_FOLDS,
        )
        print(f"Cross-validation scores: {cv_scores}")
        print(f"Average CV score: {cv_scores.mean():.4f} (+/- {cv_scores.std() * 2:.4f})")

        self.is_trained = True
        self.training_accuracy = accuracy
        self.model_last_trained = datetime.now()
        self.cv_scores = cv_scores
        self.cv_mean = float(cv_scores.mean())
        self.cv_std = float(cv_scores.std())

        # Keep the test artefacts for evaluate_model() and save_model().
        self.X_test = X_test
        self.y_test = y_test
        self.y_pred = y_pred
        self.y_pred_proba = y_pred_proba
        self.confusion_matrix = confusion_matrix(y_test, y_pred, labels=labels)

        self._print_testing_report(y_test, y_pred, labels, accuracy=accuracy, cv_scores=cv_scores)
        return accuracy, classification_report(y_test, y_pred, zero_division=0)

    def find_optimal_k(self, X_train, y_train, k_range=None):
        """Find the best ``k`` with ``GridSearchCV``.

        Args:
            X_train: Training features (train() passes the scaled features).
            y_train: Training labels.
            k_range: Candidate values for ``n_neighbors``; defaults to
                ``config.K_RANGE``.

        Returns:
            The best ``n_neighbors`` value as an int. The fitted grid object
            is kept in ``self.grid_search_results`` for later inspection.
        """
        candidates = list(config.K_RANGE if k_range is None else k_range)
        search = GridSearchCV(
            KNeighborsClassifier(weights=self.knn.weights, metric=self.knn.metric),
            {'n_neighbors': candidates},
            cv=config.CROSS_VALIDATION_FOLDS,
            scoring='accuracy',
            n_jobs=-1,
            refit=True,
        )
        search.fit(X_train, y_train)
        self.grid_search_results = search
        return int(search.best_params_['n_neighbors'])

    def evaluate_model(self):
        """Return the stored evaluation results of the trained model.

        Returns:
            A dict with ``accuracy``, ``classification_report`` (as a dict),
            ``confusion_matrix`` and ``training_time``; or ``None`` when the
            model is not trained or no evaluation data is available (for
            example a model loaded from a file saved without test data).
        """
        if not self.is_trained:
            return None
        required = ('y_test', 'y_pred', 'confusion_matrix')
        if not all(hasattr(self, name) for name in required):
            return None

        return {
            'accuracy': accuracy_score(self.y_test, self.y_pred),
            'classification_report': classification_report(
                self.y_test, self.y_pred, output_dict=True, zero_division=0
            ),
            'confusion_matrix': self.confusion_matrix,
            'training_time': self.model_last_trained,
        }

    def predict(self, nilai_siswa):
        """Predict the ``paket_jurusan`` for one student.

        Args:
            nilai_siswa: The student's scores, given as a single row that
                ``pandas.DataFrame`` can align with ``self.feature_columns``.

        Returns:
            ``(prediction, top_3)`` where ``top_3`` is a list of up to three
            ``(paket, probability)`` tuples: the predicted class first,
            followed by the remaining classes in descending probability.

        Raises:
            ValueError: If the model has not been trained or loaded.
        """
        if not self.is_trained:
            raise ValueError("Model belum dilatih!")

        input_df = pd.DataFrame([nilai_siswa], columns=self.feature_columns)
        # Apply the same scaling that was fitted during training.
        input_scaled = self.scaler.transform(input_df)

        prediction = self.knn.predict(input_scaled)[0]
        probabilities = self.knn.predict_proba(input_scaled)[0]
        classes = self.knn.classes_

        # The predicted class always leads the list, then the best runners-up.
        main_index = np.where(classes == prediction)[0][0]
        top_3 = [(prediction, probabilities[main_index])]
        for idx in np.argsort(probabilities)[::-1]:
            if len(top_3) >= 3:
                break
            if classes[idx] != prediction:
                top_3.append((classes[idx], probabilities[idx]))

        return prediction, top_3

    def save_model(self, path):
        """Persist the model, scaler and metadata to ``path`` with joblib.

        Evaluation artefacts (test data, predictions, confusion matrix) are
        included when they are available on the instance.
        """
        directory = os.path.dirname(path)
        # A bare filename has an empty dirname, and os.makedirs('') raises.
        if directory:
            os.makedirs(directory, exist_ok=True)

        model_data = {
            'knn': self.knn,
            'scaler': self.scaler,
            'feature_columns': self.feature_columns,
            'is_trained': self.is_trained,
            'training_accuracy': self.training_accuracy,
            'model_last_trained': self.model_last_trained,
            'n_neighbors': self.knn.n_neighbors if self.is_trained else 5,
            # Lets _saved_model_matches_config accept a grid-searched k.
            'k_optimized': getattr(self, 'k_optimized', False),
        }

        if hasattr(self, 'X_test') and hasattr(self, 'y_test'):
            model_data.update({
                'X_test': self.X_test,
                'y_test': self.y_test,
                'y_pred': self.y_pred,
                'confusion_matrix': self.confusion_matrix,
            })

        joblib.dump(model_data, path)

    def load_model(self, path):
        """Load a model saved by ``save_model`` and restore instance state.

        Raises:
            ValueError: If the saved KNN parameters do not match the current
                config.
        """
        # NOTE(security): joblib.load unpickles the file; only load model
        # files that come from a trusted location.
        model_data = joblib.load(path)

        if not self._saved_model_matches_config(model_data):
            expected = self._get_expected_knn_params()
            saved_knn = model_data.get('knn') if isinstance(model_data, dict) else None
            saved_params = {
                'n_neighbors': getattr(saved_knn, 'n_neighbors', None),
                'weights': getattr(saved_knn, 'weights', None),
                'metric': getattr(saved_knn, 'metric', None),
            }
            raise ValueError(
                f"Saved model configuration {saved_params} does not match current config {expected}."
            )

        self.knn = model_data['knn']
        self.scaler = model_data['scaler']
        self.feature_columns = model_data['feature_columns']
        self.is_trained = model_data['is_trained']
        self.training_accuracy = model_data.get('training_accuracy', 0.0)
        self.model_last_trained = model_data.get('model_last_trained', None)
        self.k_optimized = bool(model_data.get('k_optimized', False))

        # Evaluation artefacts are optional in the saved payload.
        if 'X_test' in model_data:
            self.X_test = model_data['X_test']
            self.y_test = model_data['y_test']
            self.y_pred = model_data['y_pred']
            self.confusion_matrix = model_data['confusion_matrix']