# TKK_E32210675/example.py
#
# 368 lines
# 12 KiB
# Python

from flask import Flask, Response, jsonify, redirect, render_template, session, request, url_for
import mysql.connector
import cv2
from PIL import Image
import numpy as np
import os
from datetime import date
from flask_cors import CORS
import datetime
import ffmpeg
import threading
# --- Application and database setup ---------------------------------------
app = Flask(__name__)
# The session-signing key may be supplied through the environment; the
# original placeholder stays as the fallback so existing setups keep running.
# NOTE(review): set FLASK_SECRET_KEY to a real random value before deploying.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'your_secret_key_here')
CORS(app)

# State shared between frames by face_recognition():
cnt = 0              # recognised-face frames counted towards the 60-frame progress bar
pause_cnt = 0        # frames processed since the last recorded scan
justscanned = False  # True while recognition is paused after a recorded scan

# Shared connection and cursor used by most routes and by the video generators.
# Connection settings default to the original local values and can be
# overridden through the environment.
mydb = mysql.connector.connect(
    host=os.environ.get('DB_HOST', 'localhost'),
    user=os.environ.get('DB_USER', 'root'),
    passwd=os.environ.get('DB_PASSWORD', ''),
    database=os.environ.get('DB_NAME', 'flask_db'),
)
mycursor = mydb.cursor()
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Generate dataset >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def generate_dataset(nbr):
    """Capture face images for person *nbr* and stream them as MJPEG parts.

    Frames are read from camera 0. For every frame in which a face is
    detected, the face crop is resized to 300x300, converted to grayscale,
    written to ``dataset/{nbr}.{img_id}.jpg``, recorded in ``img_dataset`` and
    yielded as one ``multipart/x-mixed-replace`` chunk. Capturing stops once
    100 new image ids have been used, when the Enter key is reported by
    ``cv2.waitKey``, or when the camera stops delivering frames.

    Args:
        nbr: Person number used in the file name and in ``img_person``.

    Yields:
        bytes: One multipart chunk containing a JPEG-encoded face image.
    """
    face_classifier = cv2.CascadeClassifier("D:/TUGAS AKHIR/WEB/resources/haarcascade_frontalface_default.xml")

    def face_cropped(img):
        """Return the crop of the first detected face in *img*, or None."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        faces = face_classifier.detectMultiScale(gray, 1.1, 3)
        # ``len`` rather than truthiness: the detector result may be an array.
        if len(faces) == 0:
            return None
        x, y, w, h = faces[0]
        return img[y:y + h, x:x + w]

    cap = cv2.VideoCapture(0)
    # try/finally so the camera is released even when the HTTP client
    # disconnects and the generator is closed in the middle of the stream.
    try:
        mycursor.execute("SELECT IFNULL(MAX(img_id), 0) FROM img_dataset")
        row = mycursor.fetchone()
        img_id = row[0]
        max_imgid = img_id + 100
        count_img = 0
        while True:
            ret, img = cap.read()
            if not ret or img is None:
                # Camera unavailable or stream ended: stop instead of
                # passing an empty frame to OpenCV.
                break
            face = face_cropped(img)
            if face is not None:
                count_img += 1
                img_id += 1
                face = cv2.resize(face, (300, 300))
                face = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)
                file_name_path = f"dataset/{nbr}.{img_id}.jpg"
                cv2.imwrite(file_name_path, face)
                # The counter is drawn after saving, so it appears only in
                # the streamed preview and not in the stored image.
                cv2.putText(face, str(count_img), (50, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 255, 0), 2)
                mycursor.execute("INSERT INTO img_dataset (img_id, img_person) VALUES (%s, %s)", (img_id, nbr))
                mydb.commit()
                frame = cv2.imencode('.jpg', face)[1].tobytes()
                yield (b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
                # 13 is the Enter key code reported by cv2.waitKey.
                if cv2.waitKey(1) == 13 or img_id >= max_imgid:
                    break
    finally:
        cap.release()
        cv2.destroyAllWindows()
@app.route('/train_classifier/<nbr>')
def train_classifier(nbr):
    """Train the LBPH recogniser on the saved dataset and write classifier.xml.

    Every ``.jpg`` file in the dataset directory whose name contains no
    underscore is loaded as a grayscale image. The training label is the
    second dot-separated field of the file name (``<person>.<img_id>.jpg``).

    Args:
        nbr: Route parameter; not used by the training itself.

    Returns:
        A redirect to ``/`` after training, or a 400 response when the
        dataset directory holds no usable images.
    """
    dataset_dir = "D:/TUGAS AKHIR/WEB/dataset"
    image_paths = [
        os.path.join(dataset_dir, name)
        for name in os.listdir(dataset_dir)
        if name.endswith(".jpg") and "_" not in name
    ]

    faces = []
    ids = []
    for image_path in image_paths:
        # Context manager closes the image file as soon as it is converted.
        with Image.open(image_path) as pil_img:
            faces.append(np.array(pil_img.convert('L'), 'uint8'))
        ids.append(int(os.path.split(image_path)[1].split(".")[1]))

    if not faces:
        # Training on an empty list raises inside OpenCV; answer explicitly.
        return "No training images found in the dataset directory.", 400

    # Train the classifier and save it.
    clf = cv2.face.LBPHFaceRecognizer_create()
    clf.train(faces, np.array(ids))
    clf.write("classifier.xml")
    return redirect('/')
# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Face Recognition >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def face_recognition():
    """Stream annotated camera frames and record recognised persons.

    Each frame from camera 0 is searched for faces. A face predicted with a
    confidence above 80 is looked up in ``img_dataset``/``prs_mstr``; every
    such hit advances a progress bar drawn under the face. After 60 hits one
    row is inserted into ``accs_hist`` and recognition pauses until more than
    80 further frames have been processed.

    Yields:
        bytes: One ``multipart/x-mixed-replace`` chunk per JPEG-encoded frame.
    """
    def draw_boundary(img, faceCascade, color, scaleFactor, minNeighbors, clf):
        """Annotate *img* in place and return the last face box as [x, y, w, h]."""
        global justscanned, pause_cnt, cnt
        gray_image = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        faces = faceCascade.detectMultiScale(gray_image, scaleFactor=scaleFactor, minNeighbors=minNeighbors)
        pause_cnt += 1
        coords = []
        for (x, y, w, h) in faces:
            cv2.rectangle(img, (x, y), (x + w, y + h), color, 2)
            roi_gray = gray_image[y:y + h, x:x + w]
            img_id, pred = clf.predict(roi_gray)
            # Map the second value returned by predict() onto a 0-100 scale.
            confidence = int(100 * (1 - pred / 300))
            if confidence > 80 and not justscanned:
                mycursor.execute("SELECT a.img_person, b.prs_name, b.prs_skill FROM img_dataset a LEFT JOIN prs_mstr b ON a.img_person = b.prs_nbr WHERE img_id = %s", (img_id,))
                row = mycursor.fetchone()
                if row is not None:
                    pnbr, pname, pskill = row
                    cnt += 1
                    n = (100 / 60) * cnt
                    w_filled = (cnt / 60) * w
                    cv2.putText(img, str(int(n))+' %', (x + 20, y + h + 28), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (153, 255, 255), 2, cv2.LINE_AA)
                    cv2.rectangle(img, (x, y + h + 40), (x + w, y + h + 50), color, 2)
                    cv2.rectangle(img, (x, y + h + 40), (x + int(w_filled), y + h + 50), (153, 255, 255), cv2.FILLED)
                    # ``>=`` so the counter cannot run past the threshold if
                    # it is ever advanced more than expected.
                    if cnt >= 60:
                        cnt = 0
                        mycursor.execute("INSERT INTO accs_hist (accs_date, accs_prsn) VALUES (%s, %s)", (str(datetime.date.today()), pnbr))
                        mydb.commit()
                        # f-string tolerates NULL name/skill from the LEFT JOIN.
                        cv2.putText(img, f"{pname} | {pskill}", (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (153, 255, 255), 2, cv2.LINE_AA)
                        justscanned = True
                        pause_cnt = 0
            else:
                if not justscanned:
                    cv2.putText(img, 'UNKNOWN', (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2, cv2.LINE_AA)
            coords = [x, y, w, h]
        # Resume recognition once enough frames have passed since the last scan.
        if pause_cnt > 80:
            justscanned = False
        return coords

    def recognize(img, clf, faceCascade):
        """Annotate *img* with detection results and return it."""
        draw_boundary(img, faceCascade, (255, 255, 0), 1.2, 7, clf)
        return img

    faceCascade = cv2.CascadeClassifier("D:/TUGAS AKHIR/WEB/resources/haarcascade_frontalface_default.xml")
    clf = cv2.face.LBPHFaceRecognizer_create()
    clf.read("classifier.xml")
    wCam, hCam = 200, 200
    # Camera 0 is the local device; cv2.VideoCapture also accepts a stream URL.
    cap = cv2.VideoCapture(0)
    # try/finally so the camera is released even when the HTTP client
    # disconnects and the generator is closed in the middle of the stream.
    try:
        cap.set(cv2.CAP_PROP_FRAME_WIDTH, wCam)
        cap.set(cv2.CAP_PROP_FRAME_HEIGHT, hCam)
        cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)
        while True:
            ret, img = cap.read()
            if not ret or img is None:
                # Camera unavailable or stream ended: stop instead of
                # passing an empty frame to OpenCV.
                break
            img = recognize(img, clf, faceCascade)
            frame = cv2.imencode('.jpg', img)[1].tobytes()
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')
            # 27 is the Escape key code reported by cv2.waitKey.
            if cv2.waitKey(1) == 27:
                break
    finally:
        cap.release()
        cv2.destroyAllWindows()
#>>>>>>>>>>>>>>>>>>>>>>>>>>>ROUTE<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
@app.route('/')
def home():
    """Render index.html with all rows of ``prs_mstr``.

    The session's username is passed to the template only when one is stored.
    """
    mycursor.execute("SELECT prs_nbr, prs_name, prs_skill, prs_active, prs_added FROM prs_mstr")
    context = {'data': mycursor.fetchall()}
    if 'username' in session:
        context['username'] = session['username']
    return render_template('index.html', **context)
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or check submitted credentials (POST).

    On success the username is stored in the session and the client is
    redirected to the home page; otherwise the form is re-rendered with an
    error message.
    """
    if request.method != 'POST':
        return render_template('login.html')

    username = request.form['username']
    password = request.form['password']

    cur = mydb.cursor()
    cur.execute("SELECT username, password FROM user_adm WHERE username = %s", (username,))
    account = cur.fetchone()
    cur.close()

    # NOTE(review): the submitted password is compared directly with the
    # stored column value, i.e. passwords appear to be kept in plaintext.
    if not account or password != account[1]:
        return render_template('login.html', error="Invalid username or password!")

    session['username'] = account[0]
    return redirect(url_for('home'))
