ta-azis-pendeteksi-nominal-.../project/app.py

575 lines
17 KiB
Python

from flask import Flask, request, jsonify, render_template, Response, send_file
from ultralytics import YOLO
import os
import shutil
import cv2
import subprocess
import threading
import queue
import zipfile
import yaml
import sys
from datetime import datetime
import glob
# =========================
# INIT APP
# =========================
app = Flask(__name__)
model = YOLO("best.pt")
latest_result = {
"image": "/static/latest.jpg",
"nominal": "-"
}
if not os.path.exists("static"):
os.makedirs("static")
if not os.path.exists("uploads"):
os.makedirs("uploads")
# Folder audio WAV
if not os.path.exists("static/audio"):
os.makedirs("static/audio")
# Folder untuk menyimpan semua model
if not os.path.exists("models"):
os.makedirs("models")
# File untuk menyimpan nama model yang aktif
active_model_file = "models/active_model.txt"
if not os.path.exists(active_model_file):
with open(active_model_file, "w") as f:
f.write("original")
# Simpan best.pt awal sebagai model "original" jika belum ada
_original_dir = os.path.join("models", "original")
if not os.path.exists(_original_dir) and os.path.exists("best.pt"):
os.makedirs(_original_dir, exist_ok=True)
shutil.copy("best.pt", os.path.join(_original_dir, "best.pt"))
_info = {
"name": "original",
"base_model": "best.pt",
"epochs": "-",
"imgsz": "-",
"timestamp": "000000_000000",
"created": "Model Awal"
}
with open(os.path.join(_original_dir, "info.yaml"), "w") as f:
yaml.dump(_info, f)
# Queue untuk streaming log training
training_log_queue = queue.Queue()
training_running = False
# =========================
# HALAMAN MONITORING
# =========================
@app.route('/')
def home():
return render_template('monitoring.html')
@app.route('/monitoring')
def monitoring():
return render_template('monitoring.html')
# =========================
# HALAMAN TESTING
# =========================
@app.route('/testing')
def test_page():
return render_template('testing.html')
# =========================
# HALAMAN TRAINING
# =========================
@app.route('/training')
def training_page():
return render_template('training.html')
# =========================
# STEP 0 - INSTALL ULTRALYTICS
# =========================
@app.route('/install_ultralytics', methods=['POST'])
def install_ultralytics():
def run_install():
while not training_log_queue.empty():
training_log_queue.get()
training_log_queue.put("▶ Menginstall ultralytics...\n")
process = subprocess.Popen(
[sys.executable, "-m", "pip", "install", "ultralytics"],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1
)
for line in process.stdout:
training_log_queue.put(line)
process.wait()
if process.returncode == 0:
training_log_queue.put("\n✅ Ultralytics berhasil diinstall!\n")
else:
training_log_queue.put("\n❌ Gagal install ultralytics.\n")
training_log_queue.put("__DONE__")
thread = threading.Thread(target=run_install, daemon=True)
thread.start()
return jsonify({"status": "ok"})
# =========================
# STEP 1 - UPLOAD DATASET
# =========================
@app.route('/upload_dataset', methods=['POST'])
def upload_dataset():
if 'dataset' not in request.files:
return jsonify({"error": "No file uploaded"}), 400
file = request.files['dataset']
if not file.filename.endswith('.zip'):
return jsonify({"error": "File harus berupa .zip"}), 400
zip_path = "uploads/data.zip"
os.makedirs("uploads", exist_ok=True)
file.save(zip_path)
extract_path = "uploads/custom_data"
if os.path.exists(extract_path):
shutil.rmtree(extract_path)
with zipfile.ZipFile(zip_path, 'r') as z:
z.extractall(extract_path)
return jsonify({"status": "ok", "message": "Dataset berhasil diupload dan diekstrak"})
# =========================
# STEP 2 - SPLIT DATASET
# =========================
@app.route('/split_dataset', methods=['POST'])
def split_dataset():
import random
src = "uploads/custom_data"
train_pct = float(request.json.get("train_pct", 0.9))
images = glob.glob(f"{src}/**/*.jpg", recursive=True) + \
glob.glob(f"{src}/**/*.jpeg", recursive=True) + \
glob.glob(f"{src}/**/*.png", recursive=True)
if len(images) == 0:
return jsonify({"error": "Tidak ada gambar ditemukan di dataset"}), 400
random.shuffle(images)
split_idx = int(len(images) * train_pct)
train_imgs = images[:split_idx]
val_imgs = images[split_idx:]
for split, imgs in [("train", train_imgs), ("validation", val_imgs)]:
os.makedirs(f"uploads/data/{split}/images", exist_ok=True)
os.makedirs(f"uploads/data/{split}/labels", exist_ok=True)
for img_path in imgs:
shutil.copy(img_path, f"uploads/data/{split}/images/")
label_path = img_path.rsplit(".", 1)[0] + ".txt"
label_path = label_path.replace("\\images\\", "\\labels\\").replace("/images/", "/labels/")
if os.path.exists(label_path):
shutil.copy(label_path, f"uploads/data/{split}/labels/")
return jsonify({
"status": "ok",
"train": len(train_imgs),
"validation": len(val_imgs),
"message": f"Dataset dibagi: {len(train_imgs)} train, {len(val_imgs)} validasi"
})
# =========================
# STEP 3 - BUAT data.yaml
# =========================
@app.route('/create_yaml', methods=['POST'])
def create_yaml():
classes_txt = "uploads/custom_data/classes.txt"
if not os.path.exists(classes_txt):
found = glob.glob("uploads/custom_data/**/classes.txt", recursive=True)
if found:
classes_txt = found[0]
else:
return jsonify({"error": "classes.txt tidak ditemukan di dalam dataset"}), 400
with open(classes_txt, 'r') as f:
classes = [line.strip() for line in f.readlines() if line.strip()]
data = {
'path': os.path.abspath("uploads/data").replace("\\", "/"),
'train': 'train/images',
'val': 'validation/images',
'nc': len(classes),
'names': classes
}
yaml_path = "uploads/data.yaml"
with open(yaml_path, 'w') as f:
yaml.dump(data, f, sort_keys=False)
return jsonify({
"status": "ok",
"classes": classes,
"message": f"data.yaml berhasil dibuat dengan {len(classes)} kelas: {', '.join(classes)}"
})
# =========================
# STEP 4 - MULAI TRAINING
# =========================
@app.route('/start_training', methods=['POST'])
def start_training():
global training_running
if training_running:
return jsonify({"error": "Training sedang berjalan"}), 400
data = request.json or {}
epochs = int(data.get("epochs", 60))
imgsz = int(data.get("imgsz", 640))
model_size = data.get("model", "yolov8n.pt")
model_name = data.get("model_name", "").strip()
yaml_path = os.path.abspath("uploads/data.yaml")
if not os.path.exists(yaml_path):
return jsonify({"error": "data.yaml belum dibuat, jalankan Step 4 dulu"}), 400
def run_training():
global training_running
training_running = True
while not training_log_queue.empty():
training_log_queue.get()
project_path = os.path.abspath("uploads/runs").replace("\\", "/")
train_script = f"""
from ultralytics import YOLO
model = YOLO("{model_size}")
model.train(
data=r"{yaml_path}",
epochs={epochs},
imgsz={imgsz},
project=r"{project_path}",
name="train",
exist_ok=True
)
"""
script_path = "uploads/_train_runner.py"
with open(script_path, "w") as f:
f.write(train_script)
cmd = [sys.executable, script_path]
training_log_queue.put(f"▶ Memulai training: {model_size}, epochs={epochs}, imgsz={imgsz}\n")
training_log_queue.put(f"▶ Python: {sys.executable}\n\n")
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
encoding='utf-8',
errors='replace'
)
for line in process.stdout:
training_log_queue.put(line)
process.wait()
if process.returncode == 0:
best_src = os.path.join("uploads", "runs", "train", "weights", "best.pt")
last_src = os.path.join("uploads", "runs", "train", "weights", "last.pt")
# Buat nama folder model
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
base_model = model_size.replace(".pt", "")
display_name = model_name if model_name else f"{base_model}_{timestamp}"
save_dir = os.path.join("models", display_name)
os.makedirs(save_dir, exist_ok=True)
msg = "\n"
if os.path.exists(best_src):
shutil.copy(best_src, os.path.join(save_dir, "best.pt"))
msg += f"✅ best.pt disimpan ke models/{display_name}/\n"
else:
msg += "⚠️ best.pt tidak ditemukan.\n"
if os.path.exists(last_src):
shutil.copy(last_src, os.path.join(save_dir, "last.pt"))
msg += f"✅ last.pt disimpan ke models/{display_name}/\n"
else:
msg += "⚠️ last.pt tidak ditemukan.\n"
# Simpan info model
info = {
"name": display_name,
"base_model": model_size,
"epochs": epochs,
"imgsz": imgsz,
"timestamp": timestamp,
"created": datetime.now().strftime("%d %b %Y, %H:%M")
}
with open(os.path.join(save_dir, "info.yaml"), "w") as f:
yaml.dump(info, f)
msg += f"\n💾 Model tersimpan sebagai: {display_name}"
training_log_queue.put(msg)
else:
training_log_queue.put(f"\n❌ Training gagal dengan kode: {process.returncode}\n")
training_log_queue.put("__DONE__")
training_running = False
thread = threading.Thread(target=run_training, daemon=True)
thread.start()
return jsonify({"status": "ok", "message": "Training dimulai"})
# =========================
# STREAM LOG TRAINING (SSE)
# =========================
# Kata kunci baris yang TIDAK perlu ditampilkan
_LOG_SKIP = [
"Scanning", "images,", "corrupt", "it/s",
"backgrounds", "labels...", "cache"
]
@app.route('/training_log')
def training_log():
def generate():
while True:
try:
line = training_log_queue.get(timeout=2)
if line == "__DONE__":
yield "data: __DONE__\n\n"
break
clean = line.rstrip()
if not clean:
continue
# Skip baris yang tidak penting (scanning, progress bar, dll)
if any(skip in clean for skip in _LOG_SKIP):
continue
safe = clean.replace("\n", " ")
yield f"data: {safe}\n\n"
except queue.Empty:
yield ": keepalive\n\n"
return Response(generate(), mimetype='text/event-stream',
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
"Connection": "keep-alive"
})
# =========================
# DAFTAR SEMUA MODEL
# =========================
@app.route('/list_models')
def list_models():
models_list = []
active = "best.pt"
if os.path.exists(active_model_file):
with open(active_model_file, "r") as f:
active = f.read().strip()
if not os.path.exists("models"):
return jsonify({"models": [], "active": active})
for folder in sorted(os.listdir("models"), reverse=True):
folder_path = os.path.join("models", folder)
if not os.path.isdir(folder_path):
continue
best_path = os.path.join(folder_path, "best.pt")
if not os.path.exists(best_path):
continue
info_path = os.path.join(folder_path, "info.yaml")
info = {}
if os.path.exists(info_path):
with open(info_path, "r") as f:
info = yaml.safe_load(f) or {}
models_list.append({
"name": folder,
"base_model": info.get("base_model", "-"),
"epochs": info.get("epochs", "-"),
"imgsz": info.get("imgsz", "-"),
"created": info.get("created", "-"),
"active": (active == folder)
})
return jsonify({"models": models_list, "active": active})
# =========================
# TERAPKAN MODEL
# =========================
@app.route('/apply_model', methods=['POST'])
def apply_model():
global model
data = request.json or {}
model_name = data.get("name", "")
best_path = os.path.join("models", model_name, "best.pt")
if not os.path.exists(best_path):
return jsonify({"error": f"Model '{model_name}' tidak ditemukan"}), 404
shutil.copy(best_path, "best.pt")
model = YOLO("best.pt")
with open(active_model_file, "w") as f:
f.write(model_name)
return jsonify({"status": "ok", "message": f"Model '{model_name}' berhasil diterapkan!"})
# =========================
# DOWNLOAD MODEL TERTENTU
# =========================
@app.route('/download_model/<model_name>')
def download_model(model_name):
folder_path = os.path.join("models", model_name)
best_src = os.path.join(folder_path, "best.pt")
last_src = os.path.join(folder_path, "last.pt")
if not os.path.exists(best_src):
return jsonify({"error": "Model tidak ditemukan"}), 404
zip_path = os.path.join("uploads", f"{model_name}.zip")
with zipfile.ZipFile(zip_path, 'w') as z:
z.write(best_src, "best.pt")
if os.path.exists(last_src):
z.write(last_src, "last.pt")
return send_file(os.path.abspath(zip_path), as_attachment=True,
download_name=f"{model_name}.zip")
# =========================
# STATUS TRAINING
# =========================
@app.route('/training_status')
def training_status():
return jsonify({"running": training_running})
# =========================
# DETEKSI (ESP32 + POSTMAN)
# =========================
@app.route('/detect', methods=['POST'])
def detect():
filepath = "temp.jpg"
if request.data:
with open(filepath, "wb") as f:
f.write(request.data)
elif 'image' in request.files:
file = request.files['image']
file.save(filepath)
else:
return jsonify({"error": "No image uploaded"})
results = model(filepath, conf=0.6)
boxes = results[0].boxes
if len(boxes) > 0:
best_idx = boxes.conf.argmax()
cls_id = int(boxes.cls[best_idx])
label = results[0].names[cls_id]
else:
label = "Tidak terdeteksi"
shutil.copy(filepath, "static/latest.jpg")
latest_result["nominal"] = label
return jsonify({"nominal": label})
# =========================
# DATA MONITORING
# =========================
@app.route('/latest')
def latest():
return jsonify(latest_result)
# =========================
# SERVE AUDIO WAV
# =========================
@app.route('/audio/<filename>')
def serve_audio(filename):
# Hanya izinkan ekstensi .wav
if not filename.endswith('.wav'):
return jsonify({"error": "Hanya file .wav yang diizinkan"}), 400
audio_path = os.path.join("static", "audio", filename)
if not os.path.exists(audio_path):
return jsonify({"error": f"File {filename} tidak ditemukan"}), 404
return send_file(
os.path.abspath(audio_path),
mimetype='audio/wav',
as_attachment=False
)
# =========================
# TESTING UPLOAD + BOUNDING BOX
# =========================
@app.route('/upload_test', methods=['POST'])
def upload_test():
file = request.files['image']
filepath = "static/test.jpg"
file.save(filepath)
results = model(filepath, conf=0.6)
result_img = results[0].plot()
output_path = "static/result.jpg"
cv2.imwrite(output_path, result_img)
boxes = results[0].boxes
if len(boxes) > 0:
best_idx = boxes.conf.argmax()
conf = float(boxes.conf[best_idx])
cls_id = int(boxes.cls[best_idx])
label = results[0].names[cls_id]
else:
label = "Tidak terdeteksi"
conf = 0
return jsonify({
"image": "/" + output_path,
"nominal": label,
"confidence": round(conf, 2)
})
# =========================
# RUN SERVER
# =========================
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False, threaded=True)