Files
2023-05-04 13:45:21 +02:00

178 lines
6.3 KiB
Python

from hand_detector import HandDetector, FingerCountDetector
from audio_detector import record, analyze, test
from network import ApiClient, WebsocketServer
import time
import speech_recognition as sr
class Manager():
    """Coordinate the detector, network and audio modules and build the review.

    The manager is a small state machine driven by ``loop()``:

    * 0 -- idle: wait until a hand is detected (``sleep``)
    * 1 -- thumbs up/down capture for the overall grade (``camera``)
    * 2 -- per-criterion grading by finger count (``grade``)
    * 3 -- spoken comment capture and transcription (``audio``)
    * 4 -- submit the review, thank the user, then reset (``thankYou``)

    The review under construction is kept in ``self.avis``.
    """

    @staticmethod
    def _new_avis():
        """Return a fresh, empty review dict (never shared between sessions)."""
        return {
            "note": None,
            "commentaire": None,
            "notes_autres": {},
        }

    def __init__(self) -> None:
        """Create every sub-module, start the websocket server, fetch criteria."""
        self.state = 0
        # Kept under its historical (misspelled) name for backward
        # compatibility. It is a pristine template only: the working review
        # is always a separate dict, so this one is never mutated.
        self.defualtAvis = self._new_avis()
        # Seconds without a detected hand before the camera step gives up.
        self.TIMEOUT_CAMERA = 5
        self.avis = self._new_avis()
        self.server = WebsocketServer(None)
        self.server.start()
        self.handDetector = HandDetector()
        self.fingerCountDetector = FingerCountDetector()
        self.api = ApiClient()
        self.timeLastChange = time.time()
        # True once an empty "effects" packet has been sent, so that the
        # screen is cleared only once while no hand is visible.
        self.isLastHandPacketEmpty = False
        self.recongizer = sr.Recognizer()
        # For step 2
        self.criteria_list = self.api.get_criteria()
        # Index into criteria_list; None until the grading step starts.
        self.currentCriteria = None
        print("Backend ready")

    def loop(self) -> None:
        """Main loop: run the handler(s) matching the current state, forever.

        The checks are sequential on purpose: when a handler advances the
        state, the next handler runs within the same iteration.
        """
        while True:
            if self.state == 0:
                self.sleep()
            if self.state == 1:
                self.camera()
            if self.state == 2:
                self.grade()
            if self.state == 3:
                self.audio()
            if self.state == 4:
                self.thankYou()
            time.sleep(0.01)

    def sleep(self) -> None:
        """Idle step: wake the kiosk up as soon as a hand is detected."""
        res = self.handDetector.loop()
        # Equality (not identity) is kept on purpose: the exact type of the
        # detector's "nothing found" value is not visible from this file.
        if res != False:  # noqa: E712
            self.state = 1
            self.timeLastChange = time.time()
            print("Change to state 1")
            self.server.sendMessage({"type": "state", "state": 1})

    def camera(self) -> None:
        """Stream the hand position to the screen and record the thumb grade.

        Resets the kiosk when no hand has been seen for ``TIMEOUT_CAMERA``
        seconds. Once the detector reports a final decision, the overall
        grade is stored (10 for ``"thumbs_up"``, 0 otherwise) and the state
        machine moves on to step 2.
        """
        if time.time() - self.timeLastChange > self.TIMEOUT_CAMERA:
            self.server.sendMessage({"type": "reset"})
            self.reset()
            return

        res = self.handDetector.loop()
        if res != False:  # noqa: E712 -- see sleep()
            state, coords, size, finalDecision, progress = res
            self.server.sendMessage({
                "type": "effects",
                "effects": [
                    {
                        "type": "loading",
                        "x": coords[0],
                        "y": coords[1],
                        "width": size,
                        "height": size,
                        "progress": progress,
                    },
                    {
                        "type": state,
                        "x": coords[0],
                        "y": coords[1],
                        "width": size,
                        "height": size,
                    },
                ],
            })
            self.isLastHandPacketEmpty = False
            # A hand is visible: push the inactivity timeout back.
            self.timeLastChange = time.time()
            if finalDecision != False:  # noqa: E712 -- see sleep()
                self.avis["note"] = 10 if finalDecision == "thumbs_up" else 0
                self.state = 2
                self.server.sendMessage({"type": "state", "state": 2})
        elif not self.isLastHandPacketEmpty:
            # The hand just disappeared: clear the on-screen effects once.
            self.server.sendMessage({"type": "effects", "effects": []})
            self.isLastHandPacketEmpty = True

    def grade(self) -> None:
        """Collect one finger-count grade per criterion, then go to step 3.

        The first call of a session only announces the first criterion.
        Later calls poll the finger-count detector; each grade it returns is
        stored in ``avis["notes_autres"]`` under the current criterion.
        """
        # First pass through this step for the current session.
        if self.currentCriteria is None:
            if not self.criteria_list:
                # Nothing to grade: skip straight to the audio step instead
                # of failing on criteria_list[0].
                print("Change to state 3")
                self.state = 3
                self.server.sendMessage({"type": "state", "state": 3})
                return
            self.currentCriteria = 0
            # The detector is otherwise only reset *between* criteria, never
            # after the last one, so start each session from a clean state.
            self.fingerCountDetector.reset()
            self.server.sendMessage({
                "type": "state",
                "state": 2,
            })
            self.server.sendMessage({
                "type": "new_criteria",
                "criteria": self.criteria_list[self.currentCriteria],
            })
            return

        grade = self.fingerCountDetector.loop()
        if grade is None:
            return

        self.avis["notes_autres"][self.criteria_list[self.currentCriteria]] = grade
        self.server.sendMessage({
            "type": "criteria_grade",
            "grade": grade,
        })
        # Pause for 3 seconds after announcing the grade, before moving on.
        time.sleep(3)
        self.currentCriteria += 1
        if self.currentCriteria < len(self.criteria_list):
            self.server.sendMessage({
                "type": "new_criteria",
                "criteria": self.criteria_list[self.currentCriteria],
            })
            self.fingerCountDetector.reset()
        else:
            print("Change to state 3")
            self.state = 3
            self.server.sendMessage({"type": "state", "state": 3})

    def audio(self) -> None:
        """Record a spoken comment, transcribe it (fr-FR) and go to step 4.

        When the speech cannot be understood, or the recognition service
        cannot be reached, the comment is stored as an empty string.
        """
        self.server.sendMessage({"type": "recording_started"})
        # Only the capture needs the microphone; release it right after.
        with sr.Microphone() as source:
            audio = self.recongizer.listen(source)
        self.server.sendMessage({"type": "recording_done"})

        try:
            # Use the recognizer to run the speech recognition.
            texte = self.recongizer.recognize_google(audio, language='fr-FR')
        except sr.UnknownValueError:
            # Audio not understood.
            print("MESSAGE NOT RECONGIZED")
            self.server.sendMessage({"type": "new_grade", "word": "Avis non reconnu"})
            self.avis["commentaire"] = ""
        except sr.RequestError as e:
            # The recognition API could not be reached.
            print("Erreur avec l'api de reconnaissance vocale")
            self.avis["commentaire"] = ""
            self.server.sendMessage({"type": "new_grade", "word": "Avis non reconnu"})
            print("Impossible de récupérer les résultats de la reconnaissance vocale; {0}".format(e))
        else:
            print("AUDIO MESSAGE RECONGIZED : " + texte)
            self.server.sendMessage({"type": "new_grade", "word": texte})
            self.avis["commentaire"] = texte

        # Move on to the next step after a 5 second pause.
        time.sleep(5)
        self.state = 4
        self.server.sendMessage({"type": "state", "state": 4})

    def thankYou(self) -> None:
        """Send the review to the API, wait 10 seconds, then reset the kiosk."""
        # NOTE(review): only "note" and "commentaire" are sent here; the
        # per-criterion grades in avis["notes_autres"] are collected but not
        # transmitted -- confirm whether ApiClient.send should receive them.
        res = self.api.send(self.avis["note"], self.avis["commentaire"])
        time.sleep(10)
        print("Reseting...")
        self.timeLastChange = time.time()
        self.server.sendMessage({"type": "state", "state": 0})
        print(res.text)
        self.reset()

    def reset(self) -> None:
        """Return to the idle state with a brand-new, empty review."""
        self.state = 0
        self.currentCriteria = None
        # A fresh dict every time: reusing one shared "default" dict would
        # leak the previous session's grades and comment into the next one.
        self.avis = self._new_avis()
        self.handDetector.reset()