| @@ -0,0 +1,350 @@ | |||||
| import librosa | |||||
| import librosa.display | |||||
| import numpy as np | |||||
| import matplotlib.pyplot as plt | |||||
| import soundfile as sf | |||||
| from pydub import AudioSegment | |||||
| from pydub.silence import split_on_silence, detect_nonsilent | |||||
| import math | |||||
| import wave | |||||
| import contextlib | |||||
| import random | |||||
| from mpl_toolkits.axes_grid1.axes_divider import VBoxDivider | |||||
| import mpl_toolkits.axes_grid1.axes_size as Size | |||||
| import cv2 | |||||
| import webrtcvad | |||||
| min_silence_len = 400 | |||||
| def calc_dtw_sim(y1, y2, sr1, sr2, plot_result=False): | |||||
| hop_length = 64 | |||||
| assert sr1 == sr2 | |||||
| l = min(len(y1), len(y2)) | |||||
| to_consider = min(l, max(round(0.2*l), 2048)) | |||||
| min_len = millisecond_to_samples(100, sr1) | |||||
| bound = round(0.5 * l) | |||||
| if bound < min_len: | |||||
| bound = min_len | |||||
| #bound = max(round(0.2 * l), millisecond_to_samples(200, sr1)) | |||||
| y1 = y1[0:bound] | |||||
| y2 = y2[0:bound] | |||||
| if bound < 2048: | |||||
| n_fft = bound | |||||
| n_mels = 64 | |||||
| else: | |||||
| n_fft = 2048 | |||||
| n_mels = 128 | |||||
| mfcc1 = librosa.feature.mfcc(y=y1, sr=sr1, hop_length=hop_length, n_mfcc=42, n_fft=n_fft, n_mels=n_mels)[1:,:] | |||||
| mfcc2 = librosa.feature.mfcc(y=y2, sr=sr2, hop_length=hop_length, n_mfcc=42, n_fft=n_fft, n_mels=n_mels)[1:,:] | |||||
| D, wp = librosa.sequence.dtw(mfcc1, mfcc2) | |||||
| if plot_result: | |||||
| fig, ax = plt.subplots(nrows=4) | |||||
| img = librosa.display.specshow(D, x_axis='frames', y_axis='frames', | |||||
| ax=ax[0]) | |||||
| ax[0].set(title='DTW cost', xlabel='Noisy sequence', ylabel='Target') | |||||
| ax[0].plot(wp[:, 1], wp[:, 0], label='Optimal path', color='y') | |||||
| ax[0].legend() | |||||
| fig.colorbar(img, ax=ax[0]) | |||||
| ax[1].plot(D[-1, :] / wp.shape[0]) | |||||
| ax[1].set(xlim=[0, mfcc1.shape[1]], | |||||
| title='Matching cost function') | |||||
| ax[2].imshow(mfcc1) | |||||
| ax[3].imshow(mfcc2) | |||||
| plt.show() | |||||
| total_alignment_cost = D[-1, -1] / wp.shape[0] | |||||
| return total_alignment_cost | |||||
| def calc_xcorr_sim(y1, y2, sr1, sr2): | |||||
| hop_length = 256 | |||||
| y1 = y1[0:round(len(y1)*0.2)] | |||||
| y2 = y2[0:round(len(y2)*0.2)] | |||||
| mfcc1 = librosa.feature.mfcc(y=y1, sr=sr1, hop_length=hop_length, n_mfcc=13)[1:,:] | |||||
| mfcc2 = librosa.feature.mfcc(y=y2, sr=sr2, hop_length=hop_length, n_mfcc=13)[1:,:] | |||||
| xsim = librosa.segment.cross_similarity(mfcc1, mfcc2, mode='distance') | |||||
| return xsim | |||||
| def match_target_amplitude(aChunk, target_dBFS): | |||||
| ''' Normalize given audio chunk ''' | |||||
| change_in_dBFS = target_dBFS - aChunk.dBFS | |||||
| return aChunk.apply_gain(change_in_dBFS) | |||||
| def spl_on_silence(): | |||||
| # Import the AudioSegment class for processing audio and the | |||||
| # Load your audio. | |||||
| song = AudioSegment.from_wav("recording.wav") | |||||
| # Split track where the silence is 2 seconds or more and get chunks using | |||||
| # the imported function. | |||||
| chunks = split_on_silence ( | |||||
| # Use the loaded audio. | |||||
| song, | |||||
| # Specify that a silent chunk must be at least 2 seconds or 2000 ms long. | |||||
| min_silence_len = 1000, | |||||
| # Consider a chunk silent if it's quieter than -16 dBFS. | |||||
| # (You may want to adjust this parameter.) | |||||
| silence_thresh = -50, | |||||
| timestamps=True | |||||
| ) | |||||
| ## Process each chunk with your parameters | |||||
| #for i, chunk in enumerate(chunks): | |||||
| # # Create a silence chunk that's 0.5 seconds (or 500 ms) long for padding. | |||||
| # silence_chunk = AudioSegment.silent(duration=500) | |||||
| # # Add the padding chunk to beginning and end of the entire chunk. | |||||
| # audio_chunk = silence_chunk + chunk + silence_chunk | |||||
| # # Normalize the entire chunk. | |||||
| # normalized_chunk = match_target_amplitude(audio_chunk, -20.0) | |||||
| # # Export the audio chunk with new bitrate. | |||||
| # print("Exporting chunk{0}.mp3.".format(i)) | |||||
| # normalized_chunk.export( | |||||
| # ".//chunk{0}.wav".format(i), | |||||
| # bitrate = "192k", | |||||
| # format = "wav" | |||||
| # ) | |||||
| return ([ audiosegment_to_librosawav(c) for c in chunks ], song.frame_rate) | |||||
| def non_silent_chunks(song): | |||||
| #song = AudioSegment.from_wav("recording.wav") | |||||
| return detect_nonsilent(song, min_silence_len=min_silence_len, silence_thresh=-50) | |||||
| def audiosegment_to_librosawav(audiosegment): | |||||
| channel_sounds = audiosegment.split_to_mono() | |||||
| samples = [s.get_array_of_samples() for s in channel_sounds] | |||||
| fp_arr = np.array(samples).T.astype(np.float32) | |||||
| fp_arr /= np.iinfo(samples[0].typecode).max | |||||
| fp_arr = fp_arr.reshape(-1) | |||||
| return fp_arr | |||||
| # sr = samples / second | |||||
| def millisecond_to_samples(ms, sr): | |||||
| return round((ms / 1000) * sr) | |||||
| def samples_to_millisecond(samples, sr): | |||||
| return (samples / sr) * 1000 | |||||
| def ms_to_time(ms): | |||||
| secs = ms / 1000 | |||||
| return "{0}:{1}".format(math.floor(secs / 60), secs % 60) | |||||
| def seg_is_speech(seg): | |||||
| f = lambda x: int(32768 * x) | |||||
| x = np.vectorize(f)(seg) | |||||
| pcm_data = x.tobytes() | |||||
| speeches = 0 | |||||
| total = 0 | |||||
| offset = 0 | |||||
| n = int(sr * (frame_duration_ms / 1000.0) * 2) | |||||
| duration = (float(n) / sr) / 2.0 | |||||
| while offset + n < len(pcm_data): | |||||
| frame = pcm_data[offset:(offset+n)] | |||||
| if vad.is_speech(frame, sr): | |||||
| speeches += 1 | |||||
| offset = offset + n | |||||
| total += 1 | |||||
| #return speeches / total | |||||
| return 1.0 | |||||
| def calculate_best_offset(mfcc_ref, mfcc_seg, sr): | |||||
| return librosa.segment.cross_similarity(mfcc_seg, mfcc_ref, mode='affinity', metric='cosine') | |||||
| def detect_lines(img, duration_x, duration_y): | |||||
| #print(img.shape) | |||||
| #print(np.min(img), np.max(img)) | |||||
| img = cv2.imread('affine_similarity.png') | |||||
| gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) | |||||
| kernel_size = 5 | |||||
| blur_gray = cv2.GaussianBlur(gray, (kernel_size, kernel_size), 0) | |||||
| low_threshold = 50 | |||||
| high_threshold = 150 | |||||
| edges = cv2.Canny(blur_gray, low_threshold, high_threshold) | |||||
| rho = 1 # distance resolution in pixels of the Hough grid | |||||
| theta = np.pi / 180 # angular resolution in radians of the Hough grid | |||||
| threshold = 15 # minimum number of votes (intersections in Hough grid cell) | |||||
| min_line_length = 50 # minimum number of pixels making up a line | |||||
| max_line_gap = 20 # maximum gap in pixels between connectable line segments | |||||
| line_image = np.copy(img) * 0 # creating a blank to draw lines on | |||||
| # Run Hough on edge detected image | |||||
| # Output "lines" is an array containing endpoints of detected line segments | |||||
| lines = cv2.HoughLinesP(edges, rho, theta, threshold, np.array([]), | |||||
| min_line_length, max_line_gap) | |||||
| width, height = img.shape[1], img.shape[0] | |||||
| scale_x = duration_x / width | |||||
| scale_y = duration_y / height | |||||
| print(img.shape, scale_x, scale_y, duration_x, duration_y) | |||||
| #slope = duration_y / duration_x | |||||
| slope = 1 | |||||
| expected_slope = scale_x / scale_y | |||||
| #expected_slope = 0.101694915 | |||||
| print(expected_slope) | |||||
| offsets = [] | |||||
| for line in lines: | |||||
| for x1,y1,x2,y2 in line: | |||||
| # swapped y1 and y2 since y is measured from the top | |||||
| slope = (y1-y2)/(x2-x1) | |||||
| if abs(slope - expected_slope) < 0.03: | |||||
| cv2.line(line_image,(x1,y1),(x2,y2),(255,0,0),5) | |||||
| cv2.putText(img, "{:.2f}".format(slope), (x1, y1), fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=0.5,color=(0, 0, 255)) | |||||
| if (x1 / width) < 0.15: | |||||
| print(height-y1) | |||||
| y = height - y1 | |||||
| y0 = y - x1 * slope | |||||
| offsets.append(y0 * scale_y) | |||||
| #actual_lines.append((x1 * scale_x, (height - y1) * scale_y, x2 * scale_x, (height - y2) * scale_y)) | |||||
| #print(max(slopes)) | |||||
| lines_edges = cv2.addWeighted(img, 0.8, line_image, 1, 0) | |||||
| #cv2.imshow("lines", lines_edges) | |||||
| #cv2.waitKey(0) | |||||
| return offsets | |||||
| def map2d(x, y, f): | |||||
| n_x = len(x) | |||||
| n_y = len(y) | |||||
| res = np.zeros((n_x, n_y)) | |||||
| for i in range(n_x): | |||||
| for j in range(n_y): | |||||
| res[i,j] = f(x[i], y[j]) | |||||
| return res | |||||
| def make_widths_equal(fig, rect, ax1, ax2, ax3, pad): | |||||
| # pad in inches | |||||
| divider = VBoxDivider( | |||||
| fig, rect, | |||||
| horizontal=[Size.AxesX(ax1), Size.Scaled(1), Size.AxesX(ax2), Size.Scaled(1), Size.AxesX(ax3)], | |||||
| vertical=[Size.AxesY(ax1), Size.Fixed(pad), Size.AxesY(ax2), Size.Fixed(pad), Size.AxesY(ax3)]) | |||||
| ax1.set_axes_locator(divider.new_locator(0)) | |||||
| ax2.set_axes_locator(divider.new_locator(2)) | |||||
| ax3.set_axes_locator(divider.new_locator(4)) | |||||
| if __name__ == '__main__': | |||||
| #vad = webrtcvad.Vad() | |||||
| #hop_length = 128 | |||||
| #n_mfcc = 13 | |||||
| #frame_duration_ms = 10 | |||||
| fp = "hard_piece_7.wav" | |||||
| y, sr = librosa.load(fp, mono=True) | |||||
| song = AudioSegment.from_wav(fp) | |||||
| ts_non_sil_ms = non_silent_chunks(song) | |||||
| #print(y.shape) | |||||
| #mfcc = librosa.feature.mfcc(y=y, sr=sr, hop_length=hop_length, n_mfcc=n_mfcc)[1:,:] | |||||
| #print(mfcc.shape) | |||||
| #seg_duration_ms = 100 | |||||
| #seg_duration_samples = millisecond_to_samples(seg_duration_ms, sr) | |||||
| ## split complete audio in 10ms segments, only keep those that have voice in it | |||||
| ##segs = [] | |||||
| ##offset = 0 | |||||
| ###i = 0 | |||||
| ##while offset + seg_duration_samples < len(y): | |||||
| ## seg = y[ offset : offset + seg_duration_samples ] | |||||
| ## if seg_is_speech(seg): | |||||
| ## segs.append((seg, offset)) | |||||
| ## offset += seg_duration_samples | |||||
| ##segs = segs[1:] | |||||
| ##n_segs = len(segs) | |||||
| ##(seg, offset) = segs[0] | |||||
| fp_segment = "segment.wav" | |||||
| seg, sr_seg = librosa.load(fp_segment, mono=True) | |||||
| assert sr==sr_seg | |||||
| ##for seg in segs: | |||||
| #mfcc_seg = librosa.feature.mfcc(y=seg, sr=sr_seg, hop_length=hop_length, n_mfcc=n_mfcc)[1:,:] | |||||
| #xsim = calculate_best_offset(mfcc, mfcc_seg, sr) | |||||
| #fig, ax = plt.subplots(nrows=1, sharex=True) | |||||
| #img = librosa.display.specshow(xsim, x_axis='s', y_axis='s', hop_length=hop_length, ax=ax, cmap='magma_r') | |||||
| #print(detect_lines(xsim)) | |||||
| #ax.imshow(np.transpose(xsim), aspect='auto') | |||||
| #ax[1].imshow(diffs_penalised) | |||||
| #ax[1].imshow(np.reshape(vad_coeffs, (1, n_segs))) | |||||
| #ax[2].imshow(np.reshape(lengths, (1, n_segs))) | |||||
| #make_widths_equal(fig, 111, ax[0], ax[1], ax[2], pad=0.5) | |||||
| #plt.show() | |||||
| found_starts = sorted([ samples_to_millisecond(y0, sr) for y0 in detect_lines(None, len(seg), len(y))]) | |||||
| def f(ts, start): | |||||
| return abs(ts[0] - start) | |||||
| closest = map2d(ts_non_sil_ms, found_starts, f) | |||||
| plt.imshow(closest) | |||||
| plt.show() | |||||
| latest = -1 | |||||
| for i, row in enumerate(closest): | |||||
| # min silence len = 400 | |||||
| if min(row) < min_silence_len / 2: | |||||
| latest = ts_non_sil_ms[i] | |||||
| print("delete until:", ms_to_time(latest[0])) | |||||
| #print("possible starts:", [ ms_to_time(t) for t in found_starts]) | |||||
| #for n, seg in enumerate(segs): | |||||
| # sf.write('part' + str(n) + '.wav', seg, sr) | |||||
| #print(segs) | |||||
| #y1, sr1 = librosa.load("out000.wav") | |||||
| #y2, sr2 = librosa.load("out004.wav") | |||||
| #print("total alignment cost:", calc_dtw_sim(y1, y2, sr1, sr2, plot_result=True)) | |||||
| #print("xcorr:", np.trace(calc_xcorr_sim(y1, y2, sr1, sr2))) | |||||