# TIF_E41211266/backend/scripts/utils.py
#
# 96 lines
# 3.3 KiB
# Python

import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import os
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import os
def load_and_clean_data(file_path, min_rows=60):
    """Load a semicolon-delimited price CSV and clean it for modelling.

    Steps performed:
      1. Read the CSV with ``;`` as the delimiter.
      2. Parse the ``Tanggal`` (date) column, trying ``%d/%m/%Y`` then
         ``%Y-%m-%d`` and finally pandas' default inference.
      3. Make sure a ``Harga`` (price) column exists; if not, the second
         column (or the first one, when the second is ``Tanggal``) is
         renamed to ``Harga``.
      4. Forward-fill missing values with ``Tanggal`` as the index.
      5. Reject datasets with fewer than ``min_rows`` rows.

    Args:
        file_path: Path (or any source accepted by ``pd.read_csv``) of the
            CSV file.
        min_rows: Minimum number of rows required. Defaults to 60.

    Returns:
        A DataFrame with ``Tanggal`` restored as a regular column.

    Raises:
        Exception: On any failure. The message is prefixed with
            ``"Error saat memuat atau membersihkan data: "`` and the
            original error is attached as ``__cause__``.
    """
    try:
        print(f"Memproses file: {file_path}")
        df = pd.read_csv(file_path, delimiter=';')

        # Fail early with a clear message instead of letting the KeyError be
        # reported as a series of date-conversion failures below.
        if 'Tanggal' not in df.columns:
            raise ValueError("Kolom 'Tanggal' tidak ditemukan")

        date_formats = ['%d/%m/%Y', '%Y-%m-%d']
        for date_format in date_formats:
            try:
                df['Tanggal'] = pd.to_datetime(df['Tanggal'], format=date_format)
            except (ValueError, TypeError) as e:
                print(f"Error konversi tanggal dengan format {date_format}: {e}")
                continue
            print(f"Berhasil mengkonversi tanggal dengan format {date_format}")
            break
        else:
            # No explicit format matched: let pandas infer the format.
            try:
                df['Tanggal'] = pd.to_datetime(df['Tanggal'])
            except (ValueError, TypeError) as e2:
                raise ValueError(f"Gagal mengkonversi tanggal: {e2}") from e2
            print("Berhasil mengkonversi tanggal dengan format default")

        # If no column is named "Harga", fall back to the second column.
        if "Harga" not in df.columns:
            if len(df.columns) < 2:
                raise ValueError("Tidak dapat menemukan kolom harga")
            harga_column = df.columns[1]
            # Never rename the date column itself; that would break the
            # set_index('Tanggal') call below.
            if harga_column == 'Tanggal':
                harga_column = df.columns[0]
            df = df.rename(columns={harga_column: "Harga"})
            print(f"Menggunakan kolom {harga_column} sebagai Harga")

        df.set_index('Tanggal', inplace=True)
        # Fill missing values with the previous valid observation.
        df = df.ffill()

        if len(df) < min_rows:
            raise ValueError(
                f"Data tidak cukup untuk model LSTM. Hanya tersedia {len(df)} baris, "
                f"minimal {min_rows} baris diperlukan."
            )

        df = df.reset_index()
        print(f"Preprocessing berhasil: {len(df)} baris data")
        return df
    except Exception as e:
        error_msg = f"Error saat memuat atau membersihkan data: {str(e)}"
        print(error_msg)
        # Chain explicitly so the root cause stays available to callers.
        raise Exception(error_msg) from e
