Begin integrating the voice recognition module
Use the database
code/backend_reconnaissance/.gitignore (vendored, new file)
@@ -0,0 +1 @@
*.wav
@@ -7,7 +7,7 @@ ENV PYTHONUNBUFFERED=1

# Install the OpenCV dependencies
RUN apt-get update
-RUN apt-get install ffmpeg libsm6 libxext6 portaudio19-dev python3-pyaudio -y
+RUN apt-get install ffmpeg libsm6 libxext6 portaudio19-dev python3-pyaudio pulseaudio -y

# Install the Python dependencies
COPY requirements.txt .
code/backend_reconnaissance/audio_data/metadata.json (new file)
@@ -0,0 +1,22 @@
{
    "ennuyant": {
        "grade": 2,
        "display": "Ennuyant"
    },
    "genial": {
        "grade": 9,
        "display": "Génial"
    },
    "j_ai_beaucoup_aime": {
        "grade": 9,
        "display": "J'ai beaucoup aimé"
    },
    "j_ai_trouve_ca_genial": {
        "grade": 10,
        "display": "J'ai trouvé ça génial"
    },
    "nul": {
        "grade": 0,
        "display": "Nul"
    }
}
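Each key in metadata.json names a recognizable word, matching a subdirectory of audio_data/, and maps it to the numeric grade sent to the reviews API plus a display string. A minimal sketch of inspecting this vocabulary, mirroring the get_word_metadata helper added further down (the path is the one used in this commit):

import json

# List every word the module can recognize, with its grade and label
with open("audio_data/metadata.json") as f:
    metadata = json.load(f)
for word, meta in metadata.items():
    print(f'{word}: grade {meta["grade"]} ("{meta["display"]}")')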
@@ -1,18 +1,11 @@
import librosa
import os
import numpy as np
import math
from scipy.io import wavfile
import wave
from scipy.fftpack import fft, dct
import time

from matplotlib.patches import ConnectionPatch
import matplotlib.pyplot as plt
import numpy as np
import scipy.spatial.distance as dist
import pyaudio
import wave
import json

def dp(distmat):
    N, M = distmat.shape
    # Initialise the cost matrix
@@ -35,91 +28,29 @@ def dp(distmat):
    # remove the infinity values
    costmat = costmat[1:, 1:]
    return (costmat, costmat[-1, -1] / (N + M))
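The body of dp is elided between these hunks; judging from the kept lines, it fills an (N+1)x(M+1) cost matrix with the usual three-way DTW recurrence, padded with infinity so the borders never win the minimum. A minimal sketch consistent with the padding strip and the (N+M) normalisation above:

import numpy as np

def dp(distmat):
    N, M = distmat.shape
    # Pad with a row/column of infinity; only the origin costs zero
    costmat = np.full((N + 1, M + 1), np.inf)
    costmat[0, 0] = 0.0
    for i in range(1, N + 1):
        for j in range(1, M + 1):
            costmat[i, j] = distmat[i - 1, j - 1] + min(
                costmat[i - 1, j],      # step in the query only
                costmat[i, j - 1],      # step in the template only
                costmat[i - 1, j - 1],  # diagonal match
            )
    costmat = costmat[1:, 1:]           # strip the infinity padding
    return (costmat, costmat[-1, -1] / (N + M))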
def calculate_mfcc(audio, sr):
    # Define parameters for MFCC calculation
    n_mfcc = 13
    n_fft = 2048
    hop_length = 512
    fmin = 0
    fmax = sr / 2

    # Nested helper: split the signal into 20 ms frames with 10 ms overlap
    def divsignalaudiobis(signal):
        long_signal = 20  # frame length, 20 ms
        recouvrement = 10  # overlap, 10 ms
        long_echantillon = long_signal * sr // 1000
        recouvrement_echantillon = recouvrement * sr // 1000
        nb_echantillon = int(np.ceil((len(signal) - long_echantillon) / recouvrement_echantillon) + 1)
        long_a_completer = recouvrement_echantillon * (nb_echantillon - 1) + long_echantillon - len(signal)

        if (long_a_completer != 0):
            echantillon_data = np.pad(signal, (0, long_a_completer), mode='constant')  # pad the last frame with zeros
        else:
            nb_echantillon -= 1
            echantillon_data = signal  # no padding needed

        data = np.zeros((nb_echantillon, long_echantillon))
        for i in range(nb_echantillon):
            echantillon_i = echantillon_data[i * recouvrement_echantillon : i * recouvrement_echantillon + long_echantillon]
            data[i, :] = echantillon_i
        return data

    # Nested helper: naive DFT restricted to the 50-500 Hz band
    def myfft(signal, fe):
        n = len(signal)
        Te = 1 / fe
        S = [0 + 0j] * (450)
        for l in range(50, 500):
            f = l
            for i in range(n):
                t = Te * i
                S[l - 50] += signal[i] * np.exp(-2 * math.pi * f * t * 1j)
            S[l - 50] = abs(S[l - 50]) / n
        return S

    # Nested helper: power spectrum of an FFT frame
    def puissance_spec(signal):
        amplitude_fft = np.absolute(signal)
        return (amplitude_fft ** 2) / 44100  # fft length = 512

    # Nested helper: triangular mel filter bank applied to the power spectrum
    def BankFiltre(rate, puis_spec):
        freq_min = 20
        freq_max = rate // 2
        freq_mel_min = 1000 * np.log2(1 + freq_min / 1000)
        freq_mel_max = 1000 * np.log2(1 + freq_max / 1000)
        nb_filtre = 40  # 40 filters is the usual choice
        mel_points = np.linspace(freq_mel_min, freq_mel_max, 42)
        hz_points = 1000 * (2 ** (mel_points / 1000) - 1)  # convert back to Hz

        bankf = np.zeros((nb_filtre, int(np.floor(22050 + 1))))

        for m in range(1, nb_filtre + 1):  # for each filter:
            f_m_min = int(math.floor(hz_points[m - 1]))  # left edge
            f_m = int(math.floor(hz_points[m]))  # peak
            f_m_max = int(math.floor(hz_points[m + 1]))  # right edge

            for k in range(f_m_min, f_m):
                bankf[m - 1, k] = (k - hz_points[m - 1]) / (hz_points[m] - hz_points[m - 1])
            for k in range(f_m, f_m_max):
                bankf[m - 1, k] = ((hz_points[m + 1]) - k) / (hz_points[m + 1] - hz_points[m])

        filter_bank = np.dot(puis_spec, np.transpose(bankf))  # matrix product
        filter_bank = np.where(filter_bank == 0, np.finfo(float).eps, filter_bank)  # avoid 0 inside the log
        return filter_bank

    # Nested helper: hand-written MFCC pipeline
    # (framing -> FFT -> power spectrum -> mel filter bank -> log -> DCT)
    def mfcc(signal, rate):
        data = divsignalaudiobis(signal)
        data_fft = np.fft.rfft(data, 44100)
        data_puiss = puissance_spec(data_fft)
        data_filtre = BankFiltre(rate, data_puiss)
        pre_mfcc = np.log(data_filtre)
        mfcc = dct(pre_mfcc, type=2, axis=1, norm="ortho")[:, 0:13]  # keep only the first 13 coefficients
        return mfcc

    # Calculate MFCCs
    mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length, fmin=fmin, fmax=fmax)
    return mfccs.T

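Both paths produce one 13-coefficient vector per frame, and this commit switches the recognize_speech calls below from the nested mfcc helper to calculate_mfcc. A quick shape check of the kept librosa path, using a hypothetical one-second 440 Hz test tone:

import numpy as np
import librosa

sr = 44100
t = np.linspace(0, 1, sr, endpoint=False)
audio = np.sin(2 * np.pi * 440 * t)  # synthetic test signal

# Same parameters as calculate_mfcc above
mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=13, n_fft=2048,
                             hop_length=512, fmin=0, fmax=sr / 2)
print(mfccs.T.shape)  # (n_frames, 13), the orientation calculate_mfcc returns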
def calculate_dtw_cost(mfccs_query, mfccs_train):
    distmat = dist.cdist(mfccs_query, mfccs_train, "cosine")
    costmat, mincost = dp(distmat)
    return mincost

def recognize_speech(audio_query, audio_train_list, sr):  # sr: sampling rate
    # Calculate MFCCs for query audio
-    mfccs_query = mfcc(audio_query, sr)
+    mfccs_query = calculate_mfcc(audio_query, sr)

    # Calculate DTW cost for each audio in training data
    dtw_costs = []
    for audio_train in audio_train_list:
-        mfccs_train = mfcc(audio_train, sr)
+        mfccs_train = calculate_mfcc(audio_train, sr)
        mincost = calculate_dtw_cost(mfccs_query, mfccs_train)
        dtw_costs.append(mincost)

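The reduction from dtw_costs to index falls between these hunks; given the return index line below, it is presumably an argmin over the per-template costs, along the lines of:

import numpy as np

def pick_best_match(dtw_costs):
    # The training sample with the lowest normalised DTW cost wins
    return int(np.argmin(dtw_costs))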
@@ -129,8 +60,6 @@ def recognize_speech(audio_query, audio_train_list, sr):
    # Return recognized word
    return index

# Example usage
def record_audio(filename, duration, sr):
    chunk = 1024
    sample_format = pyaudio.paInt16
@@ -159,6 +88,7 @@ def record_audio(filename, duration, sr):

    p.terminate()

    print("Enregistrement terminé")

    wf = wave.open(filename, "wb")
@@ -181,13 +111,27 @@ def coupe_silence(signal):
        else:
            p = p + 1

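The body of coupe_silence is almost entirely elided here; from the name and the surviving else branch, it appears to advance a cursor p past leading low-amplitude samples. A hypothetical reconstruction under that assumption (threshold invented for illustration):

import numpy as np

def coupe_silence(signal, threshold=0.01):
    # Walk past leading samples that stay below the amplitude threshold
    p = 0
    while p < len(signal) and abs(signal[p]) < threshold:
        p = p + 1
    return signal[p:]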
def init_database():
    # Scan audio_data/: one subdirectory per word, one file per recording
    data_dir = "audio_data/"
    words = []
    files = []
    for word in os.listdir(data_dir):
        if not os.path.isfile(os.path.join(data_dir, word)):
            for file in os.listdir(os.path.join(data_dir, word)):
                if os.path.isfile(os.path.join(data_dir, word, file)):
                    print(word, os.path.join(data_dir, word, file))
                    words.append(word)
                    files.append(os.path.join(data_dir, word, file))
    return words, files

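init_database therefore expects one subdirectory per word under audio_data/, each holding that word's training recordings, alongside metadata.json; a hypothetical layout (recording names invented for illustration):

audio_data/
    metadata.json
    genial/
        sample1.wav
        sample2.wav
    nul/
        sample1.wav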
def get_word_metadata(word):
    with open("audio_data/metadata.json") as f:
        data = json.loads(f.read())
    return data[word]

# TODO: detect when no grade was given
def get_grade():
    ######## TEST DEBUG ########
    time.sleep(6)
    return 5

    return 4
    sr = 44100  # sampling rate
    duration = 6  # recording length in seconds
    filename = "recording"  # name of the file to record
@@ -195,14 +139,8 @@ def get_grade():
    record_audio(filename, duration, sr)
    audio_query, sr = librosa.load(f'{filename}.wav', sr=sr)
    coupe_silence(audio_query)
-    training_file_names = []
-    for path in os.listdir(data_dir):
-        if os.path.isfile(os.path.join(data_dir, path)):
-            training_file_names.append(data_dir + path)
-    print(training_file_names)
-    audio_train_list = [librosa.load(file, sr=sr)[0] for file in training_file_names]
+    words, files = init_database()
+    audio_train_list = [librosa.load(file, sr=sr)[0] for file in files]
    recognized_word_index = recognize_speech(audio_query, audio_train_list, sr)
-    print(f'Recognized word: {recognized_word_index}')
-    return recognized_word_index
-
-print(get_grade())
+    recognized_word = words[recognized_word_index]
+    return get_word_metadata(recognized_word)
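After this hunk get_grade returns the metadata entry for the recognised word rather than a bare index, so callers read result["display"] and result["grade"], as Manager.audio does below. A hypothetical driver:

# Record, recognise, and show the outcome; get_grade blocks while recording
result = get_grade()
if result != False:
    print(result["display"], "->", result["grade"])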
@@ -18,7 +18,8 @@ class HandDetector():

        self.resultBuffer = []

    def reset(self):
        self.resultBuffer = []

    def reconnaissancePouce(self, handLandmarks):
        etatDuPouce = ["neutre", "thumbs_down", "thumbs_up"]
@@ -28,7 +29,7 @@ class HandDetector():
        V1 = [handLandmarks[(4*cpt)+6][0] - handLandmarks[(4*cpt)+5][0], handLandmarks[(4*cpt)+6][1] - handLandmarks[(4*cpt)+5][1]]
        V2 = [handLandmarks[(4*cpt)+8][0] - handLandmarks[(4*cpt)+6][0], handLandmarks[(4*cpt)+8][1] - handLandmarks[(4*cpt)+6][1]]
        j = np.dot(V1, V2)
-        if (j > 0):
+        if (j > 0.005):
            return etatDuPouce[0]
        V1 = [handLandmarks[4][0] - handLandmarks[1][0], handLandmarks[4][1] - handLandmarks[1][1]]
        V2 = [handLandmarks[2][0] - handLandmarks[1][0], handLandmarks[2][1] - handLandmarks[1][1]]
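reconnaissancePouce treats a finger as extended when consecutive phalanx vectors point the same way, i.e. their dot product exceeds the new 0.005 threshold, which also filters near-perpendicular noise. A toy check with hypothetical normalised landmark coordinates:

import numpy as np

v1 = np.array([0.00, -0.10])   # proximal segment of an extended finger
v2 = np.array([0.01, -0.09])   # distal segment, pointing the same way
print(np.dot(v1, v2) > 0.005)  # True: a non-thumb finger is open -> "neutre"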
@@ -52,14 +53,28 @@ class HandDetector():
        image.flags.writeable = False
        results = self.hands.process(image)
        # print(results)
        handLandmarks = []
        if results.multi_hand_landmarks:
+            handsPositions = []
            for hand_landmarks in results.multi_hand_landmarks:
                handLandmarks = []
                # Fill list with x and y positions of each landmark
                for landmarks in hand_landmarks.landmark:
                    handLandmarks.append([landmarks.x, landmarks.y])

-                thumbState = self.reconnaissancePouce(handLandmarks)
+                # Append each hand's state to the list
+                handsPositions.append(self.reconnaissancePouce(handLandmarks))

+            # Compute the result from the state of both hands
+            if (len(handsPositions) == 2):
+                if (handsPositions[0] == handsPositions[1]):
+                    thumbState = handsPositions[0]
+                elif (handsPositions[0] == "neutre"):
+                    thumbState = handsPositions[1]
+                elif (handsPositions[1] == "neutre"):
+                    thumbState = handsPositions[0]
+                else:
+                    thumbState = "neutre"
+            else:
+                thumbState = handsPositions[0]

            self.resultBuffer.append(thumbState)
            if (len(self.resultBuffer) > self.BUFFER_LENGTH):
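The two-hand logic added here resolves the gesture per frame: matching states win, a neutral hand defers to the other, and a genuine thumbs_up/thumbs_down conflict collapses to neutral. The rule as a standalone function with its truth table:

def merge_hands(a, b):
    # Mirrors the branch added to detect() above
    if a == b:
        return a
    if a == "neutre":
        return b
    if b == "neutre":
        return a
    return "neutre"

assert merge_hands("thumbs_up", "thumbs_up") == "thumbs_up"
assert merge_hands("neutre", "thumbs_down") == "thumbs_down"
assert merge_hands("thumbs_up", "thumbs_down") == "neutre"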
@@ -13,11 +13,15 @@ class Manager():
            "notes_autres": {}
        }

        self.TIMEOUT_CAMERA = 5

        self.avis = self.defualtAvis
        self.server = WebsocketServer(None)
        self.server.start()
        self.handDetector = HandDetector()
        self.api = ApiClient()
        self.timeLastChange = time.time()
        self.isLastHandPacketEmpty = False
        print("Backend ready")

    # Main loop
@@ -38,34 +42,51 @@ class Manager():
        res = self.handDetector.detect()
        if (res != False):
            self.state = 1
            self.timeLastChange = time.time()
            self.server.sendMessage({"type": "state", "state": 1})

    # Send the hand position to the screen; move to the next step if a hand
    # is detected for long enough
    def camera(self):
        if (time.time() - self.timeLastChange > self.TIMEOUT_CAMERA):
            self.server.sendMessage({"type": "reset"})
            self.reset()
            return

        res = self.handDetector.detect()
        if (res != False):
            state, coords, size, finalDecision = res
            self.server.sendMessage({"type": "effects", "effects": [{"type": state, "x": coords[0], "y": coords[1], "width": size, "height": size}]})
            self.isLastHandPacketEmpty = False
            if (finalDecision != False):
                self.avis["note"] = 10 if finalDecision == "thumbs_up" else 0
                self.state = 2
                self.timeLastChange = time.time()
                self.server.sendMessage({"type": "state", "state": 2})
        elif self.isLastHandPacketEmpty == False:
            self.server.sendMessage({"type": "effects", "effects": []})
            self.isLastHandPacketEmpty = True

    def audio(self):
-        grade = get_grade()
-        if (grade != False):
-            self.server.sendMessage({"type": "new_grade", "grade": grade})
-            self.avis["notes_autres"]["test"] = grade
+        result = get_grade()
+        if (result != False):
+            self.server.sendMessage({"type": "new_grade", "word": result["display"]})
+            self.avis["notes_autres"]["test"] = result["grade"]
            time.sleep(3)
            self.state = 3
            self.timeLastChange = time.time()
            self.server.sendMessage({"type": "state", "state": 3})

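With this change the new_grade packet carries the word's display string instead of a raw number, and the frontend gains a reset packet. The message vocabulary the kiosk interface must handle now looks like this (field values are hypothetical examples):

# Packets sent over the websocket to the kiosk interface
packets = [
    {"type": "state", "state": 2},
    {"type": "effects", "effects": [{"type": "thumbs_up", "x": 0.5, "y": 0.4,
                                     "width": 0.1, "height": 0.1}]},
    {"type": "new_grade", "word": "Génial"},  # previously {"grade": <number>}
    {"type": "reset"},                        # handled by the new onReset callback
]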
    def thankYou(self):
        time.sleep(10)
        print("Resetting...")
        self.state = 0
        self.timeLastChange = time.time()
        self.server.sendMessage({"type": "state", "state": 0})
        res = self.api.send(self.avis["note"], self.avis["notes_autres"]["test"])
        print(res.text)
        self.avis = self.defualtAvis
        self.reset()

    def reset(self):
        self.state = 0
        self.avis = self.defualtAvis
        self.handDetector.reset()

Binary file not shown.
@@ -91,11 +91,15 @@ services:
    restart: always
    devices:
      - /dev/video3:/dev/video0
    # volumes:
    #   - /run/user/1000/pulse/native:/run/user/1000/pulse/native
    environment:
      - PORT=5000
      - HOST=backend_reconnaissance
      - API_HOST=reviews_api
      - API_PORT=8080
      # - PULSE_SERVER=unix:/run/user/1000/pulse/native
    user: 1000:1000
    ports:
      # This container is the websocket server; its client is the kiosk
      # interface running in the browser
      - 5000:5000

@@ -15,4 +15,8 @@ class AudioPage {
        document.getElementById("grade").innerHTML = grade.toString();
    }
-}
+
+    reset() {
+        document.getElementById("grade").innerHTML = "";
+    }
+}
@@ -146,4 +146,8 @@ class CameraPage {
    setEffects(effects) {
        this.activeEffects = effects;
    }
-}
+
+    reset() {
+        this.activeEffects = [];
+    }
+}
@@ -1,5 +1,5 @@
class WebsocketClient {
-    constructor(onNewEffects, onNewState, onNewGrade) {
+    constructor(onNewEffects, onNewState, onNewGrade, onReset) {
        this.socket = new WebSocket("ws://localhost:5000");
        this.socket.addEventListener("open", (event) => {
            this.socket.send("connected");
@@ -14,6 +14,8 @@ class WebsocketClient {
            onNewState(msg.state);
        }else if(msg.type == "new_grade") {
            onNewGrade(Number(msg.grade));
+        }else if(msg.type == "reset") {
+            onReset();
        }
    };
}

@@ -19,7 +19,8 @@ class StateManager {
            this._cameraPage.setEffects(effects)
        },
        (state) => this.setState(state),
-        (grade) => this._audioPage.setGrade(grade)
+        (grade) => this._audioPage.setGrade(grade),
+        () => this.reset(),
    );

    this._sleepingPage.enabled = true;
@@ -48,4 +49,15 @@ class StateManager {
            this._state = newState;
        }
    }
+
+    reset() {
+        this._state = 0;
+        this._cameraPage.enabled = false;
+        this._audioPage.enabled = false;
+        this._thankYouPage.enabled = false;
+        this._audioPage.reset();
+        this._cameraPage.reset();
+        this._sleepingPage.enabled = true;
+
+    }
}