|
- import librosa
- import librosa.display
- import numpy as np
- import matplotlib.pyplot as plt
- import soundfile as sf
- from pydub import AudioSegment
- from pydub.silence import split_on_silence, detect_nonsilent
- import math
- import wave
- import contextlib
- import random
- from mpl_toolkits.axes_grid1.axes_divider import VBoxDivider
- import mpl_toolkits.axes_grid1.axes_size as Size
- import cv2
- import sys
-
- import webrtcvad
-
# Minimum silence length in milliseconds. Passed to pydub's detect_nonsilent in
# non_silent_chunks; half of this value is the matching tolerance used by
# find_repetition.
min_silence_len = 400
# Frame length in milliseconds used by seg_is_speech to size each VAD frame.
frame_duration_ms = 10
-
-
def calc_dtw_sim(y1, y2, sr1, sr2, plot_result=False):
    """Return a DTW alignment cost between the leading parts of two signals.

    Both signals are cut to the same number of leading samples: half the
    length of the shorter signal, but never fewer than 100 ms worth of
    samples. MFCCs are computed for each cut (the first coefficient row is
    dropped) and aligned with ``librosa.sequence.dtw``.

    Args:
        y1: First audio time series.
        y2: Second audio time series.
        sr1: Sampling rate of ``y1``.
        sr2: Sampling rate of ``y2``; must equal ``sr1``.
        plot_result: When true, show the cost matrix with the warping path,
            the normalized last row of the cost matrix and both MFCC
            matrices before returning.

    Returns:
        The accumulated cost in the last cell of the DTW matrix divided by
        the number of steps in the warping path.

    Raises:
        AssertionError: If the two sampling rates differ.
    """
    assert sr1 == sr2
    frame_hop = 64

    shortest = min(len(y1), len(y2))

    # Half of the shorter signal, clamped from below to 100 ms of samples.
    bound = max(round(0.5 * shortest), millisecond_to_samples(100, sr1))

    y1, y2 = y1[:bound], y2[:bound]

    # Short excerpts get an FFT window of their own length and fewer mel bands.
    if bound >= 2048:
        n_fft, n_mels = 2048, 128
    else:
        n_fft, n_mels = bound, 64

    mfcc_options = dict(hop_length=frame_hop, n_mfcc=42, n_fft=n_fft, n_mels=n_mels)
    mfcc1 = librosa.feature.mfcc(y=y1, sr=sr1, **mfcc_options)[1:, :]
    mfcc2 = librosa.feature.mfcc(y=y2, sr=sr2, **mfcc_options)[1:, :]

    D, wp = librosa.sequence.dtw(mfcc1, mfcc2)
    path_length = wp.shape[0]

    if plot_result:
        fig, axes = plt.subplots(nrows=4)
        cost_ax, match_ax, first_ax, second_ax = axes[0], axes[1], axes[2], axes[3]

        img = librosa.display.specshow(D, x_axis='frames', y_axis='frames',
                                       ax=cost_ax)
        cost_ax.set(title='DTW cost', xlabel='Noisy sequence', ylabel='Target')
        cost_ax.plot(wp[:, 1], wp[:, 0], label='Optimal path', color='y')
        cost_ax.legend()
        fig.colorbar(img, ax=cost_ax)

        match_ax.plot(D[-1, :] / path_length)
        match_ax.set(xlim=[0, mfcc1.shape[1]],
                     title='Matching cost function')

        first_ax.imshow(mfcc1)
        second_ax.imshow(mfcc2)

        plt.show()

    return D[-1, -1] / path_length
