Stt / app_ori.py
archivartaunik's picture
Rename app.py to app_ori.py
4ca1cc1 verified
import gradio as gr
from argparse import Namespace
import os
import shutil
import sys
#sys.path.insert(0, '/content/sub-tools/src') # заменіце шлях на рэальны
from sub_tools.media.converter import hls_to_media, media_to_signature, video_to_audio
from sub_tools.media.segmenter import segment_audio
from sub_tools.subtitles.combiner import combine_subtitles
from sub_tools.system.directory import change_directory
from sub_tools.system.console import header, success, error
from sub_tools.transcribe import transcribe
def main_logic(args: Namespace) -> tuple:
"""
Асноўная логіка прыкладання: ад загрузкі відэа/аўдыё да зліцця субтытраў.
Пасля зліцця субтытраў вяртае (тэкст субтытраў, шлях да SRT‑файла для спампоўкі).
"""
output_str = ""
subtitles_text = ""
srt_file_path = None
try:
change_directory(args.output_path)
step = 1
if "video" in args.tasks:
if not args.hls_url:
output_str += f"{step}. Download Video: No video file uploaded\n"
raise Exception("No video file uploaded")
header(f"{step}. Download Video")
output_str += f"{step}. Download Video: Started\n"
hls_to_media(args.hls_url, args.video_file, False, args.overwrite)
success("Done!")
output_str += "Done!\n"
step += 1
if "audio" in args.tasks:
header(f"{step}. Video to Audio")
output_str += f"{step}. Video to Audio: Started\n"
video_to_audio(args.video_file, args.audio_file, args.overwrite)
success("Done!")
output_str += "Done!\n"
step += 1
if "signature" in args.tasks:
header(f"{step}. Audio to Signature")
output_str += f"{step}. Audio to Signature: Started\n"
media_to_signature(args.audio_file, args.signature_file, args.overwrite)
success("Done!")
output_str += "Done!\n"
step += 1
if "segment" in args.tasks:
header(f"{step}. Segment Audio")
output_str += f"{step}. Segment Audio: Started\n"
segment_audio(args.audio_file, args.audio_segment_prefix, args.audio_segment_format, args.audio_segment_length, args.overwrite)
success("Done!")
output_str += "Done!\n"
step += 1
if "transcribe" in args.tasks:
if not (args.gemini_api_key and args.gemini_api_key.strip()):
output_str += f"{step}. Transcribe Audio: No Gemini API Key provided\n"
raise Exception("No Gemini API Key provided")
header(f"{step}. Transcribe Audio")
output_str += f"{step}. Transcribe Audio: Started\n"
transcribe(args)
success("Done!")
output_str += "Done!\n"
step += 1
if "combine" in args.tasks:
header(f"{step}. Combine Subtitles")
output_str += f"{step}. Combine Subtitles: Started\n"
combine_subtitles(args.languages, args.audio_segment_prefix, args.audio_segment_format)
success("Done!")
output_str += "Done!\n"
if args.languages:
language = args.languages[0]
srt_file_path = os.path.join(os.getcwd(), f"{language}.srt")
try:
with open(srt_file_path, "r", encoding="utf-8") as f:
subtitles_text = f.read()
except Exception as e:
subtitles_text = f"Error reading subtitles file: {str(e)}"
else:
subtitles_text = "No language specified"
step += 1
return (subtitles_text, srt_file_path)
except Exception as e:
error_msg = f"Error: {str(e)}"
error(error_msg)
return (error_msg, None)
def run_subtools(
tasks,
hls_url,
video_file,
audio_file,
signature_file,
output_path,
languages,
overwrite,
retry,
gemini_api_key,
debug,
audio_segment_prefix,
audio_segment_format,
audio_segment_length
):
"""
Падрыхтоўка каталога вываду і запуск асноўнай логікі.
"""
if os.path.exists(output_path):
shutil.rmtree(output_path)
os.makedirs(output_path, exist_ok=True)
if isinstance(languages, str):
languages = [lang.strip() for lang in languages.split(",") if lang.strip()]
args = Namespace(
tasks=tasks,
hls_url=hls_url,
video_file=video_file,
audio_file=audio_file,
signature_file=signature_file,
output_path=output_path,
languages=languages,
overwrite=overwrite,
retry=retry,
gemini_api_key=gemini_api_key,
debug=debug,
audio_segment_prefix=audio_segment_prefix,
audio_segment_format=audio_segment_format,
audio_segment_length=audio_segment_length,
)
return main_logic(args)
# Функцыі для дэактывацыі процілеглага поля пры змене
def update_on_audio_change(audio):
if audio is not None:
return gr.update(interactive=False)
return gr.update(interactive=True)
def update_on_video_change(video):
if video is not None:
return gr.update(interactive=False)
return gr.update(interactive=True)
def process_uploaded_file(audio, video):
"""
Выбірае, які файл загружаны, і фармуе параметры для апрацоўкі.
Калі загружаны аўдыёфайл – запускае апрацоўку для аўдыё,
калі відэафайл – запускае поўны ланцуг апрацоўкі.
"""
# Калі загружаны толькі аўдыёфайл:
if audio is not None and video is None:
tasks = ["signature", "segment", "transcribe", "combine"]
video_file = "" # відэа не выкарыстоўваецца
audio_file = audio
hls_url = "" # не патрабуецца
# Калі загружаны толькі відэафайл:
elif video is not None and audio is None:
tasks = ["video", "audio", "signature", "segment", "transcribe", "combine"]
video_file = video
audio_file = "audio.mp3" # прызначаем імя для аўдыёфайла
hls_url = "dummy" # задаём няпустое значэнне для праверкі
else:
return "Error: Загрузіце толькі АЎДЫЁ або ВІДЭАфайл, а не абодва.", None
return run_subtools(
tasks=tasks,
hls_url=hls_url,
video_file=video_file,
audio_file=audio_file,
signature_file="message.shazamsignature",
output_path="output",
languages="be",
overwrite=False,
retry=50,
gemini_api_key="AIzaSyCwvZ_s4TvxoMaegbpQOOW1nzjZ6IbqGbg",
debug=False,
audio_segment_prefix="audio_segment",
audio_segment_format="mp3",
audio_segment_length=300000
)
# --------------------- Gradio UI ---------------------
with gr.Blocks() as demo:
with gr.Row():
audio_input = gr.Audio(type="filepath", label="Аўдыёфайл")
video_input = gr.Video(label="Відэафайл")
# Пры змене аднаго з палёў дэактывуецца процілеглае
audio_input.change(fn=update_on_audio_change, inputs=audio_input, outputs=video_input)
video_input.change(fn=update_on_video_change, inputs=video_input, outputs=audio_input)
submit_btn = gr.Button("Submit")
output_text = gr.Textbox(label="Тэкст субтытраў")
output_file = gr.File(label="Спампаваць SRT файл")
submit_btn.click(fn=process_uploaded_file, inputs=[audio_input, video_input], outputs=[output_text, output_file])
demo.launch()