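# Gradio front end for the video MCP demo: upload and validate MP4s, extract
# frames, drive the video tools from an LLM prompt, and expose the same tools
# over MCP via demo.launch(mcp_server=True).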
import gradio as gr
import shutil
import os
import subprocess

from src.mcp.tools import letter_counter
from src.mcp import video_tools
from src.llm import llm
from gradio.oauth import OAuthToken

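# Interfaces assumed below (inferred from how these imports are used in this
# file; the actual signatures live in src/mcp and src/llm):
#   letter_counter(word, letter) -> count of `letter` in `word`
#   video_tools.getFirstFrame(path) / getLastFrame(path) -> image file path
#   video_tools.convert_mp4_to_gif(path) -> GIF file path
#   llm.load_settings, llm.check_on_load, llm.check_ollama_endpoint,
#   llm.set_preferred_model, llm.dispatch_video_prompt, llm.HF_MODEL_ID
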
def is_ffmpeg_installed():
    """Checks if FFmpeg is installed and accessible in the system's PATH."""
    return shutil.which("ffmpeg") is not None


def update_ffmpeg_status():
    """Creates a colored status message indicating if FFmpeg is installed."""
    if is_ffmpeg_installed():
        return "<p style='color:green; font-weight:bold;'>ffmpeg is installed</p>", True
    return "<p style='color:red; font-weight:bold;'>ffmpeg is not installed</p>", False


def handle_video_upload(file_obj, ffmpeg_installed):
    """
    Validates an uploaded MP4 file, saves it to a temporary directory,
    and controls the visibility of UI components based on validation success.
    """
    if not ffmpeg_installed:
        return None, "Cannot process video: FFmpeg is not installed.", None, gr.update(visible=False)
    if not file_obj:
        return None, "Please upload a file.", None, gr.update(visible=False)

    # Stage uploads in a local tmp/ directory.
    tmp_dir = "tmp"
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)

    if not file_obj.name.lower().endswith('.mp4'):
        return None, "File must be an .mp4 file.", None, gr.update(visible=False)

    # Run ffprobe on the upload to confirm it is a readable MP4 before accepting it.
    try:
        subprocess.run(
            ["ffprobe", "-v", "error", "-show_format", "-show_streams", file_obj.name],
            capture_output=True, text=True, check=True
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None, "Could not validate file. Ensure it's a valid MP4.", None, gr.update(visible=False)

    file_path = os.path.join(tmp_dir, os.path.basename(file_obj.name))
    shutil.copy(file_obj.name, file_path)

    return file_path, "File uploaded successfully!", file_path, gr.update(visible=True)


def clear_previous_outputs():
    """Clears all video-related outputs to ensure a clean state for new uploads."""
    return (
        None,
        "",
        None,
        gr.update(visible=False),
        None,
        None,
        None,
        "",
        "",
        "",
    )


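# --- UI layout: Setup & Video, Debug, LLM Video Commands, LLM Configuration,
# and a hidden MCP Tools tab ---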
with gr.Blocks() as demo:

    ffmpeg_installed_state = gr.State(False)
    uploaded_video_path_state = gr.State("")

    with gr.Tabs():

        with gr.Tab("Setup & Video"):
            gr.Markdown("## System Status")
            with gr.Row():
                check_ffmpeg_btn = gr.Button("Check FFmpeg Status")
                status_text = gr.Markdown("Status will be checked on load.")

            gr.Markdown("---")
            gr.Markdown("## Video Tools")
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Upload Video")
                    file_input = gr.File(label="Upload MP4", file_types=[".mp4"])
                    video_output = gr.Video(label="Preview", interactive=False, height="50vh")
                    upload_status_text = gr.Textbox(label="Upload Status", interactive=False)
                with gr.Column(scale=1):
                    pass

        with gr.Tab("Debug"):
            with gr.Column(scale=2, visible=False) as video_tools_group:
                gr.Markdown("### Manual Frame Extraction")
                with gr.Row():
                    get_first_frame_btn = gr.Button("Get First Frame")
                    get_last_frame_btn = gr.Button("Get Last Frame")
                with gr.Row():
                    first_frame_img = gr.Image(label="First Frame", type="filepath", interactive=False)
                    last_frame_img = gr.Image(label="Last Frame", type="filepath", interactive=False)

        with gr.Tab("LLM Video Commands"):
            gr.Markdown("## Test MCP Tool Calls with an LLM")
            with gr.Row():
                with gr.Column(scale=1):
                    gr.Markdown("### Video Command Center")
                    video_prompt_textbox = gr.Textbox(label="Video Command Prompt", placeholder="e.g., 'Get me the first frame of the video'")
                    process_prompt_btn = gr.Button("Run Command")
                    video_command_status = gr.Textbox(label="LLM Status", interactive=False)

                    with gr.Accordion("Debug Info", open=False):
                        llm_debug_output = gr.Textbox(label="Debug Info", lines=10, interactive=False)
                        llm_raw_response = gr.Textbox(label="Raw LLM Response", lines=10, interactive=True)

                with gr.Column(scale=1):
                    gr.Markdown("### LLM Result")
                    llm_media_output = gr.Image(label="Output", type="filepath", interactive=False)

        with gr.Tab("LLM Configuration"):
            gr.Markdown("## Configure LLM")
            llm_provider = gr.Radio(
                ["Ollama", "Hugging Face"],
                value="Ollama",
                label="LLM Provider",
                info="Select the LLM provider to use."
            )

            model_name_state = gr.State("")

            with gr.Group(visible=True) as ollama_config:
                gr.Markdown("### Ollama Configuration")
                with gr.Row():
                    ollama_url_textbox = gr.Textbox(
                        placeholder="http://localhost:11434",
                        label="Ollama Endpoint URL",
                        interactive=True,
                        elem_id="ollama_url"
                    )
                    check_endpoint_btn = gr.Button("Check Endpoint")
                ollama_status_textbox = gr.Textbox(label="Status", interactive=False)

                with gr.Row():
                    ollama_model_dropdown = gr.Dropdown(
                        label="Select a Model",
                        interactive=True,
                        visible=False,
                        elem_id="ollama_model_dropdown"
                    )
                    set_preferred_btn = gr.Button("Set as Preferred", visible=False)

                preferred_llm_display = gr.Textbox(label="Preferred Model Status", interactive=False)

            with gr.Group(visible=False) as hf_config:
                gr.Markdown("### Hugging Face Configuration")
                gr.Markdown(f"**Model ID:** `{llm.HF_MODEL_ID}` (hardcoded).")
                gr.Markdown("To use this provider, please log in with your Hugging Face account. API calls will use your personal token.")
                login_button = gr.LoginButton()

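        # Hidden tab: these components exist only so the handlers below can be
        # registered with api_name and exposed as MCP tools; end users never see it.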
with gr.Tab("MCP Tools", visible=False): |
|
|
gr.Markdown("## Tools for MCP Server") |
|
|
|
|
|
|
|
|
mcp_video_path_input = gr.Textbox(label="Video Path for MCP") |
|
|
|
|
|
|
|
|
lc_word_input = gr.Textbox(label="Word") |
|
|
lc_letter_input = gr.Textbox(label="Letter") |
|
|
lc_output = gr.Number(label="Count") |
|
|
lc_btn = gr.Button("Count Letters") |
|
|
|
|
|
|
|
|
mcp_get_first_frame_btn = gr.Button("MCP Get First Frame") |
|
|
mcp_get_last_frame_btn = gr.Button("MCP Get Last Frame") |
|
|
mcp_convert_to_gif_btn = gr.Button("MCP Convert to GIF") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
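    # --- Event wiring ---
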
    def update_provider_visibility(provider):
        """Shows the configuration panel for the selected provider and hides the other."""
        if provider == "Ollama":
            return gr.update(visible=True), gr.update(visible=False)
        else:
            return gr.update(visible=False), gr.update(visible=True)

    llm_provider.change(
        fn=update_provider_visibility,
        inputs=llm_provider,
        outputs=[ollama_config, hf_config],
        show_progress=False
    )

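    # On page load: check FFmpeg, restore saved LLM settings, sync the provider
    # panels, then probe the configured Ollama endpoint.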
    demo.load(
        fn=update_ffmpeg_status,
        outputs=[status_text, ffmpeg_installed_state]
    ).then(
        fn=llm.load_settings,
        outputs=[
            llm_provider,
            ollama_url_textbox,
            model_name_state,
            preferred_llm_display,
        ]
    ).then(
        fn=update_provider_visibility,
        inputs=llm_provider,
        outputs=[ollama_config, hf_config],
        show_progress=False
    ).then(
        fn=llm.check_on_load,
        inputs=[ollama_url_textbox, model_name_state],
        outputs=[ollama_status_textbox, ollama_model_dropdown, set_preferred_btn, ollama_url_textbox, preferred_llm_display]
    )

    check_ffmpeg_btn.click(
        fn=update_ffmpeg_status,
        outputs=[status_text, ffmpeg_installed_state]
    )

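    # A new upload first clears every downstream output, then validates and stages the file.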
    file_input.upload(
        fn=clear_previous_outputs,
        outputs=[
            video_output,
            upload_status_text,
            uploaded_video_path_state,
            video_tools_group,
            first_frame_img,
            last_frame_img,
            llm_media_output,
            video_command_status,
            llm_debug_output,
            llm_raw_response
        ]
    ).then(
        fn=handle_video_upload,
        inputs=[file_input, ffmpeg_installed_state],
        outputs=[video_output, upload_status_text, uploaded_video_path_state, video_tools_group]
    )

    get_first_frame_btn.click(
        fn=video_tools.getFirstFrame,
        inputs=uploaded_video_path_state,
        outputs=first_frame_img
    )

    get_last_frame_btn.click(
        fn=video_tools.getLastFrame,
        inputs=uploaded_video_path_state,
        outputs=last_frame_img
    )

    check_endpoint_btn.click(
        fn=llm.check_ollama_endpoint,
        inputs=[ollama_url_textbox, model_name_state],
        outputs=[ollama_status_textbox, ollama_model_dropdown, set_preferred_btn, ollama_url_textbox]
    )

    set_preferred_btn.click(
        fn=llm.set_preferred_model,
        inputs=[ollama_model_dropdown, ollama_url_textbox],
        outputs=[model_name_state, preferred_llm_display]
    )

    process_prompt_btn.click(
        fn=llm.dispatch_video_prompt,
        inputs=[
            llm_provider,
            video_prompt_textbox,
            uploaded_video_path_state,
            ollama_url_textbox,
            model_name_state,
        ],
        outputs=[
            llm_media_output,
            llm_debug_output,
            video_command_status,
            llm_raw_response
        ]
    )

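    # MCP-facing endpoints: registered with api_name so they are callable over the
    # API (and as MCP tools); they take a server-side video path rather than an upload.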
    lc_btn.click(
        fn=letter_counter,
        inputs=[lc_word_input, lc_letter_input],
        outputs=lc_output,
        api_name="letter_counter"
    )

    mcp_get_first_frame_btn.click(
        fn=video_tools.getFirstFrame,
        inputs=[mcp_video_path_input],
        api_name="getFirstFrame"
    )

    mcp_get_last_frame_btn.click(
        fn=video_tools.getLastFrame,
        inputs=[mcp_video_path_input],
        api_name="getLastFrame"
    )

    mcp_convert_to_gif_btn.click(
        fn=video_tools.convert_mp4_to_gif,
        inputs=[mcp_video_path_input],
        api_name="convert_mp4_to_gif"
    )

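# Assumption: mcp_server=True needs a Gradio build with MCP support (the
# "gradio[mcp]" extra). When enabled, the api_name endpoints above are served as
# MCP tools alongside the web UI; check the URL Gradio prints at startup for the
# exact MCP endpoint to point clients at.
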
if __name__ == "__main__":
    demo.launch(mcp_server=True)