This commit is contained in:
fhm 2025-06-15 13:02:29 +07:00
parent b2810f3d10
commit 04d8284011
13 changed files with 389 additions and 214 deletions

View File

@ -1,7 +1,7 @@
from fastapi import FastAPI from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from routes.predict_file import router as predict_file_router from routes.prediction import router as prediction_route
from routes.predict_json import router as predict_json_router from routes.protected_prediction import router as protected_prediction
app = FastAPI() app = FastAPI()
@ -14,8 +14,8 @@ app.add_middleware(
) )
# Register API Router # Register API Router
app.include_router(predict_file_router, prefix="/api") app.include_router(prediction_route, prefix="/api")
app.include_router(predict_json_router, prefix="/api") app.include_router(protected_prediction, prefix="/api")
@app.get("/") @app.get("/")
async def root(): async def root():

6
package-lock.json generated Normal file
View File

@ -0,0 +1,6 @@
{
"name": "TA_BE_PY",
"lockfileVersion": 3,
"requires": true,
"packages": {}
}

View File

@ -1,72 +0,0 @@
from fastapi import APIRouter, File, UploadFile, Form, HTTPException
from typing import List, Literal
import pandas as pd
import io
from services.forecastService import forecast_arima_per_product
router = APIRouter()
@router.post("/predict-file")
async def predict(
    sheet: UploadFile = File(...),
    # recordPeriod: Literal["daily", "weekly", "monthly"] = Form(...),
    predictionPeriod: Literal["weekly", "monthly"] = Form(...),
    predictionMode: Literal["auto", "optimal", "custom"] = Form(...),
    arimaModel: str = Form(""),
):
    """Forecast the next three periods for every product in an uploaded sheet.

    Args:
        sheet: Uploaded CSV (file name ending in ``.csv``) or Excel file. It
            must contain ``date`` and ``sold(qty)`` columns plus either
            ``product_name`` or ``product_code``.
        predictionPeriod: Resampling interval passed to the forecaster.
        predictionMode: Model selection strategy.
        arimaModel: ``"p,d,q"`` string, required when ``predictionMode`` is
            ``"custom"``.

    Returns:
        ``{"status": "success", "data": [...]}`` with one entry per product.
        A product whose model fails gets an entry with an ``error`` key
        instead of forecast values.

    Raises:
        HTTPException: 400 for invalid input (bad ``arimaModel``, empty file,
            missing columns); 500 for any other failure.
    """
    try:
        # Parse the custom ARIMA order, if one is required.
        model_values: List[int] = []
        if predictionMode == "custom":
            if not arimaModel:
                raise HTTPException(status_code=400, detail="arimaModel harus diisi saat predictionMode adalah 'custom'")
            try:
                model_values = list(map(int, arimaModel.split(",")))
                if len(model_values) != 3:
                    raise ValueError
            except ValueError:
                raise HTTPException(status_code=400, detail="Format arimaModel harus 'p,d,q'.") from None

        # Read the upload; the file name may be missing and its extension may
        # be upper-case, so normalise before choosing the parser.
        content = await sheet.read()
        filename = (sheet.filename or "").lower()
        if filename.endswith(".csv"):
            df = pd.read_csv(io.BytesIO(content))
        else:
            df = pd.read_excel(io.BytesIO(content))

        if df.empty:
            raise HTTPException(status_code=400, detail="File tidak berisi data.")

        # Validate the required columns.
        if 'product_code' not in df.columns and 'product_name' not in df.columns:
            raise HTTPException(status_code=400, detail="Data harus memiliki kolom 'product_code' atau 'product_name'.")
        if 'date' not in df.columns or 'sold(qty)' not in df.columns:
            raise HTTPException(status_code=400, detail="Data harus memiliki kolom 'date' dan 'sold(qty)'.")

        product_column = 'product_name' if 'product_name' in df.columns else 'product_code'
        df['date'] = pd.to_datetime(df['date'])
        df = df.sort_values(by=[product_column, 'date'])

        freq_map = {"daily": "D", "weekly": "W", "monthly": "M"}
        horizon = 3
        results = []

        for product, group in df.groupby(product_column):
            try:
                result = forecast_arima_per_product(group, freq_map[predictionPeriod], predictionMode, model_values, horizon)
                forecast = result["forecast"]
                results.append({
                    "predictionPeriod": predictionPeriod,
                    "product": product,
                    "order": ",".join(map(str, result["model_params"])),
                    "phase1": forecast[0] if len(forecast) > 0 else None,
                    "phase2": forecast[1] if len(forecast) > 1 else None,
                    "phase3": forecast[2] if len(forecast) > 2 else None,
                })
            except Exception as model_err:
                # Best effort: one failing product must not abort the others.
                results.append({
                    "product": product,
                    "error": str(model_err)
                })

        return {"status": "success", "data": results}

    except HTTPException:
        # Keep the intended 4xx status instead of re-wrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Terjadi kesalahan saat memproses file: {str(e)}") from e