-
-
def calc_xcorr_sim(y1, y2, sr1, sr2):
    """Return the cross-similarity matrix of the first 20 % of two signals.

    Each signal is truncated to its leading fifth, turned into 13 MFCCs with
    a hop length of 256 (the first coefficient row is dropped), and the two
    feature matrices are compared with
    ``librosa.segment.cross_similarity(..., mode='distance')``.

    Args:
        y1: First audio time series.
        y2: Second audio time series.
        sr1: Sampling rate of ``y1``.
        sr2: Sampling rate of ``y2``.

    Returns:
        Whatever ``librosa.segment.cross_similarity`` returns for the two
        MFCC matrices.
    """
    frame_hop = 256
    head_fraction = 0.2

    head1 = y1[0:round(len(y1) * head_fraction)]
    head2 = y2[0:round(len(y2) * head_fraction)]

    features1 = librosa.feature.mfcc(y=head1, sr=sr1, hop_length=frame_hop, n_mfcc=13)[1:, :]
    features2 = librosa.feature.mfcc(y=head2, sr=sr2, hop_length=frame_hop, n_mfcc=13)[1:, :]

    return librosa.segment.cross_similarity(features1, features2, mode='distance')
-
-
def match_target_amplitude(aChunk, target_dBFS):
    """Apply the gain that moves ``aChunk.dBFS`` to ``target_dBFS``.

    Args:
        aChunk: Object exposing ``dBFS`` and ``apply_gain(gain)``.
        target_dBFS: Desired loudness in dBFS.

    Returns:
        The result of ``aChunk.apply_gain`` for the computed gain difference.
    """
    return aChunk.apply_gain(target_dBFS - aChunk.dBFS)
-
-
def spl_on_silence(path="recording.wav"):
    """Split a WAV recording at silent passages.

    Args:
        path: WAV file to load. Defaults to ``"recording.wav"``, the file
            this function previously always read.

    Returns:
        A tuple ``(chunks, frame_rate)``: ``chunks`` is a list holding the
        result of ``audiosegment_to_librosawav`` for every chunk produced by
        pydub's ``split_on_silence``, and ``frame_rate`` is the frame rate of
        the loaded recording.
    """
    song = AudioSegment.from_wav(path)

    # NOTE: the former ``timestamps=True`` argument was removed; pydub's
    # split_on_silence has no such parameter and the call raised TypeError.
    chunks = split_on_silence(
        song,
        # A silent stretch must last at least 1000 ms to split the track.
        min_silence_len=1000,
        # Audio quieter than -50 dBFS counts as silence.
        silence_thresh=-50,
    )

    # NOTE: chunks are returned as-is, without padding or loudness
    # normalization.
    return ([audiosegment_to_librosawav(c) for c in chunks], song.frame_rate)
-
-
def non_silent_chunks(song):
    """Run pydub's ``detect_nonsilent`` on ``song``.

    The minimum silence length comes from the module-level
    ``min_silence_len``; the silence threshold is fixed at -50 dBFS.

    Args:
        song: Audio segment to analyze.

    Returns:
        The value returned by ``detect_nonsilent``.
    """
    detection_options = {
        "min_silence_len": min_silence_len,
        "silence_thresh": -50,
    }
    return detect_nonsilent(song, **detection_options)
-
-
def audiosegment_to_librosawav(audiosegment):
    """Convert an audio segment into a flat float32 sample array.

    The segment is split into mono channels, their integer samples are
    stacked so that samples of different channels are interleaved, and every
    value is divided by the maximum of the integer sample type.

    Args:
        audiosegment: Object exposing ``split_to_mono()``, whose items expose
            ``get_array_of_samples()``.

    Returns:
        One-dimensional ``numpy.float32`` array of scaled samples.
    """
    per_channel = [mono.get_array_of_samples() for mono in audiosegment.split_to_mono()]

    # Rows are time steps, columns are channels.
    scaled = np.array(per_channel).T.astype(np.float32)
    full_scale = np.iinfo(per_channel[0].typecode).max
    scaled /= full_scale

    return scaled.reshape(-1)
-
-
# sr is the sampling rate in samples per second.
def millisecond_to_samples(ms, sr):
    """Convert a duration in milliseconds to a rounded sample count.

    Args:
        ms: Duration in milliseconds.
        sr: Sampling rate in samples per second.

    Returns:
        The number of samples, rounded with the built-in ``round``.
    """
    seconds = ms / 1000
    return round(seconds * sr)
