198 lines
6.9 KiB
Python
198 lines
6.9 KiB
Python
import os
|
|
import cv2
|
|
import numpy as np
|
|
import requests
|
|
import urllib.request
|
|
import time
|
|
|
|
# URL untuk file weights dan konfigurasi YOLOv3-tiny
|
|
weights_url_tiny = "https://pjreddie.com/media/files/yolov3-tiny.weights"
|
|
config_url_tiny = "https://raw.githubusercontent.com/pjreddie/darknet/master/cfg/yolov3-tiny.cfg"
|
|
|
|
# Path tempat Anda ingin menyimpan file weights dan konfigurasi
|
|
weights_path_tiny = "yolo_models/yolov3-tiny.weights"
|
|
config_path_tiny = "yolo_models/yolov3-tiny.cfg"
|
|
|
|
# Buat direktori jika belum ada
|
|
os.makedirs("yolo_models", exist_ok=True)
|
|
|
|
# Fungsi untuk mengunduh file dari URL dan menyimpannya
|
|
def download_file(url, save_path):
|
|
print(f"Downloading {url}...")
|
|
urllib.request.urlretrieve(url, save_path)
|
|
print(f"File saved at {save_path}")
|
|
|
|
# Unduh file weights jika belum ada
|
|
if not os.path.exists(weights_path_tiny):
|
|
download_file(weights_url_tiny, weights_path_tiny)
|
|
|
|
# Unduh file konfigurasi jika belum ada
|
|
if not os.path.exists(config_path_tiny):
|
|
download_file(config_url_tiny, config_path_tiny)
|
|
|
|
# Load YOLOv3-tiny model and config file
|
|
net = cv2.dnn.readNetFromDarknet(config_path_tiny, weights_path_tiny)
|
|
|
|
# Jika Anda memiliki GPU yang didukung, aktifkan CUDA
|
|
if cv2.cuda.getCudaEnabledDeviceCount() > 0:
|
|
net.setPreferableBackend(cv2.dnn.DNN_BACKEND_CUDA)
|
|
net.setPreferableTarget(cv2.dnn.DNN_TARGET_CUDA)
|
|
else:
|
|
print("CUDA is not available. Running on CPU.")
|
|
|
|
# Open your laptop's camera
|
|
cap = cv2.VideoCapture(1) # Use '0' for built-in webcam, '1' for external webcam
|
|
assert cap.isOpened(), "Error accessing webcam"
|
|
|
|
# Kurangi frame rate kamera
|
|
cap.set(cv2.CAP_PROP_FPS, 15) # Set FPS to 15 for reducing load
|
|
|
|
# Initialize variables
|
|
people_in = 0
|
|
people_out = 0
|
|
next_person_id = 0
|
|
tracked_objects = {}
|
|
last_sent_people_in = 0
|
|
last_sent_people_out = 0
|
|
|
|
# Function to check if a person crosses the vertical line
|
|
def is_crossing_line(prev_centroid, current_centroid, line_position, direction):
|
|
if direction == 'in':
|
|
return prev_centroid[0] <= line_position < current_centroid[0]
|
|
elif direction == 'out':
|
|
return prev_centroid[0] >= line_position > current_centroid[0]
|
|
return False
|
|
|
|
# Function to send data to ESP32
|
|
def send_to_esp32(in_count, out_count, total_count):
|
|
url = 'http://192.168.249.72/update_counts' # Replace with the IP address of your ESP32
|
|
payload = {
|
|
'in': in_count,
|
|
'out': out_count,
|
|
'total': total_count
|
|
}
|
|
headers = {'Content-Type': 'application/json'}
|
|
try:
|
|
response = requests.post(url, json=payload, headers=headers)
|
|
print(f'Status Code: {response.status_code}, Response: {response.text}')
|
|
except requests.exceptions.RequestException as e:
|
|
print(f'Error sending data to ESP32: {e}')
|
|
|
|
while True:
|
|
start_time = time.time()
|
|
ret, frame = cap.read()
|
|
if not ret:
|
|
break
|
|
|
|
height, width, _ = frame.shape
|
|
line_position_A = width // 2 - 50
|
|
line_position_B = width // 2 + 50
|
|
|
|
resized_frame = cv2.resize(frame, (320, 320))
|
|
blob = cv2.dnn.blobFromImage(resized_frame, 1/255.0, (320, 320), swapRB=True, crop=False)
|
|
net.setInput(blob)
|
|
|
|
layer_names = net.getLayerNames()
|
|
output_layers = [layer_names[i - 1] for i in net.getUnconnectedOutLayers()]
|
|
|
|
detections = net.forward(output_layers)
|
|
|
|
current_centroids = []
|
|
|
|
confidence_threshold = 0.5
|
|
nms_threshold = 0.4
|
|
|
|
boxes = []
|
|
confidences = []
|
|
class_ids = []
|
|
|
|
for detection in detections:
|
|
for obj in detection:
|
|
scores = obj[5:]
|
|
class_id = np.argmax(scores)
|
|
confidence = scores[class_id]
|
|
|
|
if class_id == 0 and confidence > confidence_threshold:
|
|
box = obj[0:4] * np.array([width, height, width, height])
|
|
(x_center, y_center, w, h) = box.astype("int")
|
|
x1 = int(x_center - w / 2)
|
|
y1 = int(y_center - h / 2)
|
|
x2 = x1 + w
|
|
y2 = y1 + h
|
|
|
|
boxes.append([x1, y1, x2 - x1, y2 - y1])
|
|
confidences.append(float(confidence))
|
|
class_ids.append(class_id)
|
|
|
|
indices = cv2.dnn.NMSBoxes(boxes, confidences, confidence_threshold, nms_threshold)
|
|
|
|
if len(indices) > 0:
|
|
for i in indices.flatten():
|
|
box = boxes[i]
|
|
x1, y1, w, h = box
|
|
x2 = x1 + w
|
|
y2 = y1 + h
|
|
|
|
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
|
|
label = f'Person {confidences[i]:.2f}'
|
|
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)
|
|
|
|
centroid = ((x1 + x2) // 2, (y1 + y2) // 2)
|
|
current_centroids.append(centroid)
|
|
|
|
updated_tracked_objects = {}
|
|
for centroid in current_centroids:
|
|
min_distance = float("inf")
|
|
assigned_id = None
|
|
|
|
for person_id, data in tracked_objects.items():
|
|
prev_centroid = data['centroid']
|
|
distance = np.linalg.norm(np.array(centroid) - np.array(prev_centroid))
|
|
if distance < min_distance:
|
|
min_distance = distance
|
|
assigned_id = person_id
|
|
|
|
if min_distance > 50:
|
|
assigned_id = next_person_id
|
|
next_person_id += 1
|
|
|
|
updated_tracked_objects[assigned_id] = {'centroid': centroid, 'counted': tracked_objects.get(assigned_id, {'counted': False})['counted']}
|
|
|
|
if assigned_id in tracked_objects:
|
|
prev_centroid = tracked_objects[assigned_id]['centroid']
|
|
if not tracked_objects[assigned_id]['counted']:
|
|
if is_crossing_line(prev_centroid, centroid, line_position_A, 'in'):
|
|
people_in += 1
|
|
updated_tracked_objects[assigned_id]['counted'] = True
|
|
elif is_crossing_line(prev_centroid, centroid, line_position_B, 'out'):
|
|
people_out += 1
|
|
updated_tracked_objects[assigned_id]['counted'] = True
|
|
|
|
tracked_objects = updated_tracked_objects
|
|
|
|
cv2.line(frame, (line_position_A, 0), (line_position_A, height), (0, 0, 255), 2)
|
|
cv2.line(frame, (line_position_B, 0), (line_position_B, height), (0, 0, 255), 2)
|
|
|
|
total_people = people_in - people_out
|
|
cv2.putText(frame, f'IN: {people_in}', (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
|
|
cv2.putText(frame, f'OUT: {people_out}', (10, 100), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
|
|
cv2.putText(frame, f'Total: {total_people}', (10, 150), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 255, 0), 2)
|
|
|
|
if people_in != last_sent_people_in or people_out != last_sent_people_out:
|
|
send_to_esp32(people_in, people_out, total_people)
|
|
last_sent_people_in = people_in
|
|
last_sent_people_out = people_out
|
|
|
|
cv2.imshow('YOLOv3-tiny Detection', frame)
|
|
|
|
if cv2.waitKey(1) == ord('q'):
|
|
break
|
|
|
|
end_time = time.time()
|
|
elapsed_time = end_time - start_time
|
|
wait_time = max(1.0 / 15 - elapsed_time, 0)
|
|
time.sleep(wait_time)
|
|
|
|
cap.release()
|
|
cv2.destroyAllWindows()
|