@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a ``user_adm`` row (POST).

    After a successful insert a confirmation message is stored in the session
    and the client is redirected back to this page.
    """
    if request.method != 'POST':
        return render_template('register.html')

    form = request.form
    new_user = (form['username'], form['email'], form['password'])

    # NOTE(review): the password is inserted exactly as submitted (plaintext).
    cur = mydb.cursor()
    cur.execute("INSERT INTO user_adm (username, email, password) VALUES (%s, %s, %s)", new_user)
    mydb.commit()
    cur.close()

    session['message'] = 'Registration successful. You can now login.'
    return redirect(url_for('register'))
@app.route('/logout')
def logout():
    """Remove the username from the session and redirect to the login page."""
    session.pop('username', None)
    login_url = url_for('login')
    return redirect(login_url)
@app.route('/user')
def user():
    """Render user.html with all rows of ``prs_mstr``."""
    mycursor.execute("select prs_nbr, prs_name, prs_skill, prs_active, prs_added from prs_mstr")
    return render_template('user.html', data=mycursor.fetchall())
@app.route('/registration')
def addprsn():
    """Render the add-person form pre-filled with the next person number.

    The number is ``max(prs_nbr) + 1``, or 101 when ``prs_mstr`` is empty.
    """
    mycursor.execute("select ifnull(max(prs_nbr) + 1, 101) from prs_mstr")
    next_nbr = mycursor.fetchone()[0]
    return render_template('addprsn.html', newnbr=int(next_nbr))
@app.route('/addprsn_submit', methods=['POST'])
def addprsn_submit():
    """Insert the submitted person into ``prs_mstr`` and open dataset capture.

    Reads ``txtnbr``, ``txtname`` and ``optskill`` from the posted form,
    stores them as a new row and redirects to the dataset page for that
    person number.
    """
    prsnbr = request.form.get('txtnbr')
    prsname = request.form.get('txtname')
    prsskill = request.form.get('optskill')
    # Parameterized query: form values are passed as data and never spliced
    # into the SQL text, which closes the injection hole.
    mycursor.execute(
        "INSERT INTO `prs_mstr` (`prs_nbr`, `prs_name`, `prs_skill`) VALUES (%s, %s, %s)",
        (prsnbr, prsname, prsskill),
    )
    mydb.commit()
    return redirect(url_for('vfdataset_page', prs=prsnbr))
@app.route('/vfdataset_page/<prs>')
def vfdataset_page(prs):
    """Render the dataset-capture page for *prs*."""
    template_args = {'prs': prs}
    return render_template('gendataset.html', **template_args)
@app.route('/vidfeed_dataset/<nbr>')
def vidfeed_dataset(nbr):
    """Stream dataset-capture frames for *nbr*.

    Video streaming route; use it as the ``src`` of an ``<img>`` tag.
    """
    frames = generate_dataset(nbr)
    return Response(frames, mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/video_feed')
def video_feed():
    """Stream face-recognition frames.

    Video streaming route; use it as the ``src`` of an ``<img>`` tag.
    """
    frames = face_recognition()
    return Response(frames, mimetype='multipart/x-mixed-replace; boundary=frame')
@app.route('/fr_page')
def fr_page():
    """Video streaming home page, listing today's ``accs_hist`` rows newest first."""
    todays_scans_sql = (
        "select a.accs_id, a.accs_prsn, b.prs_name, b.prs_skill, a.accs_added "
        " from accs_hist a "
        " left join prs_mstr b on a.accs_prsn = b.prs_nbr "
        " where a.accs_date = curdate() "
        " order by 1 desc"
    )
    mycursor.execute(todays_scans_sql)
    rows = mycursor.fetchall()
    return render_template('fr_page.html', data=rows)