-
-
def samples_to_millisecond(samples, sr):
    """Convert a sample count to a duration in milliseconds.

    Args:
        samples: Number of samples.
        sr: Sampling rate in samples per second.

    Returns:
        The duration in milliseconds (not rounded).
    """
    seconds = samples / sr
    return seconds * 1000
-
-
def samples_to_time(samples, sr):
    """Format a sample position as a time string.

    Args:
        samples: Number of samples.
        sr: Sampling rate in samples per second.

    Returns:
        The string produced by ``ms_to_time`` for the equivalent duration.
    """
    elapsed_ms = samples_to_millisecond(samples, sr)
    return ms_to_time(elapsed_ms)
-
-
def ms_to_time(ms):
    """Format milliseconds as ``minutes:seconds`` with four decimals.

    The seconds part is not zero-padded, e.g. 65000 ms gives ``"1:5.0000"``.

    Args:
        ms: Duration in milliseconds.

    Returns:
        The formatted time string.
    """
    total_seconds = ms / 1000
    whole_minutes = math.floor(total_seconds / 60)
    leftover_seconds = total_seconds % 60
    return "{0}:{1:.4f}".format(whole_minutes, leftover_seconds)
-
-
def seg_is_speech(seg, sample_rate=None, detector=None, frame_ms=None):
    """Return the fraction of frames in ``seg`` that a VAD marks as speech.

    The float samples are scaled by 32768, clipped to the signed 16-bit
    range and serialized as little-endian 16-bit PCM. That byte stream is
    cut into consecutive frames of ``frame_ms`` milliseconds and each full
    frame is passed to ``detector.is_speech``; a trailing partial frame is
    ignored.

    Args:
        seg: Sequence of float samples (scaled by 32768 before conversion).
        sample_rate: Sampling rate in Hz. Defaults to the module-level
            ``sr``.
        detector: Object exposing ``is_speech(frame_bytes, sample_rate)``.
            Defaults to the module-level ``vad``.
        frame_ms: Frame length in milliseconds. Defaults to the module-level
            ``frame_duration_ms``.

    Returns:
        Speech frames divided by total frames, or ``0.0`` when ``seg`` is
        too short to contain a single full frame.
    """
    # Fall back to the globals the original implementation relied on.
    if sample_rate is None:
        sample_rate = sr
    if detector is None:
        detector = vad
    if frame_ms is None:
        frame_ms = frame_duration_ms

    # Two bytes per sample. Clipping keeps a full-scale 1.0 from overflowing
    # int16, and the cast truncates toward zero like int() did.
    scaled = np.asarray(seg, dtype=np.float64) * 32768
    pcm_data = np.clip(scaled, -32768, 32767).astype('<i2').tobytes()

    # Frame size in bytes (2 bytes per 16-bit sample).
    n = int(sample_rate * (frame_ms / 1000.0) * 2)
    if n <= 0:
        return 0.0

    speeches = 0
    total = 0
    # The "+ 1" keeps a final frame that ends exactly at the buffer's end.
    for offset in range(0, len(pcm_data) - n + 1, n):
        if detector.is_speech(pcm_data[offset:offset + n], sample_rate):
            speeches += 1
        total += 1

    if total == 0:
        return 0.0
    return speeches / total
-
-
def calculate_best_offset(mfcc_ref, mfcc_seg, sr):
    """Return the cosine affinity cross-similarity of a segment and a reference.

    Args:
        mfcc_ref: Feature matrix of the reference window.
        mfcc_seg: Feature matrix of the segment being searched for.
        sr: Sampling rate; accepted for interface compatibility, not used.

    Returns:
        The result of ``librosa.segment.cross_similarity`` called with the
        segment features first and the reference features second.
    """
    similarity = librosa.segment.cross_similarity(
        mfcc_seg,
        mfcc_ref,
        mode='affinity',
        metric='cosine',
    )
    return similarity
