import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt
import soundfile as sf
from pydub import AudioSegment
from pydub.silence import split_on_silence, detect_nonsilent
import math
import wave
import contextlib
import random
from mpl_toolkits.axes_grid1.axes_divider import VBoxDivider
import mpl_toolkits.axes_grid1.axes_size as Size
import webrtcvad


def calc_dtw_sim(y1, y2, sr1, sr2, plot_result=False):
    """Return a DTW alignment cost between the openings of two signals.

    MFCCs are computed over a prefix of both signals and aligned with
    dynamic time warping; the accumulated cost of the optimal path,
    normalised by the path length, is returned (lower == more similar).

    Parameters
    ----------
    y1, y2 : np.ndarray
        Mono audio signals (float, as returned by librosa.load).
    sr1, sr2 : int
        Sample rates of the two signals; must be equal.
    plot_result : bool
        When True, display the DTW cost matrix, matching-cost curve and
        both MFCC matrices with matplotlib.
    """
    hop_length = 64
    assert sr1 == sr2
    shorter = min(len(y1), len(y2))
    # Compare only a prefix: half the shorter signal, but never less
    # than 100 ms worth of samples.
    bound = max(round(0.5 * shorter), millisecond_to_samples(100, sr1))
    y1 = y1[:bound]
    y2 = y2[:bound]
    # Shrink the FFT window for very short prefixes so n_fft <= len(y).
    if bound < 2048:
        n_fft, n_mels = bound, 64
    else:
        n_fft, n_mels = 2048, 128
    # Drop MFCC 0 (overall energy) so loudness differences don't dominate.
    mfcc1 = librosa.feature.mfcc(y=y1, sr=sr1, hop_length=hop_length,
                                 n_mfcc=42, n_fft=n_fft, n_mels=n_mels)[1:, :]
    mfcc2 = librosa.feature.mfcc(y=y2, sr=sr2, hop_length=hop_length,
                                 n_mfcc=42, n_fft=n_fft, n_mels=n_mels)[1:, :]
    D, wp = librosa.sequence.dtw(mfcc1, mfcc2)
    if plot_result:
        fig, ax = plt.subplots(nrows=4)
        img = librosa.display.specshow(D, x_axis='frames', y_axis='frames',
                                       ax=ax[0])
        ax[0].set(title='DTW cost', xlabel='Noisy sequence', ylabel='Target')
        ax[0].plot(wp[:, 1], wp[:, 0], label='Optimal path', color='y')
        ax[0].legend()
        fig.colorbar(img, ax=ax[0])
        ax[1].plot(D[-1, :] / wp.shape[0])
        ax[1].set(xlim=[0, mfcc1.shape[1]], title='Matching cost function')
        ax[2].imshow(mfcc1)
        ax[3].imshow(mfcc2)
        plt.show()
    # Total cost of the optimal warping path, normalised by its length.
    return D[-1, -1] / wp.shape[0]


def calc_xcorr_sim(y1, y2, sr1, sr2):
    """Return librosa's cross-similarity matrix (distance mode) between
    the MFCCs of the first 20% of each signal; MFCC 0 is dropped."""
    hop_length = 256
    y1 = y1[:round(len(y1) * 0.2)]
    y2 = y2[:round(len(y2) * 0.2)]
    mfcc1 = librosa.feature.mfcc(y=y1, sr=sr1, hop_length=hop_length,
                                 n_mfcc=13)[1:, :]
    mfcc2 = librosa.feature.mfcc(y=y2, sr=sr2, hop_length=hop_length,
                                 n_mfcc=13)[1:, :]
    return librosa.segment.cross_similarity(mfcc1, mfcc2, mode='distance')


def match_target_amplitude(aChunk, target_dBFS):
    """Normalize the given audio chunk to target_dBFS."""
    change_in_dBFS = target_dBFS - aChunk.dBFS
    return aChunk.apply_gain(change_in_dBFS)


def spl_on_silence():
    """Split "recording.wav" on silence.

    Returns (list of float arrays, frame_rate): one array per
    non-silent chunk, converted with audiosegment_to_librosawav.

    BUGFIX: the original call passed ``timestamps=True``, which pydub's
    ``split_on_silence`` does not accept and which raised TypeError;
    the argument has been removed.
    """
    song = AudioSegment.from_wav("recording.wav")
    chunks = split_on_silence(
        song,
        min_silence_len=1000,   # silence must last at least 1 s
        silence_thresh=-50,     # quieter than -50 dBFS counts as silence
    )
    return ([audiosegment_to_librosawav(c) for c in chunks],
            song.frame_rate)


def non_silent_chunks(song):
    """Return [start_ms, end_ms] ranges of non-silent audio in `song`."""
    return detect_nonsilent(song, min_silence_len=400, silence_thresh=-50)


def audiosegment_to_librosawav(audiosegment):
    """Convert a pydub AudioSegment to a flat float32 array in [-1, 1],
    interleaving channels the way librosa expects."""
    channel_sounds = audiosegment.split_to_mono()
    samples = [s.get_array_of_samples() for s in channel_sounds]
    fp_arr = np.array(samples).T.astype(np.float32)
    # Scale integer PCM to [-1, 1] using the sample type's max value.
    fp_arr /= np.iinfo(samples[0].typecode).max
    return fp_arr.reshape(-1)


