"""Flask application for face-dataset capture, classifier training and recognition."""
import datetime
import os
import threading
from datetime import date

import cv2
import ffmpeg
import mysql.connector
import numpy as np
from flask import Flask, Response, jsonify, redirect, render_template, session, request, url_for
from flask_cors import CORS
from PIL import Image

from video_stream import start_video_stream, get_video_stream

app = Flask(__name__)
# Prefer a secret supplied through the environment; the original placeholder is
# kept as the fallback so existing deployments behave exactly as before.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'your_secret_key_here')
CORS(app)

# Shared state used by the face-recognition progress logic.
cnt = 0
pause_cnt = 0
justscanned = False

# NOTE(review): one connection/cursor is shared by every request and streaming
# generator in this module; confirm this is acceptable for the deployment's
# threading model.
mydb = mysql.connector.connect(
    host="localhost",
    user="root",
    passwd="",
    database="flask_db"
)
mycursor = mydb.cursor()


# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Generate dataset >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def generate_dataset(nbr):
    """Capture face crops from camera 0, store them, and stream them as MJPEG parts.

    For every frame in which a face is detected, the first detected face is
    resized to 300x300, converted to grayscale, written to
    ``dataset/{nbr}.{img_id}.jpg`` and registered in ``img_dataset``. Capture
    stops after 100 new images or when the camera stops delivering frames.

    Args:
        nbr: Person number stored with each image and used in the file name.

    Yields:
        bytes: One ``multipart/x-mixed-replace`` part per captured face,
        containing the JPEG-encoded crop annotated with a running counter.
    """
    face_classifier = cv2.CascadeClassifier("D:/TUGAS AKHIR/WEB/resources/haarcascade_frontalface_default.xml")

    def face_cropped(img):
        """Return the region of the first face detected in *img*, or None."""
        gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        faces = face_classifier.detectMultiScale(gray, 1.1, 3)
        for (x, y, w, h) in faces:
            return img[y:y + h, x:x + w]
        return None

    mycursor.execute("SELECT IFNULL(MAX(img_id), 0) FROM img_dataset")
    img_id = mycursor.fetchone()[0]
    max_imgid = img_id + 100
    count_img = 0

    cap = cv2.VideoCapture(0)
    try:
        while img_id < max_imgid:
            ret, img = cap.read()
            if not ret or img is None:
                # Camera unavailable or stream ended: stop instead of crashing
                # inside cvtColor on a None frame.
                break

            face = face_cropped(img)
            if face is None:
                continue

            count_img += 1
            img_id += 1
            face = cv2.resize(face, (300, 300))
            face = cv2.cvtColor(face, cv2.COLOR_BGR2GRAY)

            # Save the clean crop first; the counter overlay below is only for
            # the preview stream.
            file_name_path = f"dataset/{nbr}.{img_id}.jpg"
            cv2.imwrite(file_name_path, face)
            cv2.putText(face, str(count_img), (50, 50), cv2.FONT_HERSHEY_COMPLEX, 1, (0, 255, 0), 2)

            mycursor.execute("INSERT INTO img_dataset (img_id, img_person) VALUES (%s, %s)", (img_id, nbr))
            mydb.commit()

            frame = cv2.imencode('.jpg', face)[1].tobytes()
            yield (b'--frame\r\n'b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')
    finally:
        # Runs on normal completion and when the client disconnects and the
        # generator is closed, so the camera is always released. No HighGUI
        # window is ever opened here, so waitKey/destroyAllWindows are omitted.
        cap.release()
@app.route('/train_classifier/<nbr>')
def train_classifier(nbr):
    """Train an LBPH recognizer on the stored face images and save it.

    Every ``*.jpg`` in the dataset directory whose name has no underscore is
    loaded as grayscale; its label is the second dot-separated field of the
    file name. The trained model is written to ``classifier.xml``.

    Args:
        nbr: Value from the URL; required by the route but not used for
            filtering, since all images are trained on.

    Returns:
        A redirect to the home page.
    """
    dataset_dir = "D:/TUGAS AKHIR/WEB/dataset"
    image_paths = [
        os.path.join(dataset_dir, f)
        for f in os.listdir(dataset_dir)
        if f.endswith(".jpg") and "_" not in f
    ]

    faces = []
    ids = []
    for image_path in image_paths:
        # The context manager closes the file handle once pixels are copied.
        with Image.open(image_path) as pil_img:
            faces.append(np.array(pil_img.convert('L'), 'uint8'))
        ids.append(int(os.path.split(image_path)[1].split(".")[1]))

    if not faces:
        # Nothing to train on; training on an empty set would raise.
        return redirect('/')

    # Train the classifier and save it
    clf = cv2.face.LBPHFaceRecognizer_create()
    clf.train(faces, np.array(ids))
    clf.write("classifier.xml")

    return redirect('/')


# <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< Face Recognition >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
def start_face_recognition_thread():
    """Start a thread whose target is ``face_recognition``.

    NOTE(review): ``face_recognition`` is a generator function, so the thread
    only creates a generator object and exits without processing any frame.
    Kept for interface compatibility.
    """
    face_recognition_thread = threading.Thread(target=face_recognition)
    face_recognition_thread.start()


def face_recognition():
    """Run face recognition on the shared video stream and yield annotated MJPEG parts.

    Each incoming frame is decoded, detected faces are outlined and matched
    against ``classifier.xml``; after 60 confident matches an access record is
    inserted into ``accs_hist``.

    Yields:
        bytes: One ``multipart/x-mixed-replace`` part per processed frame.
    """

    def draw_boundary(img, faceCascade, color, scaleFactor, minNeighbors, clf):
        """Annotate faces in *img* and update the scan-progress globals.

        Returns:
            list: ``[x, y, w, h]`` of the last detected face, or ``[]``.
        """
        global justscanned, pause_cnt, cnt

        gray_image = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        faces = faceCascade.detectMultiScale(gray_image, scaleFactor=scaleFactor, minNeighbors=minNeighbors)

        pause_cnt += 1
        coords = []

        for (x, y, w, h) in faces:
            cv2.rectangle(img, (x, y), (x + w, y + h), color, 2)
            roi_gray = gray_image[y:y + h, x:x + w]
            label, pred = clf.predict(roi_gray)
            confidence = int(100 * (1 - pred / 300))

            if confidence > 80 and not justscanned:
                mycursor.execute(
                    "SELECT a.img_person, b.prs_name, b.prs_skill FROM img_dataset a LEFT JOIN prs_mstr b ON a.img_person = b.prs_nbr WHERE img_id = %s",
                    (int(label),))
                row = mycursor.fetchone()
                if row is not None:
                    pnbr, pname, pskill = row

                    cnt += 1
                    n = (100 / 60) * cnt
                    w_filled = (cnt / 60) * w

                    # Progress text plus an outlined bar that fills as cnt approaches 60.
                    cv2.putText(img, str(int(n))+' %', (x + 20, y + h + 28), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (153, 255, 255), 2, cv2.LINE_AA)
                    cv2.rectangle(img, (x, y + h + 40), (x + w, y + h + 50), color, 2)
                    cv2.rectangle(img, (x, y + h + 40), (x + int(w_filled), y + h + 50), (153, 255, 255), cv2.FILLED)

                    # >= rather than == so the scan still completes if the
                    # shared counter ever steps past 60.
                    if cnt >= 60:
                        cnt = 0
                        mycursor.execute("INSERT INTO accs_hist (accs_date, accs_prsn) VALUES (%s, %s)", (str(datetime.date.today()), pnbr))
                        mydb.commit()

                        # The LEFT JOIN can return NULL name/skill; avoid a
                        # TypeError from concatenating None.
                        caption = f"{pname or ''} | {pskill or ''}"
                        cv2.putText(img, caption, (x - 10, y - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (153, 255, 255), 2, cv2.LINE_AA)

                        justscanned = True
                        pause_cnt = 0
            elif not justscanned:
                cv2.putText(img, 'UNKNOWN', (x, y - 5), cv2.FONT_HERSHEY_SIMPLEX, 0.8, (0, 0, 255), 2, cv2.LINE_AA)

            coords = [x, y, w, h]

        # pause_cnt advances once per frame, so the cool-down after a
        # successful scan is checked once per frame as well.
        if pause_cnt > 80:
            justscanned = False

        return coords

    def recognize(img, clf, faceCascade):
        """Annotate *img* in place via ``draw_boundary`` and return it."""
        draw_boundary(img, faceCascade, (255, 255, 0), 1.2, 7, clf)
        return img

    faceCascade = cv2.CascadeClassifier("D:/TUGAS AKHIR/WEB/resources/haarcascade_frontalface_default.xml")
    clf = cv2.face.LBPHFaceRecognizer_create()
    clf.read("classifier.xml")

    for frame in get_video_stream():
        # Each chunk is expected to be a multipart part: headers, a blank
        # line, then JPEG bytes. Split only on the first blank line, because
        # the binary JPEG payload may itself contain b'\r\n\r\n'.
        jpeg_bytes = frame.partition(b'\r\n\r\n')[2]
        if not jpeg_bytes:
            continue

        img = cv2.imdecode(np.frombuffer(jpeg_bytes, np.uint8), cv2.IMREAD_COLOR)
        if img is None:
            # Undecodable frame: skip it rather than crash the stream.
            continue

        img = recognize(img, clf, faceCascade)

        frame = cv2.imencode('.jpg', img)[1].tobytes()
        yield (b'--frame\r\n'
               b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n\r\n')


#>>>>>>>>>>>>>>>>>>>>>>>>>>>ROUTE<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
@app.route('/')
def home():
    """Render the home page with all rows of ``prs_mstr``."""
    mycursor.execute("SELECT prs_nbr, prs_name, prs_skill, prs_active, prs_added FROM prs_mstr")
    data = mycursor.fetchall()

    if 'username' in session:
        return render_template('index.html', username=session['username'], data=data)
    return render_template('index.html', data=data)


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form (GET) or authenticate a user (POST)."""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        cur = mydb.cursor()
        try:
            cur.execute("SELECT username, password FROM user_adm WHERE username = %s", (username,))
            account = cur.fetchone()
        finally:
            cur.close()

        # NOTE(review): passwords are stored and compared in plaintext;
        # migrating to salted hashes needs a data migration and is not done here.
        if account and password == account[1]:
            session['username'] = account[0]
            return redirect(url_for('home'))

        error = "Invalid username or password!"
        return render_template('login.html', error=error)

    return render_template('login.html')


@app.route('/register', methods=['GET', 'POST'])
def register():
    """Show the registration form (GET) or create a user (POST)."""
    if request.method == 'POST':
        username = request.form['username']
        email = request.form['email']
        password = request.form['password']

        # NOTE(review): the password is stored in plaintext, matching what
        # login() compares against.
        cur = mydb.cursor()
        try:
            cur.execute("INSERT INTO user_adm (username, email, password) VALUES (%s, %s, %s)", (username, email, password))
            mydb.commit()
        finally:
            cur.close()

        session['message'] = 'Registration successful. You can now login.'
        return redirect(url_for('register'))

    return render_template('register.html')


@app.route('/logout')
def logout():
    """Remove the user from the session and redirect to the login page."""
    session.pop('username', None)
    return redirect(url_for('login'))


@app.route('/user')
def user():
    """Render the user list page with all rows of ``prs_mstr``."""
    mycursor.execute("select prs_nbr, prs_name, prs_skill, prs_active, prs_added from prs_mstr")
    data = mycursor.fetchall()
    return render_template('user.html', data=data)


@app.route('/registration')
def addprsn():
    """Render the add-person form pre-filled with the next person number."""
    mycursor.execute("select ifnull(max(prs_nbr) + 1, 101) from prs_mstr")
    row = mycursor.fetchone()
    nbr = row[0]
    return render_template('addprsn.html', newnbr=int(nbr))


@app.route('/addprsn_submit', methods=['POST'])
def addprsn_submit():
    """Insert a new person from the submitted form and go to dataset capture."""
    prsnbr = request.form.get('txtnbr')
    prsname = request.form.get('txtname')
    prsskill = request.form.get('optskill')

    # Parameterized so form input can never be interpreted as SQL.
    mycursor.execute(
        "INSERT INTO `prs_mstr` (`prs_nbr`, `prs_name`, `prs_skill`) VALUES (%s, %s, %s)",
        (prsnbr, prsname, prsskill))
    mydb.commit()

    return redirect(url_for('vfdataset_page', prs=prsnbr))


@app.route('/vfdataset_page/<prs>')
def vfdataset_page(prs):
    """Render the dataset-generation page for person *prs*."""
    return render_template('gendataset.html', prs=prs)


@app.route('/vidfeed_dataset/<nbr>')
def vidfeed_dataset(nbr):
    """Video streaming route. Put this in the src attribute of an img tag."""
    return Response(generate_dataset(nbr), mimetype='multipart/x-mixed-replace; boundary=frame')


# Route for video streaming
@app.route('/video_feed')
def video_feed():
    """Stream recognition-annotated frames as ``multipart/x-mixed-replace``."""
    start_video_stream()  # Start the video stream if it is not already running
    # start_face_recognition_thread() is intentionally not called: its thread
    # target is a generator function, so it never processed a frame. The
    # generator passed to Response below does the actual work.
    return Response(face_recognition(), mimetype='multipart/x-mixed-replace; boundary=frame')


@app.route('/fr_page')
def fr_page():
    """Video streaming home page."""
    mycursor.execute("select a.accs_id, a.accs_prsn, b.prs_name, b.prs_skill, a.accs_added "
                     " from accs_hist a "
                     " left join prs_mstr b on a.accs_prsn = b.prs_nbr "
                     " where a.accs_date = curdate() "
                     " order by 1 desc")
    data = mycursor.fetchall()
    return render_template('fr_page.html', data=data)


def _query_fresh_connection(sql):
    """Run *sql* on a new connection, return all rows, and always close the connection."""
    conn = mysql.connector.connect(
        host="localhost",
        user="root",
        passwd="",
        database="flask_db"
    )
    try:
        cur = conn.cursor()
        # No params are passed, so '%' in date_format patterns is sent verbatim.
        cur.execute(sql)
        return cur.fetchall()
    finally:
        conn.close()


@app.route('/countTodayScan')
def countTodayScan():
    """Return today's number of access records as JSON."""
    rows = _query_fresh_connection("select count(*) "
                                   " from accs_hist "
                                   " where accs_date = curdate() ")
    rowcount = rows[0][0]
    return jsonify({'rowcount': rowcount})


@app.route('/loadData', methods = ['GET', 'POST'])
def loadData():
    """Return today's ten most recent access records as JSON."""
    data = _query_fresh_connection("select a.accs_id, a.accs_prsn, b.prs_name, b.prs_skill, date_format(a.accs_added, '%H:%i:%s') "
                                   " from accs_hist a "
                                   " left join prs_mstr b on a.accs_prsn = b.prs_nbr "
                                   " where a.accs_date = curdate() "
                                   " order BY a.accs_id DESC"
                                   " limit 10")
    return jsonify(response = data)


@app.route('/loadDataLog', methods = ['GET', 'POST'])
def loadDataLog():
    """Return the full access log, newest first, as JSON."""
    data = _query_fresh_connection("select a.accs_id, a.accs_prsn, b.prs_name, b.prs_skill, date_format(a.accs_added, '%d-%m-%Y %H:%i:%s') "
                                   " from accs_hist a "
                                   " left join prs_mstr b on a.accs_prsn = b.prs_nbr "
                                   " order BY a.accs_id DESC")
    return jsonify(response = data)


@app.route('/log')
def log():
    """Render the access-log page with all rows of ``accs_hist``."""
    mycursor.execute("select accs_id, accs_prsn, accs_date, accs_added from accs_hist")
    data = mycursor.fetchall()
    return render_template('log.html', data=data)


if __name__ == "__main__":
    # NOTE(review): debug=True combined with host='0.0.0.0' exposes the
    # interactive debugger to the network; disable debug outside development.
    app.run(host='0.0.0.0', port=5000, debug=True)