-
-
def detect_lines(img, duration_x, duration_y, plot_result=False):
    """Detect line segments with the expected slope in a similarity image.

    The image is inverted, converted to 8-bit grayscale, blurred, run through
    Canny edge detection and then through the probabilistic Hough transform.
    Only segments whose pixel slope is within 0.15 of ``expected_slope`` are
    kept. Each kept segment is extended back to the smallest ``x1`` among the
    kept segments to obtain its y position there.

    Args:
        img: 2-D similarity matrix. Assumes values lie in [0, 1] so that
            ``(1 - img) * 255`` fits into uint8 — TODO confirm with caller.
        duration_x: Extent represented by the image width; used only to
            convert pixels to caller units via ``scale_x``.
        duration_y: Extent represented by the image height; used only to
            convert pixels to caller units via ``scale_y``.
        plot_result: When true, draw the accepted segments and show them in
            an OpenCV window (blocks until a key is pressed).

    Returns:
        A tuple ``(x_start, offsets)``: ``x_start`` is the smallest ``x1`` of
        the accepted segments multiplied by ``scale_x`` (or ``42 * scale_x``
        when no segment was accepted), and ``offsets`` lists the y position
        of every accepted segment at that x, multiplied by ``scale_y``.
    """
    #print(img.shape)
    #print(np.min(img), np.max(img))
    # Invert so high similarity becomes dark, then quantize to 8 bits.
    gray = np.vectorize(int)((1-img) * 255).astype('uint8')
    # From here on ``img`` is the BGR rendering used only for drawing and size.
    img = cv2.cvtColor(gray, cv2.COLOR_GRAY2BGR)
    #print(img, type(img))
    #img = cv2.imread('affine_similarity_2.png')
    #gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    #cv2.imshow("gray", gray)
    #cv2.waitKey(0)
    #print(gray, type(gray), gray.shape, gray.dtype)
    #print(gray2, type(gray2), gray2.shape, gray2.dtype)
    kernel_size = 5
    blur_gray = cv2.GaussianBlur(gray, (kernel_size, kernel_size), 0)
    #cv2.imshow("blur gray", blur_gray)
    #cv2.waitKey(0)

    low_threshold = 50
    high_threshold = 150
    edges = cv2.Canny(blur_gray, low_threshold, high_threshold)

    rho = 1  # distance resolution in pixels of the Hough grid
    theta = np.pi / 180  # angular resolution in radians of the Hough grid
    threshold = 15  # minimum number of votes (intersections in Hough grid cell)
    min_line_length = 50  # minimum number of pixels making up a line
    max_line_gap = 20  # maximum gap in pixels between connectable line segments
    if plot_result:
        line_image = np.copy(img) * 0  # creating a blank to draw lines on

    # Run Hough on edge detected image
    # Output "lines" is an array containing endpoints of detected line segments
    lines = cv2.HoughLinesP(edges, rho, theta, threshold, np.array([]),
                            min_line_length, max_line_gap)

    width, height = img.shape[1], img.shape[0]

    # Caller units per pixel along each axis.
    scale_x = duration_x / width
    scale_y = duration_y / height
    #print(img.shape, scale_x, scale_y, duration_x, duration_y)

    #slope = duration_y / duration_x
    slope = 1

    # Pixel slope of a line that advances equally in caller units on both axes.
    expected_slope = scale_x / scale_y
    #print(expected_slope)
    #expected_slope = 1.0 # y is inverted by opencv
    #expected_slope = 0.101694915

    ls = []
    offsets = []
    xs = []
    # HoughLinesP returned None is treated as "no lines found".
    if lines is not None:
        for line in lines:
            for x1,y1,x2,y2 in line:
                # NOTE(review): an earlier comment here said y1 and y2 were
                # swapped because y is measured from the top, but the
                # expression below uses (y2-y1) unswapped — confirm intent.
                # 42 is a sentinel slope for vertical segments so they fail
                # the tolerance test below.
                slope = (y2-y1)/(x2-x1) if x2 != x1 else 42
                if abs(slope - expected_slope) < 0.15:#and (x1 / width) < 0.15:
                    y = y1
                    y0 = (y - x1 * slope)
                    if plot_result:
                        #cv2.line(line_image,(0,int(y0)),(x2,y2),(0,255,0),5)
                        cv2.line(line_image,(x1, y1),(x2,y2),(255,0,0),5)
                        cv2.putText(img, "{:.2f}".format(slope), (x1, y1), fontFace=cv2.FONT_HERSHEY_COMPLEX, fontScale=2,color=(0, 0, 255))
                    #if (x1 / width) < 0.15:
                    #print(height-y1)
                    #y = height - y1
                    #y = y1
                    #y0 = y - x1 * slope
                    #offsets.append(y0 * scale_y)
                    #xs.append(x1)
                    ls.append((x1, y1, slope))
                    #actual_lines.append((x1 * scale_x, (height - y1) * scale_y, x2 * scale_x, (height - y2) * scale_y))
    #print(max(slopes))
    # Leftmost start among accepted segments; 42 is an arbitrary fallback.
    x_min = min(ls, key=lambda a: a[0])[0] if len(ls) > 0 else 42 # just something > 10
    # Extend every accepted segment back to x_min and convert to caller units.
    offsets = [ (y1 + (x_min - x1)*slope) * scale_y for x1, y1, slope in ls ]
    if plot_result:
        for x1, y1, slope in ls:
            y = y1 + (x_min -x1)*slope
            #cv2.line(line_image,(x_min,int(y)),(x1,y1),(0,255,0),5)

        #cv2.line(line_image, (x_min, 0), (x_min, height-1), (0, 0, 255), 2)
        lines_edges = cv2.addWeighted(img, 0.8, line_image, 1, 0)
        lines_edges_resized = cv2.resize(lines_edges, (int(1024 * duration_x / duration_y ), 1024))
        cv2.imshow("lines", lines_edges_resized)
        cv2.waitKey(0)
    return (x_min*scale_x, offsets)