View File

@ -1,80 +0,0 @@
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Optional, Literal
import numpy as np
import pandas as pd
from statsmodels.tsa.arima.model import ARIMA
from statsmodels.tsa.stattools import adfuller, acf, pacf
from sklearn.metrics import mean_squared_error
router = APIRouter()
class TimeSeriesData(BaseModel):
    # Parallel lists: date[i] is the timestamp string of observation value[i].
    date: List[str]
    value: List[float]
class PredictionRequest(BaseModel):
    # Request body of the /predict-json endpoint.
    data: TimeSeriesData
    # Strategy used to choose the ARIMA order.
    model: Literal["optimal", "custom", "auto"] = "auto"
    # Number of future steps to forecast.
    forecast_step: int
    # Explicit [p, d, q]; read only when model == "custom".
    order: Optional[List[int]] = None
def determine_d(series):
    """Pick the differencing order (d) with the Augmented Dickey-Fuller test.

    The series is differenced until the ADF p-value is no longer above 0.05
    or two differences have been applied.
    """
    diff_order = 0
    current = series
    while True:
        p_value = adfuller(current)[1]
        if not p_value > 0.05 or diff_order >= 2:
            break
        current = current.diff().dropna()
        diff_order += 1
    return diff_order
def determine_p_q(series):
    """Pick p and q from the PACF and ACF of ``series``.

    Each order is the first lag (1..10) whose coefficient exceeds 0.2 in
    absolute value; it falls back to 1 when no lag qualifies.
    """
    threshold = 0.2
    acf_vals = acf(series.dropna(), nlags=10)
    pacf_vals = pacf(series.dropna(), nlags=10)

    def first_significant_lag(coefficients):
        # Lag 0 is skipped.
        for lag, coefficient in enumerate(coefficients[1:], start=1):
            if abs(coefficient) > threshold:
                return lag
        return 1

    return first_significant_lag(pacf_vals), first_significant_lag(acf_vals)
@router.post("/predict-json")
async def predict_json(request: PredictionRequest):
    """Fit an ARIMA model on a JSON time series and forecast future steps.

    The first 70% of the observations train the model and the remainder is
    used to compute the RMSE. Returns the ARIMA order, the RMSE and the
    forecast for ``request.forecast_step`` steps.
    """
    payload = request.data
    if len(payload.date) != len(payload.value):
        raise HTTPException(status_code=400, detail="Date and value lists must have the same length.")

    try:
        frame = pd.DataFrame({"date": pd.to_datetime(payload.date), "value": payload.value})
        frame = frame.dropna().sort_values(by="date").set_index("date")
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Invalid data format: {str(e)}")

    if len(frame) < 60:
        raise HTTPException(status_code=400, detail="Insufficient data: At least 60 records required.")

    split_at = int(len(frame) * 0.7)
    train, test = frame[:split_at], frame[split_at:]

    # Choose the ARIMA order according to the requested strategy.
    mode = request.model
    if mode == "custom":
        if not request.order or len(request.order) != 3:
            raise HTTPException(status_code=400, detail="Custom model requires an array of [p, d, q].")
        p, d, q = request.order
    elif mode == "optimal":
        p, d, q = 2, 1, 2
    elif mode == "auto":
        d = determine_d(train["value"])
        p, q = determine_p_q(train["value"])
    else:
        raise HTTPException(status_code=400, detail="Invalid model type. Choose 'auto', 'optimal', or 'custom'.")

    try:
        model_fit = ARIMA(train["value"], order=(p, d, q)).fit()
        # Forecast over the hold-out span to score the model.
        holdout_predictions = model_fit.forecast(steps=len(test)).tolist()
        rmse = np.sqrt(mean_squared_error(test["value"], holdout_predictions))
        future_forecast = model_fit.forecast(steps=request.forecast_step).tolist()
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Model training error: {str(e)}")

    return {
        "arima_order": [p, d, q],
        "rmse": rmse,
        "forecast": future_forecast
    }