def millisecond_to_samples(ms, sr):
    """Convert a duration in milliseconds to a sample count at rate sr
    (sr = samples / second)."""
    return round((ms / 1000) * sr)


def ms_to_time(ms):
    """Format milliseconds as "M:S" (seconds are not zero-padded)."""
    secs = ms / 1000
    return "{0}:{1}".format(math.floor(secs / 60), secs % 60)


def seg_is_speech(seg):
    """Return the fraction of VAD frames in `seg` classified as speech.

    Relies on the module-level `vad`, `sr` and `frame_duration_ms`
    bindings created in the __main__ block.

    BUGFIX: the float signal is now converted to proper 16-bit PCM.
    The original serialised a default-int (int64) array, so each
    "frame" handed to webrtcvad held the wrong number of samples in
    the wrong byte format; values at +1.0 also overflowed int16.
    """
    # webrtcvad expects 16-bit little-endian mono PCM.
    pcm_data = np.clip(np.asarray(seg) * 32768,
                       -32768, 32767).astype(np.int16).tobytes()
    speeches = 0
    total = 0
    offset = 0
    # Bytes per frame: samples-per-frame * 2 bytes per 16-bit sample.
    n = int(sr * (frame_duration_ms / 1000.0) * 2)
    while offset + n < len(pcm_data):
        frame = pcm_data[offset:offset + n]
        if vad.is_speech(frame, sr):
            speeches += 1
        offset += n
        total += 1
    # Segments shorter than one frame produce no frames at all.
    return speeches / total if total else 0.0


def make_widths_equal(fig, rect, ax1, ax2, ax3, pad):
    """Stack ax1/ax2/ax3 vertically with equal widths. pad is in inches."""
    divider = VBoxDivider(
        fig, rect,
        horizontal=[Size.AxesX(ax1), Size.Scaled(1),
                    Size.AxesX(ax2), Size.Scaled(1),
                    Size.AxesX(ax3)],
        vertical=[Size.AxesY(ax1), Size.Fixed(pad),
                  Size.AxesY(ax2), Size.Fixed(pad),
                  Size.AxesY(ax3)])
    ax1.set_axes_locator(divider.new_locator(0))
    ax2.set_axes_locator(divider.new_locator(2))
    ax3.set_axes_locator(divider.new_locator(4))


if __name__ == '__main__':
    vad = webrtcvad.Vad()
    frame_duration_ms = 10  # webrtcvad accepts 10/20/30 ms frames

    fp = "hard_pieces.wav"
    # 32 kHz mono: one of the sample rates webrtcvad accepts.
    y, sr = librosa.load(fp, mono=True, sr=32000)
    song = AudioSegment.from_wav(fp)

    # Slice the librosa signal at the non-silent ranges pydub detected.
    segs = []
    for start, end in non_silent_chunks(song):
        seg = y[millisecond_to_samples(start, sr):
                millisecond_to_samples(end, sr)]
        segs.append(((start, end), seg))

    n_segs = len(segs)
    diffs = np.zeros((n_segs, n_segs))            # pairwise DTW costs
    diffs_penalised = np.zeros((n_segs, n_segs))  # costs x distance penalty
    vad_coeffs = np.zeros((n_segs,))              # speech fraction per segment
    lengths = np.zeros((n_segs,))                 # segment length in ms

    for i in range(n_segs):
        (s1, e1), y1 = segs[i]
        for j in range(i):
            (s2, e2), y2 = segs[j]
            diffs[i, j] = calc_dtw_sim(y1, y2, sr, sr, plot_result=False)
            diffs[j, i] = diffs[i, j]
            # Penalise pairs that are far apart in segment index; the
            # exponent shrinks as the shorter of the two segments grows.
            distance_penalty = abs(i - j) ** (100 / min(e1 - s1, e2 - s2))
            diffs_penalised[i, j] = diffs[i, j] * distance_penalty
            diffs_penalised[j, i] = diffs_penalised[i, j]
        vad_coeffs[i] = seg_is_speech(y1)
        lengths[i] = e1 - s1

    # Mark runs of segments that look like repeats of segment i.
    # NOTE(review): max_j is the LAST j with penalised cost < 80 (the
    # run need not be contiguous), and the slice excludes max_j itself —
    # possibly an off-by-one; kept exactly as in the original.
    delete_segs = np.zeros((n_segs,), dtype=bool)
    for i in range(n_segs):
        if delete_segs[i]:
            continue
        max_j = i
        for j in range(i, n_segs):
            if diffs_penalised[i, j] < 80:
                max_j = j
        delete_segs[i:max_j] = True

    for i in range(n_segs):
        (s1, e1), _ = segs[i]
        print("{0}\t{1}\tn: {2} delete: {3}, vad: {4}".format(
            s1 / 1000, e1 / 1000, i, delete_segs[i], vad_coeffs[i]))

    fig, ax = plt.subplots(nrows=3, sharex=True)
    ax[0].imshow(diffs)
    ax[1].imshow(diffs_penalised)
    ax[2].imshow(np.reshape(lengths, (1, n_segs)))
    make_widths_equal(fig, 111, ax[0], ax[1], ax[2], pad=0.5)
    plt.show()