@app.route('/countTodayScan')
def countTodayScan():
    """Return the number of ``accs_hist`` rows dated today as JSON.

    Returns:
        JSON object ``{"rowcount": <int>}``.
    """
    # A dedicated connection is opened for this request; it and its cursor
    # are now closed afterwards so repeated calls do not leak connections.
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="",
        database="flask_db"
    )
    try:
        cursor = conn.cursor()
        try:
            cursor.execute("select count(*) "
                           " from accs_hist "
                           " where accs_date = curdate() ")
            rowcount = cursor.fetchone()[0]
        finally:
            cursor.close()
    finally:
        conn.close()
    return jsonify({'rowcount': rowcount})
@app.route('/loadData', methods = ['GET', 'POST'])
def loadData():
    """Return today's ten most recent access records as JSON.

    Returns:
        JSON object ``{"response": [...]}`` with one entry per row: access
        id, person number, name, skill and the time formatted as HH:MM:SS.
    """
    # A dedicated connection is opened for this request; it and its cursor
    # are now closed afterwards so repeated calls do not leak connections.
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="",
        database="flask_db"
    )
    try:
        cursor = conn.cursor()
        try:
            # Executed without parameters, so the '%' signs in date_format
            # reach the server unchanged.
            cursor.execute("select a.accs_id, a.accs_prsn, b.prs_name, b.prs_skill, date_format(a.accs_added, '%H:%i:%s') "
                           " from accs_hist a "
                           " left join prs_mstr b on a.accs_prsn = b.prs_nbr "
                           " where a.accs_date = curdate() "
                           " order BY a.accs_id DESC"
                           " limit 10")
            data = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
    return jsonify(response = data)