93
routes/prediction.py Normal file
View File

@ -0,0 +1,93 @@
from fastapi import APIRouter, HTTPException
from schema.prediction import (
AutoPredictionRequest,
AutoPredictionResponse,
ManualPredictionRequest,
ManualPredictionResponse,
)
from utils.statistic.auto_arima import auto_arima_forecast
from utils.statistic.manual_arima import manual_arima_forecast
from utils.data_preparation import read_csv_string_to_df, df_group_by_interval
router = APIRouter()
@router.post("/predict/auto", response_model=AutoPredictionResponse)
def predict_auto(request: AutoPredictionRequest):
    """Forecast three periods ahead with an automatically selected ARIMA order.

    Parses ``request.csv_string``, checks that the requested date and value
    columns exist, prepares the series and delegates to
    ``auto_arima_forecast``.

    Raises:
        HTTPException: 400 when a requested column is missing, 422 when the
            processing raises ``ValueError``, 500 for any other failure.
    """
    try:
        df = read_csv_string_to_df(request.csv_string)

        if request.date_column not in df.columns or request.value_column not in df.columns:
            raise HTTPException(status_code=400, detail="Kolom tanggal atau nilai tidak ditemukan di data.")

        freq = "W" if request.prediction_period == "weekly" else "M"

        # Use the date and value columns named in the request. The interval is
        # only forwarded when date_regroup is True.
        # NOTE(review): freq is None when date_regroup is False - confirm that
        # df_group_by_interval accepts that.
        ts_df = df_group_by_interval(
            df,
            date_col=request.date_column,
            value_col=request.value_column,
            freq=freq if request.date_regroup else None
        )

        series = ts_df[request.value_column]
        result = auto_arima_forecast(series, forecast_periods=3)

        return AutoPredictionResponse(
            rmse=result["rmse"],
            mape=result["mape"],
            arima_order=tuple(result["arima_order"]),
            prediction=result["prediction"],
            lower=result["lower"],
            upper=result["upper"],
            success=True
        )

    except HTTPException:
        # Keep the intended status code; HTTPException is an Exception
        # subclass and would otherwise be re-wrapped as a 500 below.
        raise
    except ValueError as ve:
        raise HTTPException(status_code=422, detail=str(ve)) from ve
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Terjadi kesalahan saat memproses data: {e}") from e
@router.post("/predict/manual", response_model=ManualPredictionResponse)
def predict_manual(request: ManualPredictionRequest):
    """Forecast three periods ahead with a caller-supplied ARIMA order.

    ``request.arima_model`` must hold exactly three non-null integers
    ``(p, d, q)``.

    Raises:
        HTTPException: 400 for an invalid ``arima_model`` or a missing
            column, 422 when the processing raises ``ValueError``, 500 for
            any other failure.
    """
    try:
        # Validate and unpack the order first: p, d, q must be bound before
        # they are inspected (reading them earlier raises UnboundLocalError).
        if len(request.arima_model) != 3:
            raise HTTPException(status_code=400, detail="Parameter arimaModel harus terdiri dari 3 elemen (p, d, q).")
        p, d, q = request.arima_model
        if any(val is None for val in (p, d, q)):
            raise HTTPException(status_code=400, detail="Semua elemen arimaModel harus memiliki nilai.")

        df = read_csv_string_to_df(request.csv_string)

        if request.date_column not in df.columns or request.value_column not in df.columns:
            raise HTTPException(status_code=400, detail="Kolom tanggal atau nilai tidak ditemukan di data.")

        freq = "W" if request.prediction_period == "weekly" else "M"

        # The interval is only forwarded when date_regroup is True.
        # NOTE(review): freq is None when date_regroup is False - confirm that
        # df_group_by_interval accepts that.
        ts_df = df_group_by_interval(
            df,
            date_col=request.date_column,
            value_col=request.value_column,
            freq=freq if request.date_regroup else None
        )

        series = ts_df[request.value_column]
        result = manual_arima_forecast(series, p=p, d=d, q=q, forecast_periods=3)

        return ManualPredictionResponse(
            arima_order=tuple(result["arima_order"]),
            prediction=result["prediction"],
            lower=result["lower"],
            upper=result["upper"],
            success=True
        )

    except HTTPException:
        # Keep the intended status code instead of re-wrapping it as a 500.
        raise
    except ValueError as ve:
        raise HTTPException(status_code=422, detail=str(ve)) from ve
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Terjadi kesalahan saat memproses data: {e}") from e

