mirror of
https://git.roussel.pro/telecom-paris/pact.git
synced 2026-02-09 10:30:17 +01:00
ajouter des trucs
This commit is contained in:
200
tmp2.py
Normal file
200
tmp2.py
Normal file
@@ -0,0 +1,200 @@
|
||||
import librosa
|
||||
import os
|
||||
import numpy as np
|
||||
import math
|
||||
from scipy.io import wavfile
|
||||
import wave
|
||||
from scipy.fftpack import fft,dct
|
||||
|
||||
from matplotlib.patches import ConnectionPatch
|
||||
import matplotlib.pyplot as plt
|
||||
import numpy as np
|
||||
import scipy.spatial.distance as dist
|
||||
import pyaudio
|
||||
import wave
|
||||
def dp(distmat):
|
||||
N,M = distmat.shape
|
||||
# Initialisons the cost matrix
|
||||
costmat =np.zeros((N+1,M+1))
|
||||
for i in range (1,N+1):
|
||||
costmat[i,0]=np.inf
|
||||
for i in range (1,M+1):
|
||||
costmat[0,i]=np.inf
|
||||
|
||||
for i in range (N):
|
||||
for j in range (M):
|
||||
#on calcule le cout minimal pour chaque chemin.pour atteindre the costmat[i][j] il y a trois chemins possibles on choisit celui de cout minimal
|
||||
penalty = [
|
||||
costmat[i,j], # cas T==0
|
||||
costmat[i,j+1] , # cas T==1
|
||||
costmat[i+1,j]] # cas T==2
|
||||
ipenalty = np.argmin(penalty)
|
||||
costmat[i+1,j+1] = distmat[i,j] + penalty[ipenalty]
|
||||
|
||||
#enlever les valeurs de l infini
|
||||
costmat = costmat[1: , 1:]
|
||||
return (costmat, costmat[-1, -1]/(N+M))
|
||||
|
||||
def divsignalaudiobis(signal):
|
||||
long_signal = 20 # 20 ms
|
||||
recouvrement = 10 # 10 ms
|
||||
long_echantillon = long_signal*sr//1000
|
||||
recouvrement_echantillon = recouvrement*sr//1000
|
||||
nb_echantillon = int(np.ceil((len(signal) - long_echantillon)/recouvrement_echantillon) + 1)
|
||||
long_a_completer = recouvrement_echantillon*(nb_echantillon -1) + long_echantillon - len(signal)
|
||||
|
||||
if (long_a_completer != 0):
|
||||
echantillon_data = np.pad(signal,(0,long_a_completer),mode='constant') # on complète le dernier échantillon par des 0
|
||||
|
||||
else :
|
||||
nb_echantillon -= 1
|
||||
|
||||
echantillon_data = np.append(echantillon_data[0], echantillon_data[1:])
|
||||
data = np.zeros((nb_echantillon, long_echantillon))
|
||||
for i in range(nb_echantillon):
|
||||
echantillon_i = echantillon_data[i*recouvrement_echantillon : i*recouvrement_echantillon + long_echantillon]
|
||||
data[i,:] = echantillon_i
|
||||
return data
|
||||
|
||||
|
||||
def myfft(signal, fe):
|
||||
n= len(signal)
|
||||
Te = 1/fe
|
||||
S = [0 + 0j]*(450)
|
||||
for l in range(50, 500):
|
||||
f = l
|
||||
for i in range (n):
|
||||
t= Te * i
|
||||
S[l-50] += signal[i]*np.exp(-2*math.pi*f*t*1j)
|
||||
S[l-50] = abs(S[l-50])/n
|
||||
return S
|
||||
|
||||
def puissance_spec(signal):
|
||||
amplitude_fft = np.absolute(signal)
|
||||
return (amplitude_fft**2)/44100 # long fft = 512
|
||||
|
||||
def BankFiltre(rate, puis_spec):
|
||||
freq_min = 20
|
||||
freq_max = rate//2
|
||||
freq_mel_min = 1000*np.log2(1 +freq_min/1000)
|
||||
freq_mel_max = 1000*np.log2(1+ freq_max/1000)
|
||||
nb_filtre = 40 # on prend en général 40 filtres
|
||||
mel_points = np.linspace(freq_mel_min, freq_mel_max, 42)
|
||||
hz_points = 1000*(2**(mel_points/1000)-1)# on convertit en hz
|
||||
|
||||
bankf = np.zeros((nb_filtre, int(np.floor(22050 +1))))
|
||||
|
||||
for m in range(1, nb_filtre +1): # pour chaque filtre, on fait :
|
||||
f_m_min = int(math.floor(hz_points[m-1])) # point de gauche
|
||||
f_m = int(math.floor(hz_points[m])) # sommet
|
||||
f_m_max = int(math.floor(hz_points[m+1])) # point de droite
|
||||
|
||||
for k in range(f_m_min, f_m):
|
||||
bankf[m - 1, k] = (k - hz_points[m - 1]) / (hz_points[m] - hz_points[m - 1])
|
||||
for k in range(f_m, f_m_max):
|
||||
bankf[m - 1, k] = ((hz_points[m + 1]) - k) / (hz_points[m + 1] - hz_points[m])
|
||||
|
||||
filter_bank = np.dot(puis_spec,np.transpose(bankf)) # Produit vectoriel/matriciel #ipdb
|
||||
filter_bank = np.where(filter_bank == 0, np.finfo(float).eps, filter_bank) # attention à 0 dans le log.
|
||||
return filter_bank
|
||||
|
||||
def mfcc(signal, rate):
|
||||
data = divsignalaudiobis(signal)
|
||||
data_fft = np.fft.rfft(data, 44100)
|
||||
data_puiss = puissance_spec(data_fft)
|
||||
data_filtre = BankFiltre(rate, data_puiss)
|
||||
pre_mfcc = np.log(data_filtre)
|
||||
mfcc = dct(pre_mfcc, type=2, axis=1, norm="ortho")[:, 0 : 13] # on ne garde que les 13 premiers
|
||||
#return mfcc
|
||||
return mfcc
|
||||
def calculate_dtw_cost(mfccs_query , mfccs_train):
|
||||
distmat = dist.cdist(mfccs_query, mfccs_train,"cosine")
|
||||
costmat,mincost = dp(distmat)
|
||||
return mincost
|
||||
def recognize_speech(audio_query, audio_train_list, sr):#sr frequence d echantillonnage
|
||||
# Calculate MFCCs for query audio
|
||||
mfccs_query = mfcc(audio_query, sr)
|
||||
|
||||
# Calculate DTW cost for each audio in training data
|
||||
dtw_costs = []
|
||||
for audio_train in audio_train_list:
|
||||
mfccs_train = mfcc(audio_train, sr)
|
||||
mincost = calculate_dtw_cost(mfccs_query, mfccs_train)
|
||||
dtw_costs.append(mincost)
|
||||
|
||||
# Find index of word with lowest DTW cost
|
||||
index = np.argmin(dtw_costs)
|
||||
|
||||
# Return recognized word
|
||||
return index
|
||||
|
||||
# Example usage
|
||||
|
||||
def record_audio(filename, duration, sr):
|
||||
chunk = 1024
|
||||
sample_format = pyaudio.paInt16
|
||||
channels = 1
|
||||
record_seconds = duration
|
||||
filename = f"{filename}.wav"
|
||||
|
||||
p = pyaudio.PyAudio()
|
||||
|
||||
stream = p.open(format=sample_format,
|
||||
channels=channels,
|
||||
rate=sr,
|
||||
frames_per_buffer=chunk,
|
||||
input=True)
|
||||
|
||||
frames = []
|
||||
|
||||
print(f"Enregistrement en cours...")
|
||||
|
||||
for i in range(0, int(sr / chunk * record_seconds)):
|
||||
data = stream.read(chunk)
|
||||
frames.append(data)
|
||||
|
||||
stream.stop_stream()
|
||||
stream.close()
|
||||
|
||||
p.terminate()
|
||||
|
||||
print("Enregistrement terminé")
|
||||
|
||||
wf = wave.open(filename, "wb")
|
||||
wf.setnchannels(channels)
|
||||
wf.setsampwidth(p.get_sample_size(sample_format))
|
||||
wf.setframerate(sr)
|
||||
wf.writeframes(b"".join(frames))
|
||||
wf.close()
|
||||
|
||||
print(f"Fichier enregistré sous {filename}")
|
||||
|
||||
def coupe_silence(signal):
|
||||
|
||||
t = 0
|
||||
|
||||
if signal[t] == 0 :
|
||||
|
||||
p = 0
|
||||
|
||||
while signal[t+p] == 0 :
|
||||
|
||||
if p == 88 :
|
||||
|
||||
signal = signal[:t] + signal[t+p:]
|
||||
|
||||
coupe_silence(signal)
|
||||
|
||||
else :
|
||||
|
||||
p = p+1
|
||||
sr = 44100 # fréquence d'échantillonnage
|
||||
duration = 6 # durée d'enregistrement en secondes
|
||||
filename = "audio_query" # nom du fichier à enregistrer
|
||||
|
||||
record_audio(filename, duration, sr)
|
||||
audio_query, sr = librosa.load('C:\\Users\\HP\\audio_query.wav', sr=sr)
|
||||
coupe_silence(audio_query)
|
||||
audio_train_list = [librosa.load('C:\\Users\\HP\\Documents\\cool.wav', sr=sr)[0], librosa.load('C:\\Users\\HP\\Documents\\formidable.wav', sr=sr)[0], librosa.load('C:\\Users\\HP\\Documents\\cest mauvais.wav', sr=sr)[0] , librosa.load('C:\\Users\\HP\\Documents\\un.wav', sr=sr)[0], librosa.load('C:\\Users\\HP\\Documents\\parfait.wav', sr=sr)[0]]
|
||||
recognized_word_index = recognize_speech(audio_query, audio_train_list, sr)
|
||||
print(f'Recognized word: {recognized_word_index}')
|
||||
Reference in New Issue
Block a user