@app.route('/loadDataLog', methods = ['GET', 'POST'])
def loadDataLog():
    """Return the complete access history as JSON, newest first.

    Returns:
        JSON object ``{"response": [...]}`` with one entry per row: access
        id, person number, name, skill and the timestamp formatted as
        DD-MM-YYYY HH:MM:SS.
    """
    # A dedicated connection is opened for this request; it and its cursor
    # are now closed afterwards so repeated calls do not leak connections.
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="",
        database="flask_db"
    )
    try:
        cursor = conn.cursor()
        try:
            # Executed without parameters, so the '%' signs in date_format
            # reach the server unchanged.
            cursor.execute("select a.accs_id, a.accs_prsn, b.prs_name, b.prs_skill, date_format(a.accs_added, '%d-%m-%Y %H:%i:%s') "
                           " from accs_hist a "
                           " left join prs_mstr b on a.accs_prsn = b.prs_nbr "
                           " order BY a.accs_id DESC")
            data = cursor.fetchall()
        finally:
            cursor.close()
    finally:
        conn.close()
    return jsonify(response = data)
@app.route('/log')
def log():
    """Render log.html with every row of ``accs_hist``."""
    mycursor.execute("select accs_id, accs_prsn, accs_date, accs_added from accs_hist")
    return render_template('log.html', data=mycursor.fetchall())
if __name__ == "__main__":
    # The server listens on all interfaces, so the interactive debugger must
    # not be on by default: it lets anyone who can reach the port run code.
    # Set FLASK_DEBUG=1 to enable debug mode for local development.
    debug_enabled = os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes')
    app.run(host='0.0.0.0', port=5000, debug=debug_enabled)