import streamlit as st
from PIL import Image
import numpy as np
import torch
import gc
import os
import tempfile
import math
import imageio
import traceback

# MoviePy is optional. The user-facing warning is deferred until after
# st.set_page_config(), which must be the first Streamlit call in the script.
try:
    import moviepy.editor as mpy
    MOVIEPY_AVAILABLE = True
    MOVIEPY_IMPORT_ERROR = None
except (ImportError, OSError) as e:
    MOVIEPY_AVAILABLE = False
    MOVIEPY_IMPORT_ERROR = e

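# Both checkpoints are small variants of their model families, presumably so
# the pipeline fits the memory and CPU budget of a free Hugging Face Space.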
IMAGE_CAPTION_MODEL = "Salesforce/blip-image-captioning-base"
AUDIO_GEN_MODEL = "facebook/musicgen-small"

DEFAULT_NUM_FRAMES = 2
DEFAULT_AUDIO_DURATION_S = 5
MAX_FRAMES_TO_SHOW_UI = 3
DEVICE = torch.device("cpu")

st.set_page_config(page_title="AI Video Sound Designer (HF Space)", layout="wide", page_icon="🎬")

if not MOVIEPY_AVAILABLE:
    st.warning(
        "MoviePy library is not available or ffmpeg is missing. "
        "Video syncing features will be disabled. "
        "If running locally, install with: pip install moviepy. Ensure ffmpeg is installed."
    )
    print(f"MoviePy load error: {MOVIEPY_IMPORT_ERROR}")

st.title("🎬 AI Video Sound Designer (for Hugging Face Spaces)")
st.markdown("""
Upload a short MP4 video. The tool will:
1. Extract frames from the video.
2. Analyze frames using an image captioning model to generate sound ideas.
3. Synthesize audio using MusicGen based on these ideas.
4. Optionally, combine the new audio with your video.
---
**Note:** Processing on CPU (especially audio generation) can be slow. Please be patient!
""")

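# The two models are loaded one pipeline stage at a time and released
# afterwards, so the BLIP and MusicGen weights are never resident together.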
def clear_memory(model_obj=None, processor_obj=None):
    """Clears model objects from memory and runs garbage collection."""
    if model_obj:
        del model_obj
    if processor_obj:
        del processor_obj
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    print("Memory cleared.")

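# @st.cache_resource keeps a single shared copy of each model per server
# process, so weights are downloaded and initialized only on the first run.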
@st.cache_resource
def load_image_caption_model_and_processor():
    """Loads the image captioning model and processor."""
    try:
        from transformers import BlipProcessor, BlipForConditionalGeneration
        st.write(f"Loading Image Captioning Model: {IMAGE_CAPTION_MODEL} (this might take a moment)...")
        processor = BlipProcessor.from_pretrained(IMAGE_CAPTION_MODEL)
        model = BlipForConditionalGeneration.from_pretrained(IMAGE_CAPTION_MODEL).to(DEVICE)
        st.toast("Image Captioning model loaded!", icon="🖼️")
        return processor, model
    except Exception as e:
        st.error(f"Error loading image captioning model: {e}")
        st.error(traceback.format_exc())
        return None, None

@st.cache_resource
def load_audio_gen_model_and_processor():
    """Loads the audio generation model and processor."""
    try:
        from transformers import AutoProcessor, MusicgenForConditionalGeneration
        st.write(f"Loading Audio Generation Model: {AUDIO_GEN_MODEL} (this might take a while on CPU)...")
        processor = AutoProcessor.from_pretrained(AUDIO_GEN_MODEL)
        model = MusicgenForConditionalGeneration.from_pretrained(AUDIO_GEN_MODEL).to(DEVICE)
        st.toast("Audio Generation model loaded! (CPU generation will be slow)", icon="🎶")
        return processor, model
    except Exception as e:
        st.error(f"Error loading audio generation model: {e}")
        st.error(traceback.format_exc())
        return None, None

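# Frames are sampled at evenly spaced indices via np.linspace rather than by
# decoding the whole clip, so analysis cost stays flat as videos get longer.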
def extract_frames_from_video(video_path, num_frames):
    """Extracts a specified number of frames evenly from a video."""
    frames = []
    reader = None
    try:
        reader = imageio.get_reader(video_path, "ffmpeg")
        total_frames = reader.count_frames()
        if total_frames == 0 or math.isinf(total_frames):
            # Some containers report no usable frame count; estimate it from
            # duration * fps so the evenly spaced sampling below still works.
            meta = reader.get_meta_data()
            duration = meta.get('duration')
            fps = meta.get('fps', 25)
            if duration:
                total_frames = int(duration * fps)
            else:
                st.warning("Could not determine video length. Will attempt to read initial frames.")
                # No estimate available: fall back to the first num_frames frames.
                for frame_data in reader:
                    frames.append(Image.fromarray(frame_data).convert("RGB"))
                    if len(frames) >= num_frames:
                        break
                return frames

        if total_frames < num_frames:
            indices = np.arange(total_frames)
        else:
            indices = np.linspace(0, total_frames - 1, num_frames, dtype=int, endpoint=True)

        for i in indices:
            try:
                frame_data = reader.get_data(i)
                frames.append(Image.fromarray(frame_data).convert("RGB"))
            except Exception as e:
                st.warning(f"Skipping problematic frame {i}: {e}")
                continue
        return frames
    except (imageio.core.fetching.NeedDownloadError, OSError) as e_ffmpeg:
        st.error(f"FFmpeg not found or failed: {e_ffmpeg}. Please ensure ffmpeg is installed and in PATH if running locally.")
        return []
    except Exception as e:
        st.error(f"Error extracting frames: {e}")
        st.error(traceback.format_exc())
        return []
    finally:
        # The finally block closes the reader exactly once on every path.
        if reader:
            reader.close()

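# BLIP is a captioning model, not an instruction follower: text passed to the
# processor is treated as a caption prefix that the model continues, so the
# prefix is stripped back out of each decoded caption below.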
def generate_sound_prompt_from_frames(frames, caption_processor, caption_model):
    """Generates sound descriptions from frames using BLIP."""
    if not frames:
        return "ambient background noise"

    descriptions = []
    instruction = "A short description of this image, focusing on elements that might produce sound:"

    with st.spinner(f"Generating sound ideas from {len(frames)} frames..."):
        progress_bar = st.progress(0.0)
        for i, frame in enumerate(frames):
            try:
                inputs = caption_processor(images=frame, text=instruction, return_tensors="pt").to(DEVICE)
                generated_ids = caption_model.generate(**inputs, max_length=50)
                description = caption_processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
                # The conditioning text is echoed back at the start of the
                # decoded caption; remove it so only new content remains.
                if description.lower().startswith(instruction.lower()):
                    description = description[len(instruction):].strip()
                if description:
                    descriptions.append(description)
                progress_bar.progress((i + 1) / len(frames), text=f"Frame {i+1}/{len(frames)} analyzed.")
            except Exception as e:
                st.warning(f"Could not get description for a frame: {e}")
                continue

    if not descriptions:
        return "general ambiance, subtle environmental sounds"

    # De-duplicate while preserving order.
    unique_descriptions = list(dict.fromkeys(descriptions))
    combined_prompt = ". ".join(unique_descriptions)

    final_prompt = f"Sounds for a scene featuring: {combined_prompt}. Focus on atmosphere, key sound events, and textures."
    return final_prompt

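# MusicGen measures output length in EnCodec tokens, not seconds: the budget
# below is duration_s * frame_rate tokens, capped to bound CPU time.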
def generate_audio_from_prompt(prompt, duration_s, audio_processor, audio_model, guidance, temp):
    """Generates audio using MusicGen."""
    try:
        inputs = audio_processor(text=[prompt], return_tensors="pt", padding=True).to(DEVICE)

        if inputs.input_ids.shape[1] > 512:
            st.warning(f"Prompt is long ({inputs.input_ids.shape[1]} tokens), might be truncated by MusicGen.")

        # EncodecConfig exposes `frame_rate` (EnCodec tokens per second of
        # audio, ~50 for musicgen-small); there is no `token_per_second` attribute.
        tokens_per_second = audio_model.config.audio_encoder.frame_rate
        max_new_tokens = min(int(duration_s * tokens_per_second), 1500)

        with st.spinner(f"Synthesizing {duration_s}s audio... (CPU: This will take several minutes!)"):
            audio_values = audio_model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                guidance_scale=guidance,
                temperature=temp,
            )

        audio_array = audio_values[0, 0].cpu().numpy()
        sampling_rate = audio_model.config.audio_encoder.sampling_rate

        # Peak-normalize to 90% of full scale to avoid clipping.
        if np.abs(audio_array).max() > 0:
            audio_array = audio_array / np.abs(audio_array).max() * 0.9
        return audio_array, sampling_rate
    except Exception as e:
        st.error(f"Error generating audio: {e}")
        st.error(traceback.format_exc())
        return None, None

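# Muxing strategy: write the generated waveform to a temporary WAV, wrap it in
# an AudioFileClip, fit it to the video's duration, and re-encode via MoviePy.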
def combine_audio_video(video_path, audio_array, sampling_rate, mix_original):
    """Combines generated audio with the video using MoviePy."""
    if not MOVIEPY_AVAILABLE:
        st.error("MoviePy is not available. Cannot combine audio and video.")
        return None

    output_video_path = None
    temp_audio_path = None
    video_clip = None
    generated_audio_clip = None
    final_clip = None

    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio:
            import scipy.io.wavfile
            scipy.io.wavfile.write(tmp_audio.name, sampling_rate, audio_array)
            temp_audio_path = tmp_audio.name

        video_clip = mpy.VideoFileClip(video_path)
        generated_audio_clip = mpy.AudioFileClip(temp_audio_path)

        # Fit the audio to the video: loop if too short, trim if too long.
        if generated_audio_clip.duration < video_clip.duration:
            generated_audio_clip = generated_audio_clip.fx(mpy.afx.audio_loop, duration=video_clip.duration)
        elif generated_audio_clip.duration > video_clip.duration:
            generated_audio_clip = generated_audio_clip.subclip(0, video_clip.duration)

        final_audio = generated_audio_clip
        if mix_original and video_clip.audio:
            # Duck the original track slightly under the generated one.
            original_audio = video_clip.audio.volumex(0.5)
            generated_audio = generated_audio_clip.volumex(0.8)
            final_audio = mpy.CompositeAudioClip([original_audio, generated_audio])
            final_audio = final_audio.set_duration(video_clip.duration)

        final_clip = video_clip.set_audio(final_audio)

        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_out:
            output_video_path = tmp_video_out.name

        final_clip.write_videofile(
            output_video_path,
            codec="libx264",
            audio_codec="aac",
            # MoviePy 1.x (moviepy.editor) takes a full file path via
            # `temp_audiofile`; it has no `temp_audiofile_path` parameter.
            temp_audiofile=os.path.join(os.path.dirname(temp_audio_path), "mpy_temp_audio.m4a"),
            threads=2,
            logger=None
        )
        return output_video_path

    except Exception as e:
        st.error(f"Error combining audio and video: {e}")
        st.error(traceback.format_exc())
        return None
    finally:
        if final_clip: final_clip.close()
        if video_clip: video_clip.close()
        if generated_audio_clip: generated_audio_clip.close()
        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)

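# Sidebar controls: the frame and duration budgets trade output quality
# directly against CPU processing time.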
with st.sidebar:
    st.header("⚙️ Settings")
    num_frames_analysis = st.slider("Number of Frames to Analyze", 1, 5, DEFAULT_NUM_FRAMES, 1,
                                    help="More frames provide more context but increase analysis time.")
    audio_duration = st.slider("Target Audio Duration (seconds)", 3, 15, DEFAULT_AUDIO_DURATION_S, 1,
                               help="Shorter durations generate much faster on CPU.")

    st.subheader("MusicGen Parameters")
    guidance = st.slider("Guidance Scale (MusicGen)", 1.0, 7.0, 3.0, 0.5,
                         help="Higher values make audio follow prompt more closely. Default is 3.0.")
    temperature = st.slider("Temperature (MusicGen)", 0.5, 1.5, 1.0, 0.1,
                            help="Controls randomness. Higher is more diverse. Default is 1.0.")

    if MOVIEPY_AVAILABLE:
        st.subheader("Video Output")
        mix_audio = st.checkbox("Mix with original video audio", value=False)
    else:
        mix_audio = False

uploaded_file = st.file_uploader("📤 Upload your MP4 video file (short clips recommended):", type=["mp4", "mov", "avi"])

if 'generated_audio_file' not in st.session_state:
    st.session_state.generated_audio_file = None
if 'output_video_file' not in st.session_state:
    st.session_state.output_video_file = None

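# Streamlit reruns this script top to bottom on every interaction; the paths
# of generated files live in st.session_state so results survive reruns.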
if uploaded_file is not None:
    st.video(uploaded_file)

    if st.button("✨ Generate Sound Design!", type="primary", use_container_width=True):
        # Remove any outputs left over from a previous run before starting.
        if st.session_state.generated_audio_file and os.path.exists(st.session_state.generated_audio_file):
            os.remove(st.session_state.generated_audio_file)
            st.session_state.generated_audio_file = None
        if st.session_state.output_video_file and os.path.exists(st.session_state.output_video_file):
            os.remove(st.session_state.output_video_file)
            st.session_state.output_video_file = None
        clear_memory()

        video_bytes = uploaded_file.read()
        temp_video_path = None

        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as tmp_vid:
                tmp_vid.write(video_bytes)
                temp_video_path = tmp_vid.name

st.subheader("1. Extracting Frames") |
|
|
with st.spinner("Extracting frames from video..."): |
|
|
frames = extract_frames_from_video(temp_video_path, num_frames_analysis) |
|
|
|
|
|
if not frames: |
|
|
st.error("No frames extracted. Cannot proceed.") |
|
|
st.stop() |
|
|
|
|
|
st.success(f"Extracted {len(frames)} frames.") |
|
|
if frames: |
|
|
cols_to_show = min(len(frames), MAX_FRAMES_TO_SHOW_UI) |
|
|
if cols_to_show > 0: |
|
|
st.write("Sampled Frames:") |
|
|
cols = st.columns(cols_to_show) |
|
|
for i, frame_img in enumerate(frames[:cols_to_show]): |
|
|
cols[i].image(frame_img, caption=f"Frame {i+1}", use_column_width=True) |
|
|
|
|
|
|
|
|
st.subheader("2. Generating Sound Ideas (Image Analysis)") |
|
|
caption_processor, caption_model = load_image_caption_model_and_processor() |
|
|
if caption_processor and caption_model: |
|
|
sound_prompt = generate_sound_prompt_from_frames(frames, caption_processor, caption_model) |
|
|
st.info(f"βοΈ **Generated Sound Prompt:** {sound_prompt}") |
|
|
|
|
|
|
|
|
clear_memory(caption_model, caption_processor) |
|
|
else: |
|
|
st.error("Failed to load image captioning model. Using a default prompt.") |
|
|
sound_prompt = "ambient nature sounds with a gentle breeze" |
|
|
|
|
|
|
|
|
st.subheader("3. Synthesizing Audio (MusicGen)") |
|
|
st.warning("π§ Audio generation on CPU can take several minutes. Please be patient!") |
|
|
audio_processor, audio_model = load_audio_gen_model_and_processor() |
|
|
generated_audio_array, sr = None, None |
|
|
|
|
|
if audio_processor and audio_model: |
|
|
generated_audio_array, sr = generate_audio_from_prompt(sound_prompt, audio_duration, audio_processor, audio_model, guidance, temperature) |
|
|
|
|
|
clear_memory(audio_model, audio_processor) |
|
|
else: |
|
|
st.error("Failed to load audio generation model. Cannot generate audio.") |
|
|
|
|
|
if generated_audio_array is not None and sr is not None: |
|
|
st.success("Audio generated!") |
|
|
st.audio(generated_audio_array, sample_rate=sr) |
|
|
|
|
|
|
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_out: |
|
|
import scipy.io.wavfile |
|
|
scipy.io.wavfile.write(tmp_audio_out.name, sr, generated_audio_array) |
|
|
st.session_state.generated_audio_file = tmp_audio_out.name |
|
|
|
|
|
with open(st.session_state.generated_audio_file, "rb") as f: |
|
|
st.download_button( |
|
|
"π₯ Download Generated Audio (.wav)", |
|
|
f, |
|
|
file_name="generated_sound.wav", |
|
|
mime="audio/wav" |
|
|
) |
|
|
|
|
|
|
|
|
                if MOVIEPY_AVAILABLE:
                    st.subheader("4. Combining Audio with Video")
                    with st.spinner("Processing video with new audio... (can be slow)"):
                        output_video_file_path = combine_audio_video(temp_video_path, generated_audio_array, sr, mix_audio)

                    if output_video_file_path and os.path.exists(output_video_file_path):
                        st.success("Video processing complete!")
                        st.video(output_video_file_path)
                        st.session_state.output_video_file = output_video_file_path

                        with open(output_video_file_path, "rb") as f_vid:
                            st.download_button(
                                "🎬 Download Video with New Sound (.mp4)",
                                f_vid,
                                file_name="video_with_new_sound.mp4",
                                mime="video/mp4"
                            )
                    else:
                        st.error("Failed to combine audio and video.")
            else:
                st.error("Audio generation failed. Cannot proceed to video syncing.")

        except Exception as e:
            st.error(f"An unexpected error occurred in the main processing pipeline: {e}")
            st.error(traceback.format_exc())
        finally:
            if temp_video_path and os.path.exists(temp_video_path):
                os.remove(temp_video_path)
            print("Main processing finished or errored. Temp video (if any) cleaned up.")
            clear_memory()

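# No active upload: fall back to showing any results kept from the last run.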
elif st.session_state.generated_audio_file and os.path.exists(st.session_state.generated_audio_file):
    st.markdown("---")
    st.write("Previously generated audio:")
    st.audio(st.session_state.generated_audio_file)
    with open(st.session_state.generated_audio_file, "rb") as f:
        st.download_button(
            "📥 Download Previously Generated Audio (.wav)",
            f,
            file_name="generated_sound_previous.wav",
            mime="audio/wav",
            key="prev_audio_dl"
        )
    if st.session_state.output_video_file and os.path.exists(st.session_state.output_video_file) and MOVIEPY_AVAILABLE:
        st.markdown("---")
        st.write("Previously generated video with new sound:")
        st.video(st.session_state.output_video_file)
        with open(st.session_state.output_video_file, "rb") as f_vid:
            st.download_button(
                "🎬 Download Previously Generated Video (.mp4)",
                f_vid,
                file_name="video_with_new_sound_previous.mp4",
                mime="video/mp4",
                key="prev_video_dl"
            )

else:
    st.info("☝️ Upload a video to get started.")

st.markdown("---")
st.markdown("Made for Hugging Face Spaces. Model loading & generation can be slow on CPU.")