import json
import os
import wave

import librosa
import numpy as np
import pyaudio
import scipy.spatial.distance as dist


def dp(distmat):
    N, M = distmat.shape

    # Initialise the cost matrix with an infinite first row and column so
    # that every warping path has to start at the top-left corner.
    costmat = np.zeros((N + 1, M + 1))
    for i in range(1, N + 1):
        costmat[i, 0] = np.inf
    for i in range(1, M + 1):
        costmat[0, i] = np.inf

    for i in range(N):
        for j in range(M):
            # Compute the minimal cost at each cell: costmat[i][j] can be
            # reached through three predecessors; keep the cheapest one.
            penalty = [
                costmat[i, j],      # case T == 0 (diagonal step)
                costmat[i, j + 1],  # case T == 1 (vertical step)
                costmat[i + 1, j],  # case T == 2 (horizontal step)
            ]
            ipenalty = np.argmin(penalty)
            costmat[i + 1, j + 1] = distmat[i, j] + penalty[ipenalty]

    # Drop the infinite padding row and column
    costmat = costmat[1:, 1:]
    return costmat, costmat[-1, -1] / (N + M)


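# Illustrative sketch (not part of the original pipeline): running dp() on a
# hand-written toy distance matrix; the values are made up for demonstration.
def _demo_dp():
    distmat = np.array([[0.0, 1.0, 2.0],
                        [1.0, 0.0, 1.0],
                        [2.0, 1.0, 0.0]])
    costmat, cost = dp(distmat)
    print(costmat)  # accumulated alignment costs, same shape as distmat
    print(cost)     # cost normalised by N + M; 0.0 here (perfect diagonal)

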
def calculate_mfcc(audio, sr):
    # Parameters for the MFCC computation
    n_mfcc = 13
    n_fft = 2048
    hop_length = 512
    fmin = 0
    fmax = sr / 2

    # Compute the MFCCs; transpose so that each row is one analysis frame
    mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft,
                                 hop_length=hop_length, fmin=fmin, fmax=fmax)
    return mfccs.T


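# Quick shape check (a sketch, not in the original file): each row of the
# returned matrix is one analysis frame with n_mfcc = 13 coefficients.
def _demo_mfcc_shape():
    sr = 22050
    audio = np.zeros(sr, dtype=np.float32)  # one second of silence
    mfccs = calculate_mfcc(audio, sr)
    print(mfccs.shape)  # (44, 13): 1 + sr // hop_length frames, 13 coefficients

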
def calculate_dtw_cost(mfccs_query, mfccs_train):
    # Pairwise cosine distances between query frames and reference frames
    distmat = dist.cdist(mfccs_query, mfccs_train, "cosine")
    costmat, mincost = dp(distmat)
    return mincost


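# Sanity check (illustrative): comparing a signal with itself should give a
# cost of (almost) zero, since every frame matches its counterpart exactly.
def _demo_dtw_cost():
    sr = 22050
    t = np.arange(sr) / sr
    tone = np.sin(2 * np.pi * 440 * t).astype(np.float32)
    mfccs = calculate_mfcc(tone, sr)
    print(calculate_dtw_cost(mfccs, mfccs))  # ~0.0

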
def recognize_speech(audio_query, audio_train_list, sr):  # sr: sampling rate
    # Compute MFCCs for the query audio
    mfccs_query = calculate_mfcc(audio_query, sr)

    # Compute the DTW cost against every recording in the training data
    dtw_costs = []
    for audio_train in audio_train_list:
        mfccs_train = calculate_mfcc(audio_train, sr)
        mincost = calculate_dtw_cost(mfccs_query, mfccs_train)
        dtw_costs.append(mincost)

    # Return the index of the word with the lowest DTW cost
    index = np.argmin(dtw_costs)
    return index


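# Illustrative end-to-end check with synthetic "words" (made-up signals, not
# real recordings): the query is identical to the second reference, so the
# returned index should be 1.
def _demo_recognize():
    sr = 22050
    t = np.arange(sr) / sr
    low = np.sin(2 * np.pi * 220 * t).astype(np.float32)
    high = np.sin(2 * np.pi * 880 * t).astype(np.float32)
    print(recognize_speech(high, [low, high], sr))  # 1

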
def record_audio(filename, duration, sr):
    chunk = 1024
    sample_format = pyaudio.paInt16
    channels = 1
    record_seconds = duration
    filename = f"{filename}.wav"

    p = pyaudio.PyAudio()

    stream = p.open(format=sample_format,
                    channels=channels,
                    rate=sr,
                    frames_per_buffer=chunk,
                    input=True)

    frames = []

    print("Recording...")

    for i in range(0, int(sr / chunk * record_seconds)):
        data = stream.read(chunk)
        frames.append(data)

    stream.stop_stream()
    stream.close()
    p.terminate()

    print("Recording finished")

    wf = wave.open(filename, "wb")
    wf.setnchannels(channels)
    wf.setsampwidth(p.get_sample_size(sample_format))
    wf.setframerate(sr)
    wf.writeframes(b"".join(frames))
    wf.close()

    print(f"File saved as {filename}")


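# Typical use (a sketch; requires a working microphone, and PyAudio may need
# a system package such as portaudio):
#   record_audio("sample", 3, 44100)  # writes sample.wav: 3 s, mono, 44.1 kHz

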
def coupe_silence(signal):
    # Trim leading silence: while the signal starts with a run of at least
    # 88 zero samples, drop that block and recurse. The trimmed signal is
    # returned, so callers must use the return value.
    t = 0
    if len(signal) > 0 and signal[t] == 0:
        p = 0
        while t + p < len(signal) and signal[t + p] == 0:
            if p == 88:
                signal = np.concatenate((signal[:t], signal[t + p:]))
                return coupe_silence(signal)
            p += 1
    return signal


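# Sketch of the intended use: the trimmed array must be reassigned, since
# coupe_silence returns a new array rather than modifying its argument.
def _demo_coupe_silence():
    padded = np.concatenate((np.zeros(200), np.ones(100)))
    trimmed = coupe_silence(padded)
    print(len(padded), len(trimmed))  # 300 124: zeros removed in blocks of 88

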
def init_database():
    data_dir = "audio_data/"
    words = []
    files = []
    # Each sub-directory of data_dir is a word; each file inside it is one
    # recording of that word.
    for word in os.listdir(data_dir):
        if not os.path.isfile(os.path.join(data_dir, word)):
            for file in os.listdir(os.path.join(data_dir, word)):
                if os.path.isfile(os.path.join(data_dir, word, file)):
                    words.append(word)
                    files.append(os.path.join(data_dir, word, file))
    return words, files


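# Expected directory layout, inferred from the loop above (the word and file
# names are hypothetical):
#   audio_data/
#       bonjour/take1.wav
#       bonjour/take2.wav
#       merci/take1.wav
# init_database() then returns parallel lists with one entry per recording:
# words = ["bonjour", "bonjour", "merci"] and files = [the matching paths].

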
def get_word_metadata(word):
    with open("audio_data/metadata.json") as f:
        data = json.load(f)
    return data[word]


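# Assumed shape of audio_data/metadata.json: the code only requires a JSON
# object mapping each word to its metadata. The keys below are hypothetical:
#   {"bonjour": {"note": "A4"}, "merci": {"note": "C5"}}

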
# TODO: detect when no note is given
def record():
    sr = 44100  # sampling rate
    duration = 6  # recording duration in seconds
    filename = "recording"  # name of the file to record to
    record_audio(filename, duration, sr)
    audio_query, sr = librosa.load(f'{filename}.wav', sr=sr)
    return audio_query, sr


def analyze(audio_query, sr):
    audio_query = coupe_silence(audio_query)
    words, files = init_database()
    audio_train_list = [librosa.load(file, sr=sr)[0] for file in files]
    recognized_word_index = recognize_speech(audio_query, audio_train_list, sr)
    recognized_word = words[recognized_word_index]
    return get_word_metadata(recognized_word)


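# Example flow with a prerecorded signal (no microphone needed; the returned
# metadata depends entirely on the contents of audio_data/, and the file name
# below is hypothetical):
#   audio, sr = librosa.load("some_recording.wav", sr=44100)
#   meta = analyze(audio, sr)

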
def test():
    sr = 44100  # sampling rate
    duration = 6  # recording duration in seconds
    filename = "recording"  # name of the file to record to
    record_audio(filename, duration, sr)
    audio_query, sr = librosa.load(f'{filename}.wav', sr=sr)
    audio_query = coupe_silence(audio_query)
    words, files = init_database()
    audio_train_list = [librosa.load(file, sr=sr)[0] for file in files]
    recognized_word_index = recognize_speech(audio_query, audio_train_list, sr)
    recognized_word = words[recognized_word_index]
    return get_word_metadata(recognized_word)
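

# Possible entry point (a sketch; the original file defines no main guard).
# Uncomment to record six seconds and print the recognised metadata:
# if __name__ == "__main__":
#     print(test())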