-
-
def map2d(x, y, f):
    """Evaluate ``f`` for every pair of items from ``x`` and ``y``.

    Args:
        x: Sequence providing the row items.
        y: Sequence providing the column items.
        f: Callable taking one item of ``x`` and one of ``y`` and returning
            a number.

    Returns:
        Float array of shape ``(len(x), len(y))`` where entry ``[i, j]``
        equals ``f(x[i], y[j])``.
    """
    table = np.zeros((len(x), len(y)))
    for row, x_item in enumerate(x):
        for col, y_item in enumerate(y):
            table[row, col] = f(x_item, y_item)
    return table
-
-
def find_repetition(mfcc_ref, seg, sr, hop_length, sentence_timestamps, plot_result=False):
    """Look for a repetition of ``seg`` inside a reference MFCC window.

    The segment's MFCCs (first coefficient row dropped, coefficient count
    read from the module-level ``n_mfcc``) are compared with ``mfcc_ref``
    via ``calculate_best_offset``. ``detect_lines`` then extracts candidate
    start positions, which are converted to milliseconds and matched against
    ``sentence_timestamps``.

    Args:
        mfcc_ref: MFCC matrix of the reference window.
        seg: Audio samples of the segment to search for.
        sr: Sampling rate of ``seg``.
        hop_length: Hop length used for the MFCCs.
        sentence_timestamps: Candidate timestamps in milliseconds.
        plot_result: When true, forward plotting to ``detect_lines`` and
            show the timestamp distance matrix.

    Returns:
        A tuple ``(x_offset_ms, latest)``: ``x_offset_ms`` is the first
        element returned by ``detect_lines`` converted with
        ``samples_to_millisecond``, and ``latest`` is the last entry of
        ``sentence_timestamps`` lying closer than ``min_silence_len / 2`` to
        any detected start, or ``None`` if there is none.
    """
    seg_features = librosa.feature.mfcc(y=seg, sr=sr, hop_length=hop_length, n_mfcc=n_mfcc)[1:, :]
    similarity = calculate_best_offset(mfcc_ref, seg_features, sr)

    # The reference window's length expressed in samples.
    ref_extent = mfcc_ref.shape[1] * hop_length
    x_min, offsets = detect_lines(similarity, len(seg), ref_extent, plot_result=plot_result)
    found_starts = sorted(samples_to_millisecond(y0, sr) for y0 in offsets)

    # Rows follow sentence_timestamps, columns follow found_starts.
    closest = map2d(sentence_timestamps, found_starts, lambda ts, start: abs(ts - start))
    if plot_result:
        plt.imshow(closest)
        plt.show()

    tolerance = min_silence_len / 2
    latest = None
    for timestamp, distances in zip(sentence_timestamps, closest):
        if len(distances) != 0 and min(distances) < tolerance:
            # Later matches overwrite earlier ones, so the last one wins.
            latest = timestamp
    return (samples_to_millisecond(x_min, sr), latest)