View File

@ -0,0 +1,63 @@
from fastapi import APIRouter, HTTPException
from schema.prediction import (
AutoPredictionRequest,
AutoPredictionResponse,
ManualPredictionRequest,
ManualPredictionResponse,
)
from utils.statistic.auto_arima import auto_arima_forecast
from utils.statistic.manual_arima import manual_arima_forecast
from utils.data_preparation import read_csv_string_to_df, df_group_by_interval
router = APIRouter()
@router.post("/predict/private/auto", response_model=AutoPredictionResponse)
def predict_auto(request: AutoPredictionRequest):
    """Forecast one step ahead from the ``amount`` column of the CSV payload.

    The ARIMA order is chosen by ``auto_arima_forecast``.

    Raises:
        HTTPException: 400 when the ``amount`` column is missing, 422 when
            the processing raises ``ValueError``, 500 for any other failure.
    """
    try:
        df = read_csv_string_to_df(request.csv_string)

        # Report a missing column as a client error rather than letting the
        # KeyError surface as a 500.
        if 'amount' not in df.columns:
            raise HTTPException(status_code=400, detail="Kolom 'amount' tidak ditemukan di data.")

        series = df['amount']
        result = auto_arima_forecast(series, forecast_periods=1)

        return AutoPredictionResponse(
            rmse=result["rmse"],
            mape=result["mape"],
            arima_order=tuple(result["arima_order"]),
            prediction=result["prediction"],
            lower=result["lower"],
            upper=result["upper"],
            success=True
        )

    except HTTPException:
        # Keep the intended status code instead of re-wrapping it as a 500.
        raise
    except ValueError as ve:
        raise HTTPException(status_code=422, detail=str(ve)) from ve
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Terjadi kesalahan saat memproses data: {e}") from e
@router.post("/predict/private/manual", response_model=ManualPredictionResponse)
def predict_manual(request: ManualPredictionRequest):
    """Forecast one step ahead from the ``amount`` column with a given order.

    ``request.arima_model`` must hold exactly three elements ``(p, d, q)``;
    they are forwarded unchanged to ``manual_arima_forecast``.

    Raises:
        HTTPException: 400 for a wrong-length ``arima_model`` or a missing
            ``amount`` column, 422 when the processing raises ``ValueError``,
            500 for any other failure.
    """
    try:
        # Validate the ARIMA order before doing any parsing work.
        if len(request.arima_model) != 3:
            raise HTTPException(status_code=400, detail="Parameter arimaModel harus terdiri dari 3 elemen (p, d, q).")
        p, d, q = request.arima_model

        df = read_csv_string_to_df(request.csv_string)

        # Report a missing column as a client error rather than letting the
        # KeyError surface as a 500.
        if 'amount' not in df.columns:
            raise HTTPException(status_code=400, detail="Kolom 'amount' tidak ditemukan di data.")

        series = df['amount']
        result = manual_arima_forecast(series, p=p, d=d, q=q, forecast_periods=1)

        return ManualPredictionResponse(
            arima_order=tuple(result["arima_order"]),
            prediction=result["prediction"],
            lower=result["lower"],
            upper=result["upper"],
            success=True
        )

    except HTTPException:
        # Keep the intended status code instead of re-wrapping it as a 500.
        raise
    except ValueError as ve:
        raise HTTPException(status_code=422, detail=str(ve)) from ve
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Terjadi kesalahan saat memproses data: {e}") from e

39
schema/prediction.py Normal file
View File

@ -0,0 +1,39 @@
from pydantic import BaseModel
from typing import List, Literal, Optional, Tuple
class BasePredictionRequest(BaseModel):
    # Fields shared by the automatic and manual prediction requests.

    # Raw CSV text; the routes parse it with read_csv_string_to_df.
    csv_string: str
    # Interval the public routes map to a "W" or "M" frequency.
    prediction_period: Literal["weekly", "monthly"]
    # Name of the CSV column holding the values to forecast.
    value_column: str = "sold_qty"
    # Name of the CSV column holding the dates.
    date_column: str = "date"
    # When True the public routes forward the frequency to the grouping helper.
    date_regroup: bool = False
