# Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import json
import os
from glob import glob

import numpy as np
from scipy.io import wavfile
from tqdm import tqdm

parser = argparse.ArgumentParser(description="Cut audio into segments based on ctc-segmentation alignments")
| parser.add_argument("--output_dir", type=str, help="Path to output directory", required=True) | |
| parser.add_argument( | |
| "--alignment", | |
| type=str, | |
| required=True, | |
| help="Path to a data directory with alignments or a single .txt file with timestamps - result of the ctc-segmentation", | |
| ) | |
| parser.add_argument("--threshold", type=float, default=-5, help="Minimum score value accepted") | |
| parser.add_argument("--offset", type=int, default=0, help="Offset, s") | |
| parser.add_argument("--batch_size", type=int, default=64, help="Batch size for inference") | |
| parser.add_argument( | |
| "--edge_duration", | |
| type=float, | |
| help="Duration of audio for mean absolute value calculation at the edges, s", | |
| default=0.05, | |
| ) | |
| parser.add_argument("--sample_rate", type=int, help="Sample rate, Hz", default=16000) | |
| parser.add_argument( | |
| "--max_duration", | |
| type=int, | |
| help="Maximum audio duration (seconds). Samples that are longer will be dropped", | |
| default=60, | |
| ) | |
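
# Example invocation (a sketch; the file name and all paths below are placeholders, assuming this
# script is saved as cut_audio.py and the alignment files come from the ctc-segmentation step):
#   python cut_audio.py --output_dir /path/to/output \
#       --alignment /path/to/alignments_dir --threshold -5 --offset 0 --sample_rate 16000
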

def process_alignment(alignment_file: str, manifest: str, clips_dir: str, args):
    """Cut original audio file into audio segments based on alignment_file

    Args:
        alignment_file: path to the file with segmented text and corresponding time stamps.
            The first line of the file contains the path to the original audio file
        manifest: path to .json manifest to save segments metadata
        clips_dir: path to a directory to save audio clips
        args: main script args
    """
    if not os.path.exists(alignment_file):
        raise ValueError(f"{alignment_file} not found")

    base_name = os.path.basename(alignment_file).replace("_segments.txt", "")

    # read the segments; note the first line contains the path to the original audio file
    segments = []
    ref_text_processed = []
    ref_text_no_preprocessing = []
    ref_text_normalized = []
    with open(alignment_file, "r") as f:
        for line in f:
            line = line.split("|")
            # read audio file name from the first line
            if len(line) == 1:
                audio_file = line[0].strip()
                continue
            ref_text_processed.append(line[1].strip())
            ref_text_no_preprocessing.append(line[2].strip())
            ref_text_normalized.append(line[3].strip())
            line = line[0].split()
            segments.append((float(line[0]) + args.offset / 1000, float(line[1]) + args.offset / 1000, float(line[2])))

    # cut the audio into segments and save the final manifests at output_dir
    sampling_rate, signal = wavfile.read(audio_file)
    original_duration = len(signal) / sampling_rate
    num_samples = int(args.edge_duration * args.sample_rate)

    low_score_dur = 0
    high_score_dur = 0
    with open(manifest, "a", encoding="utf8") as f:
        for i, (st, end, score) in enumerate(segments):
            segment = signal[round(st * sampling_rate) : round(end * sampling_rate)]
            duration = len(segment) / sampling_rate
            if duration > args.max_duration:
                continue
            if duration > 0:
                text_processed = ref_text_processed[i].strip()
                text_no_preprocessing = ref_text_no_preprocessing[i].strip()
                text_normalized = ref_text_normalized[i].strip()
                if score >= args.threshold:
                    high_score_dur += duration
                    audio_filepath = os.path.join(clips_dir, f"{base_name}_{i:04}.wav")
                    wavfile.write(audio_filepath, sampling_rate, segment)
                    assert len(signal.shape) == 1 and sampling_rate == args.sample_rate, "check sampling rate"
                    info = {
                        "audio_filepath": audio_filepath,
                        "duration": duration,
                        "text": text_processed,
                        "text_no_preprocessing": text_no_preprocessing,
                        "text_normalized": text_normalized,
                        "score": round(score, 2),
                        "start_abs": float(np.mean(np.abs(segment[:num_samples]))),
                        "end_abs": float(np.mean(np.abs(segment[-num_samples:]))),
                    }
                    json.dump(info, f, ensure_ascii=False)
                    f.write("\n")
                else:
                    low_score_dur += duration

    # keep track of the duration of the deleted segments
    del_duration = 0
    begin = 0
    for i, (st, end, _) in enumerate(segments):
        if st - begin > 0.01:
            segment = signal[int(begin * sampling_rate) : int(st * sampling_rate)]
            duration = len(segment) / sampling_rate
            del_duration += duration
        begin = end

    segment = signal[int(begin * sampling_rate) :]
    duration = len(segment) / sampling_rate
    del_duration += duration

    stats = (
        args.output_dir,
        base_name,
        round(original_duration),
        round(high_score_dur),
        round(low_score_dur),
        round(del_duration),
    )
    return stats
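
# Each manifest entry written by process_alignment() is one JSON object per line; an illustrative
# example with made-up values (keys match the `info` dict above):
#   {"audio_filepath": ".../clips/book_0001.wav", "duration": 3.54, "text": "chapter one",
#    "text_no_preprocessing": "Chapter One.", "text_normalized": "chapter one",
#    "score": -1.37, "start_abs": 12.5, "end_abs": 8.1}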

if __name__ == "__main__":
    args = parser.parse_args()

    print("Splitting audio files into segments...")

    if os.path.isdir(args.alignment):
        alignment_files = glob(f"{args.alignment}/*_segments.txt")
    else:
        alignment_files = [args.alignment]

    # create a directory to store segments with alignment confidence score above the threshold
    args.output_dir = os.path.abspath(args.output_dir)
    clips_dir = os.path.join(args.output_dir, "clips")
    manifest_dir = os.path.join(args.output_dir, "manifests")
    os.makedirs(clips_dir, exist_ok=True)
    os.makedirs(manifest_dir, exist_ok=True)

    manifest = os.path.join(manifest_dir, "manifest.json")
    if os.path.exists(manifest):
        os.remove(manifest)

    stats_file = os.path.join(args.output_dir, "stats.tsv")
    with open(stats_file, "w") as f:
        f.write("Folder\tSegment\tOriginal dur (s)\tHigh quality dur (s)\tLow quality dur (s)\tDeleted dur (s)\n")
        high_score_dur = 0
        low_score_dur = 0
        del_duration = 0
        original_dur = 0
        for alignment_file in tqdm(alignment_files):
            stats = process_alignment(alignment_file, manifest, clips_dir, args)
            original_dur += stats[-4]
            high_score_dur += stats[-3]
            low_score_dur += stats[-2]
            del_duration += stats[-1]
            stats = "\t".join([str(t) for t in stats]) + "\n"
            f.write(stats)
| f.write(f"Total\t\t{round(high_score_dur)}\t{round(low_score_dur)}\t{del_duration}") | |
| print(f"Original duration : {round(original_dur / 60)}min") | |
| print(f"High score segments: {round(high_score_dur / 60)}min ({round(high_score_dur/original_dur*100)}%)") | |
| print(f"Low score segments : {round(low_score_dur / 60)}min ({round(low_score_dur/original_dur*100)}%)") | |
| print(f"Deleted segments : {round(del_duration / 60)}min ({round(del_duration/original_dur*100)}%)") | |
| print(f"Stats saved at {stats_file}") | |
| print(f"Manifest saved at {manifest}") | |