-
-
def samples_to_hops(samples, hop_length):
    """Convert a sample count to a rounded number of hops.

    Args:
        samples: Number of samples.
        hop_length: Samples per hop.

    Returns:
        ``samples / hop_length`` rounded with the built-in ``round``.
    """
    hop_count = samples / hop_length
    return round(hop_count)
-
-
def hops_to_samples(hops, hop_length):
    """Convert a number of hops to a rounded sample count.

    Args:
        hops: Number of hops (may be fractional).
        hop_length: Samples per hop.

    Returns:
        ``hop_length * hops`` rounded with the built-in ``round``.
    """
    sample_count = hop_length * hops
    return round(sample_count)
-
-
def cont_find_repetitions(y, sr, hop_length, sentence_timestamps):
    """Scan a recording for repeated passages and propose ranges to delete.

    A 1500 ms window is moved over ``y`` in 200 ms steps. Positions that
    fall before the next sentence start and at or after the end of the last
    consumed sentence are skipped. At every other position
    ``find_repetition`` compares the window with the following 20 seconds of
    MFCCs. Each hit becomes a deletion suggestion; suggestions whose starts
    are less than 250 ms apart are merged, and a merged start within 150 ms
    of a sentence start is snapped to that sentence start.

    Args:
        y: Audio time series.
        sr: Sampling rate of ``y``.
        hop_length: Hop length used for the MFCCs.
        sentence_timestamps: List of ``(start_ms, end_ms)`` entries sorted
            by start time.

    Returns:
        List of ``(start_ms, end_ms)`` tuples, one per merged group of
        suggestions, in the order they were found.

    Raises:
        AssertionError: If ``sentence_timestamps`` is not sorted by start.
    """
    assert sorted(sentence_timestamps, key=lambda t: t[0]) == sentence_timestamps
    # The number of coefficients comes from the module-level ``n_mfcc``.
    mfcc = librosa.feature.mfcc(y=y, sr=sr, hop_length=hop_length, n_mfcc=n_mfcc)[1:, :]

    step_length_ms = 200
    step_length_samples = millisecond_to_samples(step_length_ms, sr)
    window_length_ms = 1500
    window_length_samples = millisecond_to_samples(window_length_ms, sr)
    ref_window_length_ms = 20 * 1000  # 20 seconds
    ref_window_length_samples = millisecond_to_samples(ref_window_length_ms, sr)
    ref_window_length_hops = samples_to_hops(ref_window_length_samples, hop_length)

    offset = 0
    available_ts = sentence_timestamps
    last_sentence_end = 0
    deletion_suggestions = []

    while offset + step_length_samples < len(y) and len(available_ts) > 0:
        offset_ms = samples_to_millisecond(offset, sr)
        # Skip positions between the last consumed sentence's end and the
        # next sentence's start.
        if offset_ms < available_ts[0][0] and offset_ms >= last_sentence_end:
            offset += step_length_samples
            continue
        seg = y[offset:offset + window_length_samples]
        first_hop = samples_to_hops(offset, hop_length)
        mfcc_window = mfcc[:, first_hop:first_hop + ref_window_length_hops]
        # Timestamps are passed relative to the current window position.
        x_offset_ms, ts_ms = find_repetition(mfcc_window,
                                             seg,
                                             sr,
                                             hop_length,
                                             [t[0] - offset_ms for t in available_ts])
        if ts_ms is not None and x_offset_ms < step_length_ms:
            print("delete from {0} to {1}".format(samples_to_time(offset + millisecond_to_samples(x_offset_ms, sr), sr), ms_to_time(offset_ms + ts_ms)))
            deletion_suggestions.append((offset_ms + x_offset_ms, offset_ms + ts_ms))
        offset += step_length_samples
        # Once the window has moved past a sentence start, consume it.
        if offset_ms + step_length_ms > available_ts[0][0]:
            last_sentence_end = available_ts[0][1]
            available_ts = available_ts[1:]

    # Merge suggestions whose starts are less than 250 ms apart. Building the
    # groups in place also keeps the final group, which the previous
    # implementation dropped because it was never appended after the loop.
    clusters = []
    for sugg in deletion_suggestions:
        if clusters and sugg[0] - clusters[-1][-1][0] < 250:
            clusters[-1].append(sugg)
        else:
            clusters.append([sugg])

    deletions = []
    for cluster in clusters:
        start = np.mean([s[0] for s in cluster])
        end = np.max([s[1] for s in cluster])
        # Snap the start to the nearest sentence start when it is close.
        distances = [abs(start - ts[0]) for ts in sentence_timestamps]
        nearest = np.argmin(distances)
        if distances[nearest] < 150:
            start = sentence_timestamps[nearest][0]
        deletions.append((start, end))
    return deletions