class AutoPredictionRequest(BasePredictionRequest):
    """Request model for automatic ARIMA prediction."""
    # Adds no fields to BasePredictionRequest.
class ManualPredictionRequest(BasePredictionRequest):
    """Request model for manual ARIMA prediction."""

    # ARIMA order; the routes require exactly three elements and unpack
    # them as (p, d, q).
    arima_model: List[Optional[int]] = []
class BasePredictionResponse(BaseModel):
    # Fields shared by the automatic and manual prediction responses.

    # ARIMA order of the fitted model.
    arima_order: Tuple[int, int, int]
    # Upper confidence bound for each forecast step.
    upper: List[float]
    # Lower confidence bound for each forecast step.
    lower: List[float]
    # Forecast values, one per step.
    prediction: List[float]
    success: bool
class AutoPredictionResponse(BasePredictionResponse):
    """Response model for automatic ARIMA prediction with error metrics."""

    # Error metrics computed on the hold-out part of the series.
    rmse: float
    mape: float
class ManualPredictionResponse(BasePredictionResponse):
    """Response model for manual ARIMA prediction."""
    # Adds no fields to BasePredictionResponse.

View File

@ -1,58 +0,0 @@
from statsmodels.tsa.arima.model import ARIMA
from pmdarima import auto_arima
from statsmodels.tsa.stattools import adfuller
import pandas as pd
def forecast_arima_per_product(group: pd.DataFrame, freq: str, mode: str, arima_order: list[int], horizon: int):
    """Fit an ARIMA model for one product and forecast ``horizon`` periods.

    Args:
        group: Rows of a single product with ``date`` and ``sold(qty)``
            columns.
        freq: Pandas resampling frequency used to aggregate the quantities.
        mode: ``"auto"`` (pmdarima search), ``"optimal"`` (fixed (2, 1, 2))
            or ``"custom"`` (uses ``arima_order``).
        arima_order: ``[p, d, q]``; only read in ``"custom"`` mode.
        horizon: Number of periods to forecast.

    Returns:
        Dict with ``forecast`` (list of values) and ``model_params`` (order).

    Raises:
        RuntimeError: When the mode is invalid or the model cannot be fitted.
    """
    # Select the quantity column before resampling so unrelated columns
    # (names, codes, ...) are never summed; only 'sold(qty)' is used.
    series = group.set_index('date')['sold(qty)'].resample(freq).sum().dropna()

    # NOTE(review): when the ADF test fails, the model is fitted on the
    # differenced series, so the forecast looks like it is in differenced
    # units - confirm this is intended.
    if adfuller(series)[1] > 0.05:
        series = series.diff().dropna()

    try:
        if mode == "auto":
            model = auto_arima(
                series,
                start_p=0, start_q=0,
                max_p=5, max_q=5,
                d=None,
                seasonal=False,
                stepwise=True,
                suppress_warnings=True,
                error_action="ignore"
            )
            forecast = model.predict(n_periods=horizon)
            return {
                "forecast": forecast.tolist(),
                "model_params": model.order
            }

        elif mode == "optimal":
            model_order = (2, 1, 2)
            model_fit = ARIMA(series, order=model_order).fit()
            forecast = model_fit.forecast(steps=horizon)
            return {
                "forecast": forecast.tolist(),
                "model_params": model_order
            }

        elif mode == "custom":
            if len(arima_order) != 3:
                raise ValueError("Parameter ARIMA harus 3 angka: p,d,q.")
            model_fit = ARIMA(series, order=tuple(arima_order)).fit()
            forecast = model_fit.forecast(steps=horizon)
            return {
                "forecast": forecast.tolist(),
                "model_params": arima_order
            }

        else:
            raise ValueError("Mode prediksi tidak valid.")

    except Exception as e:
        # Chain the cause so the original traceback stays available.
        raise RuntimeError(f"Model ARIMA gagal dibentuk: {str(e)}") from e

49
utils/data_preparation.py Normal file
View File

@ -0,0 +1,49 @@
import pandas as pd
import numpy as np
from io import StringIO
def read_csv_string_to_df(csv_string):
    """Parse CSV text held in a string into a pandas DataFrame."""
    buffer = StringIO(csv_string)
    return pd.read_csv(buffer)
