import math
import os
import time
import wave

import librosa
import numpy as np
import pyaudio
import scipy.spatial.distance as dist
from scipy.fftpack import dct


def dp(distmat):
    """Dynamic-programming (DTW) alignment cost over a pairwise distance matrix."""
    N, M = distmat.shape
    # Initialize the cost matrix with an infinite border so every path starts at (0, 0)
    costmat = np.zeros((N + 1, M + 1))
    for i in range(1, N + 1):
        costmat[i, 0] = np.inf
    for j in range(1, M + 1):
        costmat[0, j] = np.inf
    for i in range(N):
        for j in range(M):
            # Minimal cost to reach costmat[i+1][j+1]: three possible
            # predecessors, keep the cheapest one
            penalty = [costmat[i, j],      # diagonal (match)
                       costmat[i, j + 1],  # step in the query
                       costmat[i + 1, j]]  # step in the reference
            costmat[i + 1, j + 1] = distmat[i, j] + min(penalty)
    # Drop the infinite border
    costmat = costmat[1:, 1:]
    return costmat, costmat[-1, -1] / (N + M)  # cost normalized by the path-length bound


def divsignalaudiobis(signal, sr):
    """Split the signal into overlapping frames: 20 ms long, 10 ms hop."""
    long_signal = 20   # frame length in ms
    recouvrement = 10  # hop between frames in ms
    long_echantillon = long_signal * sr // 1000
    recouvrement_echantillon = recouvrement * sr // 1000
    nb_echantillon = int(np.ceil((len(signal) - long_echantillon) / recouvrement_echantillon) + 1)
    long_a_completer = recouvrement_echantillon * (nb_echantillon - 1) + long_echantillon - len(signal)
    if long_a_completer != 0:
        # zero-pad so the last frame is complete
        echantillon_data = np.pad(signal, (0, long_a_completer), mode='constant')
    else:
        # the signal already splits into whole frames
        echantillon_data = signal
    data = np.zeros((nb_echantillon, long_echantillon))
    for i in range(nb_echantillon):
        data[i, :] = echantillon_data[i * recouvrement_echantillon:
                                      i * recouvrement_echantillon + long_echantillon]
    return data


def myfft(signal, fe):
    """Naive DFT of `signal` at 1 Hz steps over 50-500 Hz (unused by the pipeline)."""
    n = len(signal)
    Te = 1 / fe
    S = [0 + 0j] * 450
    for l in range(50, 500):
        f = l
        for i in range(n):
            t = Te * i
            S[l - 50] += signal[i] * np.exp(-2 * math.pi * f * t * 1j)
        S[l - 50] = abs(S[l - 50]) / n
    return S


def puissance_spec(signal):
    # Periodogram estimate: |FFT|^2 / NFFT, with NFFT = 44100 (see mfcc below)
    amplitude_fft = np.absolute(signal)
    return (amplitude_fft ** 2) / 44100


def BankFiltre(rate, puis_spec):
    freq_min = 20
    freq_max = rate // 2
    # mel-scale variant used here: mel = 1000 * log2(1 + f/1000)
    freq_mel_min = 1000 * np.log2(1 + freq_min / 1000)
    freq_mel_max = 1000 * np.log2(1 + freq_max / 1000)
    nb_filtre = 40  # 40 filters is the usual choice
    mel_points = np.linspace(freq_mel_min, freq_mel_max, nb_filtre + 2)  # filter edges
    hz_points = 1000 * (2 ** (mel_points / 1000) - 1)  # convert back to Hz
    bankf = np.zeros((nb_filtre, rate // 2 + 1))  # one column per rfft bin (NFFT = rate = 44100)
    for m in range(1, nb_filtre + 1):  # build each triangular filter
        f_m_min = int(math.floor(hz_points[m - 1]))  # left edge
        f_m = int(math.floor(hz_points[m]))          # peak
        f_m_max = int(math.floor(hz_points[m + 1]))  # right edge
        for k in range(f_m_min, f_m):  # rising slope
            bankf[m - 1, k] = (k - hz_points[m - 1]) / (hz_points[m] - hz_points[m - 1])
        for k in range(f_m, f_m_max):  # falling slope
            bankf[m - 1, k] = (hz_points[m + 1] - k) / (hz_points[m + 1] - hz_points[m])
    filter_bank = np.dot(puis_spec, bankf.T)  # matrix product: (frames, bins) x (bins, filters)
    filter_bank = np.where(filter_bank == 0, np.finfo(float).eps, filter_bank)  # avoid 0 in the log
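    # filter_bank now has shape (n_frames, nb_filtre): one mel-band energy per
    # 20 ms frame; the eps floor above keeps the np.log taken in mfcc() finite
    # when a band receives no energy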
    return filter_bank


def mfcc(signal, rate):
    data = divsignalaudiobis(signal, rate)
    data_fft = np.fft.rfft(data, 44100)  # zero-padded FFT, NFFT = 44100 (one bin per Hz)
    data_puiss = puissance_spec(data_fft)
    data_filtre = BankFiltre(rate, data_puiss)
    pre_mfcc = np.log(data_filtre)
    # keep only the first 13 cepstral coefficients
    return dct(pre_mfcc, type=2, axis=1, norm="ortho")[:, 0:13]


def calculate_dtw_cost(mfccs_query, mfccs_train):
    distmat = dist.cdist(mfccs_query, mfccs_train, "cosine")
    costmat, mincost = dp(distmat)
    return mincost


def recognize_speech(audio_query, audio_train_list, sr):  # sr: sampling rate
    # Calculate MFCCs for the query audio
    mfccs_query = mfcc(audio_query, sr)
    # Calculate the DTW cost against each audio in the training data
    dtw_costs = []
    for audio_train in audio_train_list:
        mfccs_train = mfcc(audio_train, sr)
        dtw_costs.append(calculate_dtw_cost(mfccs_query, mfccs_train))
    # Return the index of the word with the lowest DTW cost
    return np.argmin(dtw_costs)


# Example usage
def record_audio(filename, duration, sr):
    chunk = 1024
    sample_format = pyaudio.paInt16
    channels = 1
    filename = f"{filename}.wav"
    p = pyaudio.PyAudio()
    stream = p.open(format=sample_format, channels=channels, rate=sr,
                    frames_per_buffer=chunk, input=True)
    frames = []
    print("Recording...")
    for _ in range(int(sr / chunk * duration)):
        frames.append(stream.read(chunk))
    stream.stop_stream()
    stream.close()
    p.terminate()
    print("Recording finished")
    wf = wave.open(filename, "wb")
    wf.setnchannels(channels)
    wf.setsampwidth(p.get_sample_size(sample_format))
    wf.setframerate(sr)
    wf.writeframes(b"".join(frames))
    wf.close()
    print(f"File saved as {filename}")


def coupe_silence(signal):
    """Trim leading silence (zero samples); the caller must keep the return value."""
    t = 0
    while t < len(signal) and signal[t] == 0:
        t += 1
    # TODO: detect the case where no note was played at all
    return signal[t:]


def get_grade():
    ######## TEST / DEBUG ########
    # Debug stub: returns a fixed grade after a delay; the pipeline below is
    # unreachable until this early return is removed
    time.sleep(6)
    return 5

    sr = 44100              # sampling rate
    duration = 6            # recording duration in seconds
    filename = "recording"  # name of the file to record
    data_dir = "audio_data/"
    record_audio(filename, duration, sr)
    audio_query, sr = librosa.load(f'{filename}.wav', sr=sr)
    audio_query = coupe_silence(audio_query)
    training_file_names = []
    for path in os.listdir(data_dir):
        if os.path.isfile(os.path.join(data_dir, path)):
            training_file_names.append(data_dir + path)
    print(training_file_names)
    audio_train_list = [librosa.load(file, sr=sr)[0] for file in training_file_names]
    recognized_word_index = recognize_speech(audio_query, audio_train_list, sr)
    print(f'Recognized word: {recognized_word_index}')
    return recognized_word_index


print(get_grade())
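
# A minimal, self-contained sanity check for the dp() routine above. This is an
# illustrative sketch, not part of the recognition pipeline: the helper name
# _dtw_sanity_check and its synthetic sequences are purely for demonstration.
# Euclidean distance is used here because cosine distance is degenerate on 1-D
# features. Identical sequences should give a normalized cost of ~0; a
# time-shifted copy should cost slightly more, and unrelated noise much more.
def _dtw_sanity_check():
    rng = np.random.default_rng(0)
    base = np.sin(np.linspace(0, 4 * np.pi, 60)).reshape(-1, 1)
    shifted = np.roll(base, 5, axis=0)    # same content, shifted in time
    noise = rng.standard_normal((60, 1))  # unrelated sequence
    for name, other in [("identical", base), ("shifted", shifted), ("noise", noise)]:
        _, cost = dp(dist.cdist(base, other, "euclidean"))
        print(f"DTW cost vs {name}: {cost:.4f}")

# _dtw_sanity_check()  # uncomment to run the check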