import streamlit as st
from PIL import Image
import numpy as np
import torch
import gc
import os
import tempfile
import imageio
import traceback
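
# Assumed dependencies (e.g. in requirements.txt): streamlit, torch,
# transformers, pillow, numpy, imageio[ffmpeg], scipy, moviepy (optional, see below).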

# --- Attempt to import moviepy for video processing ---
# Only record failures here; the user-facing warning is shown after
# st.set_page_config below, which must be the first Streamlit command.
try:
    import moviepy.editor as mpy
    MOVIEPY_AVAILABLE = True
except (ImportError, OSError) as e:
    MOVIEPY_AVAILABLE = False
    print(f"MoviePy load error: {e}")


# --- Model Configuration ---
IMAGE_CAPTION_MODEL = "Salesforce/blip-image-captioning-base"
AUDIO_GEN_MODEL = "facebook/musicgen-small"

# --- Constants ---
DEFAULT_NUM_FRAMES = 2        # Fewer frames for faster processing on free tier
DEFAULT_AUDIO_DURATION_S = 5  # Shorter audio for faster generation
MAX_FRAMES_TO_SHOW_UI = 3
DEVICE = torch.device("cpu") # Explicitly use CPU for Hugging Face free tier

# --- Page Setup ---
st.set_page_config(page_title="AI Video Sound Designer (HF Space)", layout="wide", page_icon="🎬")

# Surface any MoviePy import problem now that page config has run.
if not MOVIEPY_AVAILABLE:
    st.warning(
        "MoviePy library is not available or ffmpeg is missing. "
        "Video syncing features will be disabled. "
        "If running locally, install with: pip install moviepy. Ensure ffmpeg is installed."
    )

st.title("🎬 AI Video Sound Designer (for Hugging Face Spaces)")
st.markdown("""
    Upload a short MP4 video. The tool will:
    1. Extract frames from the video.
    2. Analyze frames using an image captioning model to generate sound ideas.
    3. Synthesize audio using MusicGen based on these ideas.
    4. Optionally, combine the new audio with your video.
    ---
    **Note:** Processing on CPU (especially audio generation) can be slow. Please be patient!
""")

# --- Utility Functions ---
def clear_memory(model_obj=None, processor_obj=None):
    """Clears model objects from memory and runs garbage collection."""
    if model_obj:
        del model_obj
    if processor_obj:
        del processor_obj
    gc.collect()
    if torch.cuda.is_available(): # Though we target CPU, good practice
        torch.cuda.empty_cache()
    print("Memory cleared.")

@st.cache_resource
def load_image_caption_model_and_processor():
    """Loads the image captioning model and processor."""
    try:
        from transformers import BlipProcessor, BlipForConditionalGeneration
        st.write(f"Loading Image Captioning Model: {IMAGE_CAPTION_MODEL} (this might take a moment)...")
        processor = BlipProcessor.from_pretrained(IMAGE_CAPTION_MODEL)
        model = BlipForConditionalGeneration.from_pretrained(IMAGE_CAPTION_MODEL).to(DEVICE)
        st.toast("Image Captioning model loaded!", icon="πŸ–ΌοΈ")
        return processor, model
    except Exception as e:
        st.error(f"Error loading image captioning model: {e}")
        st.error(traceback.format_exc())
        return None, None

@st.cache_resource
def load_audio_gen_model_and_processor():
    """Loads the audio generation model and processor."""
    try:
        from transformers import AutoProcessor, MusicgenForConditionalGeneration
        st.write(f"Loading Audio Generation Model: {AUDIO_GEN_MODEL} (this might take a while on CPU)...")
        processor = AutoProcessor.from_pretrained(AUDIO_GEN_MODEL)
        model = MusicgenForConditionalGeneration.from_pretrained(AUDIO_GEN_MODEL).to(DEVICE)
        st.toast("Audio Generation model loaded! (CPU generation will be slow)", icon="🎢")
        return processor, model
    except Exception as e:
        st.error(f"Error loading audio generation model: {e}")
        st.error(traceback.format_exc())
        return None, None

def extract_frames_from_video(video_path, num_frames):
    """Extracts a specified number of frames evenly from a video."""
    frames = []
    reader = None
    try:
        reader = imageio.get_reader(video_path, "ffmpeg")
        total_frames = reader.count_frames()
        if total_frames == 0: # If count_frames fails, try metadata
            meta = reader.get_meta_data()
            duration = meta.get('duration')
            fps = meta.get('fps', 25)
            if duration:
                total_frames = int(duration * fps)
            else:  # Fallback if duration isn't available
                st.warning("Could not determine video length. Will attempt to read initial frames.")
                # Read sequentially until enough frames are collected; the
                # reader is closed in the finally block below.
                for frame_data in reader:
                    frames.append(Image.fromarray(frame_data).convert("RGB"))
                    if len(frames) >= num_frames:
                        break
                return frames

        if total_frames < num_frames:
            indices = np.arange(total_frames)
        else:
            indices = np.linspace(0, total_frames - 1, num_frames, dtype=int, endpoint=True)
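            # e.g. total_frames = 100, num_frames = 2 -> indices [0, 99]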
        
        actual_frames_extracted = 0
        for i in indices:
            if actual_frames_extracted >= num_frames:
                break
            try:
                frame_data = reader.get_data(i)
                frames.append(Image.fromarray(frame_data).convert("RGB"))
                actual_frames_extracted += 1
            except Exception as e:
                st.warning(f"Skipping problematic frame {i}: {e}")
                continue
        return frames
    except (imageio.core.fetching.NeedDownloadError, OSError) as e_ffmpeg:
        st.error(f"FFmpeg not found or failed: {e_ffmpeg}. Please ensure ffmpeg is installed and in PATH if running locally.")
        return []
    except Exception as e:
        st.error(f"Error extracting frames: {e}")
        st.error(traceback.format_exc())
        return []
    finally:
        if reader:
            reader.close()

def generate_sound_prompt_from_frames(frames, caption_processor, caption_model):
    """Generates sound descriptions from frames using BLIP."""
    if not frames:
        return "ambient background noise"
    
    descriptions = []
    instruction = "A short description of this image, focusing on elements that might produce sound:"
    
    with st.spinner(f"Generating sound ideas from {len(frames)} frames..."):
        for i, frame in enumerate(frames):
            try:
                inputs = caption_processor(images=frame, text=instruction, return_tensors="pt").to(DEVICE)
                # For BLIP, generate is typically used like this.
                # You might need to adjust max_length based on desired description length.
                generated_ids = caption_model.generate(**inputs, max_length=50) # Keep descriptions short
                description = caption_processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
                if description:
                    descriptions.append(description)
                st.progress((i + 1) / len(frames), text=f"Frame {i+1}/{len(frames)} analyzed.")
            except Exception as e:
                st.warning(f"Could not get description for a frame: {e}")
                continue
    
    if not descriptions:
        return "general ambiance, subtle environmental sounds" # Fallback
    
    # Simple combination: join unique descriptions
    unique_descriptions = list(dict.fromkeys(descriptions))
    combined_prompt = ". ".join(unique_descriptions)
    # Further processing to make it more like a sound design brief
    final_prompt = f"Sounds for a scene featuring: {combined_prompt}. Focus on atmosphere, key sound events, and textures."
    return final_prompt

def generate_audio_from_prompt(prompt, duration_s, audio_processor, audio_model, guidance, temp):
    """Generates audio using MusicGen."""
    try:
        inputs = audio_processor(text=[prompt], return_tensors="pt", padding=True).to(DEVICE)
        
        # MusicGen has a max sequence length for the text prompt (around 2048
        # tokens). We only warn past 512 tokens here; manual truncation
        # (commented out below) could be enabled to be safe on CPU.
        if inputs.input_ids.shape[1] > 512:
            st.warning(f"Prompt is long ({inputs.input_ids.shape[1]} tokens), might be truncated by MusicGen.")
        # inputs['input_ids'] = inputs['input_ids'][:, :512]
        # inputs['attention_mask'] = inputs['attention_mask'][:, :512]

        # Calculate max_new_tokens from the duration and the model's token rate.
        # EnCodec's frame_rate is the number of audio tokens per second
        # (50 for musicgen-small); max output length is ~2048 tokens.
        tokens_per_second = audio_model.config.audio_encoder.frame_rate
        max_new_tokens = min(int(duration_s * tokens_per_second), 1500)  # cap at 1500 tokens (~30 s) as a practical limit
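        # e.g. duration_s = 5 at 50 tokens/s -> max_new_tokens = 250, well under the cap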

        with st.spinner(f"Synthesizing {duration_s}s audio... (CPU: This will take several minutes!)"):
            # For CPU, do_sample=False might be faster but less diverse. Try True first.
            audio_values = audio_model.generate(
                **inputs,
                max_new_tokens=max_new_tokens,
                do_sample=True,
                guidance_scale=guidance,
                temperature=temp,
                # No pad_token_id for MusicGen's generate function, it uses eos_token_id for padding by default if needed
            )
        
        audio_array = audio_values[0, 0].cpu().numpy()
        sampling_rate = audio_model.config.audio_encoder.sampling_rate
        
        # Normalize peak amplitude to 0.9 to avoid clipping while leaving headroom
        if np.abs(audio_array).max() > 0:
            audio_array = audio_array / np.abs(audio_array).max() * 0.9
        return audio_array, sampling_rate
    except Exception as e:
        st.error(f"Error generating audio: {e}")
        st.error(traceback.format_exc())
        return None, None

def combine_audio_video(video_path, audio_array, sampling_rate, mix_original):
    """Combines generated audio with the video using MoviePy."""
    if not MOVIEPY_AVAILABLE:
        st.error("MoviePy is not available. Cannot combine audio and video.")
        return None

    output_video_path = None
    temp_audio_path = None
    video_clip = None
    generated_audio_clip = None
    final_clip = None

    try:
        with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio:
            # Write the generated waveform to a temp WAV file (soundfile would also work)
            import scipy.io.wavfile
            scipy.io.wavfile.write(tmp_audio.name, sampling_rate, audio_array)
            temp_audio_path = tmp_audio.name
        
        video_clip = mpy.VideoFileClip(video_path)
        generated_audio_clip = mpy.AudioFileClip(temp_audio_path)

        # Loop or trim generated audio to match video duration
        if generated_audio_clip.duration < video_clip.duration:
            generated_audio_clip = generated_audio_clip.fx(mpy.afx.audio_loop, duration=video_clip.duration)
        elif generated_audio_clip.duration > video_clip.duration:
            generated_audio_clip = generated_audio_clip.subclip(0, video_clip.duration)

        final_audio = generated_audio_clip
        if mix_original and video_clip.audio:
            # Adjust volumes for mixing
            original_audio = video_clip.audio.volumex(0.5) # Lower original audio
            generated_audio = generated_audio_clip.volumex(0.8) # Keep generated slightly louder
            final_audio = mpy.CompositeAudioClip([original_audio, generated_audio])
            final_audio = final_audio.set_duration(video_clip.duration) # Ensure composite duration matches

        final_clip = video_clip.set_audio(final_audio)
        
        with tempfile.NamedTemporaryFile(delete=False, suffix=".mp4") as tmp_video_out:
            output_video_path = tmp_video_out.name

        final_clip.write_videofile(
            output_video_path,
            codec="libx264",
            audio_codec="aac",
            # moviepy 1.x expects `temp_audiofile` (a file path, not a directory)
            temp_audiofile=os.path.join(os.path.dirname(temp_audio_path), "moviepy_temp_audio.m4a"),
            threads=2,   # Limit threads on free tier
            logger=None  # or 'bar' for progress
        )
        return output_video_path

    except Exception as e:
        st.error(f"Error combining audio and video: {e}")
        st.error(traceback.format_exc())
        return None
    finally:
        # Close clips to release resources
        if video_clip: video_clip.close()
        if generated_audio_clip: generated_audio_clip.close()
        # if final_clip: final_clip.close() # final_clip is usually the same as video_clip with modified audio

        if temp_audio_path and os.path.exists(temp_audio_path):
            os.remove(temp_audio_path)
        # The output_video_path is handled by the caller (downloaded, then potentially cleaned up)

# --- Sidebar for Settings ---
with st.sidebar:
    st.header("⚙️ Settings")
    num_frames_analysis = st.slider("Number of Frames to Analyze", 1, 5, DEFAULT_NUM_FRAMES, 1,
                                   help="More frames provide more context but increase analysis time.")
    audio_duration = st.slider("Target Audio Duration (seconds)", 3, 15, DEFAULT_AUDIO_DURATION_S, 1,
                               help="Shorter durations generate much faster on CPU.")
    
    st.subheader("MusicGen Parameters")
    guidance = st.slider("Guidance Scale (MusicGen)", 1.0, 7.0, 3.0, 0.5,
                         help="Higher values make audio follow prompt more closely. Default is 3.0.")
    temperature = st.slider("Temperature (MusicGen)", 0.5, 1.5, 1.0, 0.1,
                            help="Controls randomness. Higher is more diverse. Default is 1.0.")
    
    if MOVIEPY_AVAILABLE:
        st.subheader("Video Output")
        mix_audio = st.checkbox("Mix with original video audio", value=False)
    else:
        mix_audio = False # Disable if moviepy not available


# --- Main Application Logic ---
uploaded_file = st.file_uploader("📤 Upload your video file (MP4/MOV/AVI; short clips recommended):", type=["mp4", "mov", "avi"])

# Initialize session state for generated file paths
if 'generated_audio_file' not in st.session_state:
    st.session_state.generated_audio_file = None
if 'output_video_file' not in st.session_state:
    st.session_state.output_video_file = None


if uploaded_file is not None:
    st.video(uploaded_file)
    
    # Use a button to trigger processing
    if st.button("✨ Generate Sound Design!", type="primary", use_container_width=True):
        # --- Clear previous results ---
        if st.session_state.generated_audio_file and os.path.exists(st.session_state.generated_audio_file):
            os.remove(st.session_state.generated_audio_file)
        st.session_state.generated_audio_file = None
        if st.session_state.output_video_file and os.path.exists(st.session_state.output_video_file):
            os.remove(st.session_state.output_video_file)
        st.session_state.output_video_file = None
        clear_memory()

        video_bytes = uploaded_file.read()
        temp_video_path = None

        try:
            with tempfile.NamedTemporaryFile(delete=False, suffix=os.path.splitext(uploaded_file.name)[1]) as tmp_vid:
                tmp_vid.write(video_bytes)
                temp_video_path = tmp_vid.name
            
            # === Stage 1: Frame Extraction ===
            st.subheader("1. Extracting Frames")
            with st.spinner("Extracting frames from video..."):
                frames = extract_frames_from_video(temp_video_path, num_frames_analysis)
            
            if not frames:
                st.error("No frames extracted. Cannot proceed.")
                st.stop()
            
            st.success(f"Extracted {len(frames)} frames.")
            if frames:
                cols_to_show = min(len(frames), MAX_FRAMES_TO_SHOW_UI)
                if cols_to_show > 0:
                    st.write("Sampled Frames:")
                    cols = st.columns(cols_to_show)
                    for i, frame_img in enumerate(frames[:cols_to_show]):
                        cols[i].image(frame_img, caption=f"Frame {i+1}", use_column_width=True)
            
            # === Stage 2: Image Captioning (Sound Prompt Generation) ===
            st.subheader("2. Generating Sound Ideas (Image Analysis)")
            caption_processor, caption_model = load_image_caption_model_and_processor()
            if caption_processor and caption_model:
                sound_prompt = generate_sound_prompt_from_frames(frames, caption_processor, caption_model)
                st.info(f"✍️ **Generated Sound Prompt:** {sound_prompt}")
                
                # Unload captioning model immediately
                clear_memory(caption_model, caption_processor)
            else:
                st.error("Failed to load image captioning model. Using a default prompt.")
                sound_prompt = "ambient nature sounds with a gentle breeze" # Fallback
            
            # === Stage 3: Audio Generation ===
            st.subheader("3. Synthesizing Audio (MusicGen)")
            st.warning("🎧 Audio generation on CPU can take several minutes. Please be patient!")
            audio_processor, audio_model = load_audio_gen_model_and_processor()
            generated_audio_array, sr = None, None # Initialize

            if audio_processor and audio_model:
                generated_audio_array, sr = generate_audio_from_prompt(sound_prompt, audio_duration, audio_processor, audio_model, guidance, temperature)
                # Unload audio model immediately
                clear_memory(audio_model, audio_processor)
            else:
                st.error("Failed to load audio generation model. Cannot generate audio.")

            if generated_audio_array is not None and sr is not None:
                st.success("Audio generated!")
                st.audio(generated_audio_array, sample_rate=sr)
                
                # Save audio for download
                with tempfile.NamedTemporaryFile(delete=False, suffix=".wav") as tmp_audio_out:
                    import scipy.io.wavfile # or soundfile
                    scipy.io.wavfile.write(tmp_audio_out.name, sr, generated_audio_array)
                    st.session_state.generated_audio_file = tmp_audio_out.name
                
                with open(st.session_state.generated_audio_file, "rb") as f:
                    st.download_button(
                        "πŸ“₯ Download Generated Audio (.wav)", 
                        f, 
                        file_name="generated_sound.wav",
                        mime="audio/wav"
                    )

                # === Stage 4: (Optional) Video and Audio Syncing ===
                if MOVIEPY_AVAILABLE:
                    st.subheader("4. Combining Audio with Video")
                    with st.spinner("Processing video with new audio... (can be slow)"):
                        output_video_file_path = combine_audio_video(temp_video_path, generated_audio_array, sr, mix_audio)
                    
                    if output_video_file_path and os.path.exists(output_video_file_path):
                        st.success("Video processing complete!")
                        st.video(output_video_file_path)
                        st.session_state.output_video_file = output_video_file_path
                        
                        with open(output_video_file_path, "rb") as f_vid:
                            st.download_button(
                                "🎬 Download Video with New Sound (.mp4)",
                                f_vid,
                                file_name="video_with_new_sound.mp4",
                                mime="video/mp4"
                            )
                    else:
                        st.error("Failed to combine audio and video.")
            else:
                st.error("Audio generation failed. Cannot proceed to video syncing.")

        except Exception as e:
            st.error(f"An unexpected error occurred in the main processing pipeline: {e}")
            st.error(traceback.format_exc())
        finally:
            if temp_video_path and os.path.exists(temp_video_path):
                os.remove(temp_video_path)
            # Models are cleared within their stages using clear_memory()
            # Generated download files (audio/video) are kept in session_state until next run or session ends
            print("Main processing finished or errored. Temp video (if any) cleaned up.")
            clear_memory() # Final catch-all clear

    # Display download buttons if files were generated in a previous run within the session
    elif st.session_state.generated_audio_file and os.path.exists(st.session_state.generated_audio_file):
        st.markdown("---")
        st.write("Previously generated audio:")
        st.audio(st.session_state.generated_audio_file)
        with open(st.session_state.generated_audio_file, "rb") as f:
            st.download_button(
                "πŸ“₯ Download Previously Generated Audio (.wav)",
                f,
                file_name="generated_sound_previous.wav",
                mime="audio/wav",
                key="prev_audio_dl"
            )
    if st.session_state.output_video_file and os.path.exists(st.session_state.output_video_file) and MOVIEPY_AVAILABLE:
        st.markdown("---")
        st.write("Previously generated video with new sound:")
        st.video(st.session_state.output_video_file)
        with open(st.session_state.output_video_file, "rb") as f_vid:
            st.download_button(
                "🎬 Download Previously Generated Video (.mp4)",
                f_vid,
                file_name="video_with_new_sound_previous.mp4",
                mime="video/mp4",
                key="prev_video_dl"
            )

else:
    st.info("☝️ Upload a video to get started.")

st.markdown("---")
st.markdown("Made for Hugging Face Spaces. Model loading & generation can be slow on CPU.")