def df_group_by_interval(df, date_col, value_col, freq):
    """Aggregate ``value_col`` into regular intervals of ``freq``.

    Args:
        df: Source frame; it is not modified.
        date_col: Name of the column holding the dates. Values that cannot
            be parsed are ignored.
        value_col: Name of the column to sum.
        freq: Pandas period frequency (for example ``"W"`` or ``"M"``), or
            ``None`` to skip regrouping and return the values as recorded,
            ordered by date.

    Returns:
        A one-column DataFrame (``value_col``) indexed by timestamp. With a
        frequency, every period between the first and last recorded date is
        present and periods without records hold 0.

    Raises:
        ValueError: When ``date_col`` contains no parsable date.
    """
    # Work on derived objects so the caller's frame is left untouched.
    dates = pd.to_datetime(df[date_col], errors='coerce')
    valid = dates.notna()
    if not valid.any():
        raise ValueError(f"Kolom '{date_col}' tidak berisi tanggal yang valid.")

    if freq is None:
        # No regrouping requested: keep each record, ordered by date.
        ungrouped = df.loc[valid, [value_col]].copy()
        ungrouped.index = pd.DatetimeIndex(dates[valid])
        return ungrouped.sort_index(kind="mergesort")

    periods = dates[valid].dt.to_period(freq)
    grouped = df.loc[valid, value_col].groupby(periods).sum()

    # The range ends at the last recorded date, not at today.
    full_index = pd.period_range(periods.min(), periods.max(), freq=freq)
    grouped_full = grouped.reindex(full_index, fill_value=0)

    # Convert the PeriodIndex to a DatetimeIndex.
    grouped_full.index = grouped_full.index.to_timestamp()

    # Return a DataFrame so callers can select the column by name.
    return grouped_full.to_frame(name=value_col)
def df_group_by_interval_interpolate(df, date_col, value_col, freq):
    """Aggregate ``value_col`` per ``freq`` period and interpolate the zeros.

    The period range runs from the first recorded date up to today. Zero
    totals are treated as missing and filled by time-based linear
    interpolation; values that still cannot be filled become 0 again.
    ``df`` is modified in place: ``date_col`` is converted to datetimes and
    a ``period`` column is added.
    """
    df[date_col] = pd.to_datetime(df[date_col], errors='coerce')
    parsed_dates = df[date_col]

    first_period = parsed_dates.min().to_period(freq)
    last_period = pd.Timestamp.today().to_period(freq)

    df['period'] = parsed_dates.dt.to_period(freq)
    every_period = pd.period_range(first_period, last_period, freq=freq)
    totals = df.groupby('period')[value_col].sum().reindex(every_period, fill_value=0)

    # Time-based interpolation needs a DatetimeIndex.
    totals.index = totals.index.to_timestamp()

    # Turn zeros into NaN so that interpolate() can fill them, then put 0
    # back wherever interpolation had nothing to work from.
    filled = totals.replace(0, np.nan).interpolate(method='time').fillna(0)

    return filled.to_frame(name=value_col)

6
utils/math/mape.py Normal file
View File

@ -0,0 +1,6 @@
import numpy as np
def mean_absolute_percentage_error(y_true, y_pred):
    """Return the mean absolute percentage error, expressed in percent.

    Absolute true values below 1e-8 are raised to 1e-8 before dividing, so
    a zero in ``y_true`` does not cause a division by zero.
    """
    actual = np.array(y_true)
    predicted = np.array(y_pred)
    denominators = np.clip(np.abs(actual), 1e-8, None)
    relative_errors = np.abs((actual - predicted) / denominators)
    return np.mean(relative_errors) * 100

View File