-
-
def make_widths_equal(fig, rect, ax1, ax2, ax3, pad):
    """Attach three axes to one ``VBoxDivider`` layout.

    Args:
        fig: Figure that owns the axes.
        rect: Rectangle specification forwarded to ``VBoxDivider``.
        ax1: First axes; receives locator 0.
        ax2: Second axes; receives locator 2.
        ax3: Third axes; receives locator 4.
        pad: Fixed padding between consecutive axes, in inches.
    """
    horizontal_sizes = [
        Size.AxesX(ax1),
        Size.Scaled(1),
        Size.AxesX(ax2),
        Size.Scaled(1),
        Size.AxesX(ax3),
    ]
    vertical_sizes = [
        Size.AxesY(ax1),
        Size.Fixed(pad),
        Size.AxesY(ax2),
        Size.Fixed(pad),
        Size.AxesY(ax3),
    ]
    divider = VBoxDivider(fig, rect, horizontal=horizontal_sizes, vertical=vertical_sizes)

    # Even slots hold the axes; odd slots are the padding entries.
    for slot, axes in zip((0, 2, 4), (ax1, ax2, ax3)):
        axes.set_axes_locator(divider.new_locator(slot))
-
-
# Script entry point: loads "hard_pieces.wav", detects its non-silent ranges,
# runs cont_find_repetitions on it and prints each suggested deletion as a
# tab-separated "start<TAB>end<TAB>delete" line with times in seconds.
#
# NOTE: the names assigned here (vad, sr, n_mfcc, ...) become module globals.
# seg_is_speech reads ``sr`` and ``vad``; find_repetition and
# cont_find_repetitions read ``n_mfcc``. Those functions therefore only work
# after this block has run.
if __name__ == '__main__':
    vad = webrtcvad.Vad()
    hop_length = 128
    n_mfcc = 42

    fp = "hard_pieces.wav"
    print("loading file ...")
    # Resampled to 32 kHz mono on load.
    y, sr = librosa.load(fp, mono=True, sr=32000)
    print("calculating mfcc ...")
    mfcc = librosa.feature.mfcc(y=y, sr=sr, hop_length=hop_length, n_mfcc=n_mfcc)[1:,:]
    # The same file is loaded again through pydub for silence detection.
    song = AudioSegment.from_wav(fp)
    mf_w = mfcc.shape[1]
    l = y.shape[0]
    # Number of samples per MFCC frame.
    print(l / mf_w)

    ts_non_sil_ms = non_silent_chunks(song)

    #autocorr = librosa.autocorrelate(y)
    #fig, ax = plt.subplots()
    #ax.plot(autocorr)
    #plt.show()
    #ts_non_sil_ms = [ t[0] for t in non_silent_chunks(song) ]
    #print(mfcc.shape)
    #print("finding reps ...")
    dels = cont_find_repetitions(y, sr, hop_length, ts_non_sil_ms)

    # Deletion boundaries are in milliseconds; print them in seconds.
    for d in dels:
        print("{0}\t{1}\tdelete".format(d[0]/1000, d[1]/1000))
    #window_length_ms = 1000
    #window_length_samples = millisecond_to_samples(window_length_ms, sr)
    #seg = y[25280 : 25280 + window_length_samples]

    #seg_duration_ms = 100
    #seg_duration_samples = millisecond_to_samples(seg_duration_ms, sr)

    ## split complete audio in 10ms segments, only keep those that have voice in it
    ##segs = []
    ##offset = 0
    ###i = 0
    ##while offset + seg_duration_samples < len(y):
    ##    seg = y[ offset : offset + seg_duration_samples ]
    ##    if seg_is_speech(seg):
    ##        segs.append((seg, offset))
    ##    offset += seg_duration_samples

    ##segs = segs[1:]
    ##n_segs = len(segs)

    ##(seg, offset) = segs[0]

    fp_segment = "segment.wav"
    #seg = y
    #sr_seg = sr
    # NOTE(review): seg and sr_seg are only referenced by commented-out code
    # in the visible part of this script.
    seg, sr_seg = librosa.load(fp_segment, mono=True, sr=32000)

    #assert sr==sr_seg
    #mfcc_window = mfcc[:,1000:]

    #x_offset, ts_ms = find_repetition(mfcc_window, seg, sr, hop_length, [ t[0] for t in ts_non_sil_ms], plot_result=True)
    #if ts_ms is not None:
    #    print("starting from {0} the seg is repeated at {1}".format(ms_to_time(x_offset), ms_to_time(ts_ms)))
    #else:
    #    print("no rep found")

    #cutoff = int(0.2*len(seg))
    #print(samples_to_millisecond(cutoff, sr))

    #print("calculating xcross ...")
    #xsim = librosa.segment.cross_similarity(mfcc, mfcc, mode='affinity', metric='cosine')
    #chroma = librosa.feature.chroma_cqt(y=y, sr=sr, hop_length=hop_length)
    #mfcc_stack = librosa.feature.stack_memory(mfcc, n_steps=10, delay=3)
    #xsim = librosa.segment.recurrence_matrix(mfcc, mode='affinity', metric='cosine')
    #lag = librosa.segment.recurrence_to_lag(xsim, pad=False)

    #xsim = librosa.segment.recurrence_matrix(mfcc, mode='affinity', metric='cosine',
    #                                         width=50)
    #fig, ax = plt.subplots(nrows=1, sharex=True)
    #img = librosa.display.specshow(xsim, x_axis='s', y_axis='s', hop_length=hop_length, ax=ax, cmap='magma_r')
    #plt.show()
    # NOTE(review): this message is printed but the detect_lines calls below
    # are commented out.
    print("detecting lines ...")
    #detect_lines(np.flip(xsim, 0), len(y), len(y), plot_result=True)
    #print(detect_lines(xsim))
    #ax.imshow(np.transpose(xsim), aspect='auto')
    #ax[1].imshow(diffs_penalised)
    #ax[1].imshow(np.reshape(vad_coeffs, (1, n_segs)))
    #ax[2].imshow(np.reshape(lengths, (1, n_segs)))

    #make_widths_equal(fig, 111, ax[0], ax[1], ax[2], pad=0.5)
    #plt.show()
    #print("possible starts:", [ ms_to_time(t) for t in found_starts])
    #for n, seg in enumerate(segs):
    #    sf.write('part' + str(n) + '.wav', seg, sr)
    #print(segs)

    #y1, sr1 = librosa.load("out000.wav")
    #y2, sr2 = librosa.load("out004.wav")

    #print("total alignment cost:", calc_dtw_sim(y1, y2, sr1, sr2, plot_result=True))
    #print("xcorr:", np.trace(calc_xcorr_sim(y1, y2, sr1, sr2)))
|