def normalize_data(df, dataset_name, scalers):
    """Scale the ``Harga`` column into the range [0, 1].

    A new ``MinMaxScaler`` is created, stored in ``scalers`` under
    ``dataset_name`` (replacing any existing entry) and fitted on the
    ``Harga`` values of ``df``.

    Args:
        df: DataFrame containing a ``Harga`` column. If it also has a
            ``Tanggal`` column, that column is moved to the index first.
        dataset_name: Key under which the fitted scaler is stored.
        scalers: Mutable mapping that receives the fitted scaler.

    Returns:
        The array returned by ``fit_transform`` for the single ``Harga``
        column.
    """
    frame = df.set_index('Tanggal') if 'Tanggal' in df.columns else df

    # Register the scaler before fitting so it can later be looked up by name.
    scaler = MinMaxScaler(feature_range=(0, 1))
    scalers[dataset_name] = scaler

    prices = frame[['Harga']].values
    scaled_prices = scaler.fit_transform(prices)

    print(f"Normalisasi selesai untuk {dataset_name}")
    print(f"Nilai asli: {frame['Harga'].head().values}")
    print(f"Nilai ternormalisasi: {scaled_prices[:5].flatten()}")
    return scaled_prices
def create_dataset(data, time_step=60):
    """Build sliding-window samples from the first column of ``data``.

    Each sample ``X[i]`` holds ``time_step`` consecutive values starting at
    row ``i`` and its target ``y[i]`` is the value that immediately follows
    the window. Every possible window is used, including the one whose
    target is the last row of ``data``.

    Args:
        data: 2-D array-like; only column 0 is read.
        time_step: Window length. Must be at least 1. Defaults to 60.

    Returns:
        A tuple ``(X, y)`` where ``X`` has shape
        ``(n_samples, time_step)`` and ``y`` has shape ``(n_samples,)``,
        with ``n_samples = max(len(data) - time_step, 0)``.

    Raises:
        ValueError: If ``time_step`` is smaller than 1.
    """
    if time_step < 1:
        raise ValueError("time_step must be at least 1")

    series = np.asarray(data)
    # The last usable window ends one row before the final row, so there are
    # exactly len(series) - time_step window/target pairs.
    n_samples = len(series) - time_step
    if n_samples <= 0:
        # Keep the 2-D shape so callers can still reshape an empty result.
        return (
            np.empty((0, time_step), dtype=series.dtype),
            np.empty((0,), dtype=series.dtype),
        )

    X = np.array([series[start:start + time_step, 0] for start in range(n_samples)])
    y = np.array(series[time_step:, 0])
    return X, y
def split_data(X, y, train_size=0.8):
    """Split samples and targets into leading train and trailing test parts.

    The split is positional (no shuffling): the first
    ``int(len(X) * train_size)`` items go to the training set and the
    remainder to the test set.

    Args:
        X: Sliceable sequence of samples.
        y: Sliceable sequence of targets.
        train_size: Fraction of ``X`` used for training. Defaults to 0.8.

    Returns:
        The tuple ``(X_train, X_test, y_train, y_test)``.
    """
    cut = int(len(X) * train_size)
    head, tail = slice(None, cut), slice(cut, None)
    return X[head], X[tail], y[head], y[tail]
def denormalize_data(scaled_data, dataset_name, scalers):
    """Map scaled values back to their original range.

    Args:
        scaled_data: Values to pass to the scaler's ``inverse_transform``.
        dataset_name: Key of the scaler to look up in ``scalers``.
        scalers: Mapping from dataset name to a scaler object.

    Returns:
        Whatever the scaler's ``inverse_transform`` returns for
        ``scaled_data``.

    Raises:
        KeyError: If ``dataset_name`` is not a key of a dict ``scalers``.
    """
    scaler = scalers[dataset_name]
    restored = scaler.inverse_transform(scaled_data)
    return restored
def evaluate_model(model, X_test, y_test):
    """Compute RMSE and MAE of ``model`` on a test set.

    Args:
        model: Object exposing ``predict(X_test)``.
        X_test: Inputs passed to ``model.predict``.
        y_test: Ground-truth targets compared against the predictions.

    Returns:
        The tuple ``(rmse, mae)``, where ``rmse`` is the square root of the
        mean squared error and ``mae`` is the mean absolute error.
    """
    # Imported locally so scikit-learn's metrics are only loaded on demand.
    from sklearn.metrics import mean_squared_error, mean_absolute_error

    predictions = model.predict(X_test)
    mse = mean_squared_error(y_test, predictions)
    root_mse = np.sqrt(mse)
    abs_error = mean_absolute_error(y_test, predictions)
    return root_mse, abs_error