@ -0,0 +1,66 @@
from sklearn.metrics import mean_squared_error as mse
from utils.math.mape import mean_absolute_percentage_error
import pmdarima as pm
import warnings
import pandas as pd
import numpy as np
warnings.filterwarnings("ignore", category=FutureWarning)
def auto_arima_forecast(series: pd.Series, train_ratio=0.8, forecast_periods: int = 1) -> dict:
    """Select an ARIMA order with pmdarima, score it and forecast ahead.

    Args:
        series: Observations in chronological order.
        train_ratio: Fraction of the series used to fit the model; the rest
            is the hold-out set.
        forecast_periods: Number of steps to forecast past the series end.

    Returns:
        Dict with ``rmse`` and ``mape`` (measured on the hold-out set),
        ``arima_order``, ``prediction``, ``lower``, ``upper`` and
        ``success``.

    Raises:
        ValueError: When ``series`` is None or empty.
    """
    if series is None or series.empty:
        raise ValueError("Data tidak valid atau kosong.")

    # Chronological train / hold-out split.
    split_at = int(len(series) * train_ratio)
    train_data, test_data = series.iloc[:split_at], series.iloc[split_at:]

    # Non-seasonal stepwise search; d is left for the ADF test to choose.
    search_options = dict(
        start_p=1, start_q=1,
        max_p=3, max_q=3,
        d=None,
        test='adf',
        seasonal=False,
        m=1,
        trace=False,
        error_action='ignore',
        suppress_warnings=True,
        stepwise=True,
    )
    model = pm.auto_arima(train_data, **search_options)

    # Score the model on the hold-out set.
    holdout_predictions, _ = model.predict(n_periods=len(test_data), return_conf_int=True)
    rmse = np.sqrt(mse(test_data, holdout_predictions))
    mape = mean_absolute_percentage_error(test_data, holdout_predictions)

    # Feed the hold-out observations to the model before forecasting ahead.
    model.update(test_data)

    forecast, conf_int = model.predict(n_periods=forecast_periods, return_conf_int=True)

    return {
        "rmse": rmse,
        "mape": mape,
        "arima_order": model.order,
        "prediction": list(forecast),
        "lower": list(conf_int[:, 0]),
        "upper": list(conf_int[:, 1]),
        "success": True
    }

View File

@ -0,0 +1,63 @@
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.arima.model import ARIMA
import pandas as pd
import numpy as np
def test_stationarity(timeseries):
    """Return the Augmented Dickey-Fuller p-value of ``timeseries``.

    Missing values are dropped first. If the test raises, the error is
    printed and 1.0 is returned.
    """
    try:
        p_value = adfuller(timeseries.dropna(), autolag='AIC')[1]
    except Exception as e:
        print(f"Uji stasioneritas gagal: {e}")
        return 1.0
    return p_value
def find_d_parameter(timeseries):
    """Find the differencing order that makes ``timeseries`` stationary.

    The series is differenced while the stationarity p-value stays above
    0.05, up to three times. Differencing stops early when the next
    difference would leave fewer than 20 observations.

    Returns:
        Tuple ``(series, d)``: the series differenced ``d`` times and the
        order ``d`` itself.
    """
    d = 0
    ts_stationary = timeseries.copy()
    p_value = test_stationarity(ts_stationary.dropna())

    while p_value > 0.05 and d < 3:
        candidate = ts_stationary.diff().dropna()
        if len(candidate) < 20:
            # Too short to difference again: keep the last usable series.
            # Series.diff(k) is a lag-k difference, not k-th order
            # differencing, so it must not be used to rebuild that series.
            return ts_stationary, d
        ts_stationary = candidate
        d += 1
        p_value = test_stationarity(ts_stationary)

    return ts_stationary, d
def manual_arima_forecast(series: pd.Series,
                          p=None, d=None, q=None, forecast_periods=1):
    """Fit an ARIMA model with the given order and forecast ahead.

    Args:
        series: Observations in chronological order.
        p: AR order; defaults to 1 when None.
        d: Differencing order; determined by ``find_d_parameter`` when None.
        q: MA order; defaults to 1 when None.
        forecast_periods: Number of steps to forecast.

    Returns:
        Dict with ``arima_order``, ``prediction``, ``lower``, ``upper`` and
        ``success``.

    Raises:
        ValueError: When ``series`` is None or empty.
    """
    if series is None or series.empty:
        raise ValueError("Data tidak valid atau kosong.")

    # Fill in any part of the order the caller left unspecified.
    if d is not None:
        d_optimal = d
    else:
        d_optimal = find_d_parameter(series)[1]
    p_optimal = 1 if p is None else p
    q_optimal = 1 if q is None else q
    order = (p_optimal, d_optimal, q_optimal)

    model_fit = ARIMA(series.astype(float), order=order).fit()

    forecast_result = model_fit.get_forecast(steps=forecast_periods)
    forecast_values = forecast_result.predicted_mean
    confidence_intervals = forecast_result.conf_int()

    return {
        "arima_order": order,
        "prediction": list(map(float, forecast_values.values)),
        "lower": list(confidence_intervals.iloc[:, 0]),
        "upper": list(confidence_intervals.iloc[:, 1]),
        "success": True
    }