import gradio as gr
from gradio_pdf import PDF
import os
import sys
import spaces
import json
import uuid
import pandas as pd
import asyncio
from datetime import datetime
from dotenv import load_dotenv
# from mcp import ClientSession, StdioServerParameters
# from mcp.client.stdio import stdio_client
from huggingface_hub import HfApi, hf_hub_download, upload_file

# Professional PDF libraries
from reportlab.lib.pagesizes import letter
from reportlab.platypus import SimpleDocTemplate, Paragraph, Spacer, PageBreak
from reportlab.lib.styles import getSampleStyleSheet, ParagraphStyle
from reportlab.lib.enums import TA_JUSTIFY, TA_CENTER

from mcp import ClientSession
from mcp.client.sse import sse_client

# The FastMCP SSE app is mounted at /mcp, so the SSE endpoint resolves to /mcp/sse
MCP_SSE_URL = "http://127.0.0.1:7860/mcp/sse"
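# Every MCP tool call below follows the same client pattern:
#   async with sse_client(MCP_SSE_URL) as (read, write):
#       async with ClientSession(read, write) as session:
#           await session.initialize()
#           res = await session.call_tool("<tool_name>", {...})
# The tool names and argument shapes are whatever the server mounted at /mcp exposes.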

# Load environment variables
load_dotenv()
HF_TOKEN = os.getenv("HF_TOKEN")

# --- CONFIG ---
DATASET_REPO = "daniel-was-taken/hugging_hack"
# SERVER_PATH = "./server.py"


# --- HELPER CLASSES ---
class LibraryManager:
    def __init__(self):
        self.api = HfApi(token=HF_TOKEN)
        self.local_file = "library.json"

    def fetch_library(self):
        # Try to fetch from the HF Hub first. If that fails, read a local copy if present.
        try:
            path = hf_hub_download(repo_id=DATASET_REPO, filename=self.local_file, repo_type="dataset", token=HF_TOKEN)
            with open(path, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception:
            # Fallback: load the local file if it exists (useful when HF_TOKEN is missing or the upload failed)
            if os.path.exists(self.local_file):
                try:
                    with open(self.local_file, 'r', encoding='utf-8') as f:
                        return json.load(f)
                except Exception:
                    return []
            return []

    def get_pdf_path(self, pdf_filename):
        try:
            if os.path.exists(pdf_filename):
                return os.path.abspath(pdf_filename)
            # Try a Hub download; if that fails, return None so the caller can regenerate locally
            try:
                return hf_hub_download(repo_id=DATASET_REPO, filename=pdf_filename, repo_type="dataset", token=HF_TOKEN)
            except Exception:
                return None
        except Exception:
            return None

    def save_novel(self, title, author, genre, content, local_pdf_path):
        library = self.fetch_library()
        pdf_filename = os.path.basename(local_pdf_path)
        # Try to upload the PDF first (if a token is provided); if the upload fails, record that
        pdf_uploaded = False
        if HF_TOKEN:
            try:
                self.api.upload_file(path_or_fileobj=local_pdf_path, path_in_repo=pdf_filename, repo_id=DATASET_REPO, repo_type="dataset")
                pdf_uploaded = True
            except Exception as e:
                print(f"[LibraryManager] PDF upload failed: {e}")
                pdf_uploaded = False
        novel_entry = {
            "id": str(uuid.uuid4())[:8],
            "title": title, "author": author, "genre": genre, "likes": 0,
            "timestamp": datetime.now().isoformat(),
            "content": content, "pdf_filename": pdf_filename
        }
        library.insert(0, novel_entry)
        # Sync library.json to the repo (or at least save it locally)
        sync_ok = self._sync_hub(library)
        if sync_ok:
            return "Published successfully!"
        else:
            # If the sync failed but the PDF upload succeeded we still want to surface that to the user
            if pdf_uploaded:
                return "Published locally but failed to sync library to the Hub. Check HF_TOKEN and repo permissions."
            return "Publish failed: unable to sync to Hugging Face Hub. Ensure HF_TOKEN is set and has repo write permissions."

    def like_novel(self, novel_id, liked_session, request: gr.Request):
        # if request is None or request.username is None:
        #     return self.get_leaderboard(), "⚠️ Login to HF to like.", None
        library = self.fetch_library()
        updated_likes = 0
        msg = ""
        # Toggle Logic
        if novel_id in liked_session:
            # Unlike
            for book in library:
                if book['id'] == novel_id:
                    book['likes'] = max(0, book['likes'] - 1)
                    updated_likes = book['likes']
                    break
            liked_session.remove(novel_id)
            msg = "💔 Unliked"
        else:
            # Like
            for book in library:
                if book['id'] == novel_id:
                    book['likes'] += 1
                    updated_likes = book['likes']
                    break
            liked_session.append(novel_id)
            msg = "❤️ Liked!"
        sync_ok = self._sync_hub(library)
        if not sync_ok:
            # Append a warning to the user-facing message
            msg = msg + " ⚠️ Warning: failed to sync like to Hugging Face Hub."
        return self.get_leaderboard(), msg, updated_likes

    def _sync_hub(self, data):
        # Always write a local copy first
        try:
            with open(self.local_file, "w", encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
        except Exception as e:
            print(f"[LibraryManager] Failed to write local library file: {e}")
            return False
        # If no token is provided, we cannot push to the hub. Return False to indicate no sync.
        if not HF_TOKEN:
            print("[LibraryManager] HF_TOKEN not provided; skipping hub upload.")
            return False
        try:
            self.api.upload_file(path_or_fileobj=self.local_file, path_in_repo=self.local_file, repo_id=DATASET_REPO, repo_type="dataset")
            return True
        except Exception as e:
            print(f"[LibraryManager] Failed to upload library.json to hub: {e}")
            return False

    def get_leaderboard(self):
        library = self.fetch_library()
        library.sort(key=lambda x: x.get('likes', 0), reverse=True)
        return [[b['title'], b['author'], b['genre'], b['likes'], b['id'], b.get('content', ''), b.get('pdf_filename', '')] for b in library]
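
# For reference, a library.json entry as written by save_novel looks like this (values are illustrative):
# {"id": "a1b2c3d4", "title": "...", "author": "...", "genre": "...", "likes": 0,
#  "timestamp": "2025-01-01T12:00:00", "content": "...", "pdf_filename": "novel_ab12cd.pdf"}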

def create_typeset_pdf(title, author, raw_text):
    filename = f"novel_{uuid.uuid4().hex[:6]}.pdf"
    doc = SimpleDocTemplate(filename, pagesize=letter, rightMargin=72, leftMargin=72, topMargin=72, bottomMargin=72)
    styles = getSampleStyleSheet()
    # Use unique style names (and check before adding) to avoid a KeyError if the styles are ever added twice
    if 'CustomJustify' not in styles:
        styles.add(ParagraphStyle(name='CustomJustify', alignment=TA_JUSTIFY, fontName='Times-Roman', fontSize=12, leading=16, spaceAfter=12))
    if 'CustomHeader' not in styles:
        styles.add(ParagraphStyle(name='CustomHeader', alignment=TA_CENTER, fontName='Times-Bold', fontSize=18, leading=22, spaceAfter=24, spaceBefore=48))
    if 'CustomTitle' not in styles:
        styles.add(ParagraphStyle(name='CustomTitle', alignment=TA_CENTER, fontName='Times-Bold', fontSize=32, leading=40, spaceAfter=10, spaceBefore=200))
    if 'CustomAuthor' not in styles:
        styles.add(ParagraphStyle(name='CustomAuthor', alignment=TA_CENTER, fontName='Times-Italic', fontSize=16, leading=20, spaceAfter=100))

    story = []
    story.append(Paragraph(title, styles["CustomTitle"]))
    story.append(Paragraph(f"By {author}", styles["CustomAuthor"]))
    story.append(PageBreak())

    for line in raw_text.split('\n'):
        line = line.strip()
        if not line:
            continue
        if line.startswith("## "):
            # A "## " heading starts a new chapter on a fresh page
            story.append(PageBreak())
            story.append(Paragraph(line.replace("## ", ""), styles["CustomHeader"]))
        else:
            story.append(Paragraph(line, styles["CustomJustify"]))

    doc.build(story)
    return os.path.abspath(filename)
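
# Illustrative usage (title and text are made up):
#   pdf_path = create_typeset_pdf("Echoes of Steel", "Anonymous AI", "## Chapter 1\nIt began with a hum...")
# Each "## " heading starts a new page as a chapter header; everything else is typeset as
# justified body text, and the absolute path of the finished PDF is returned.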

# --- MCP AGENT SETUP ---
# server_params = StdioServerParameters(command=sys.executable, args=[SERVER_PATH], env=os.environ.copy())


# --- DYNAMIC FETCH LOGIC ---
async def fetch_nebius_ui(api_key):
    if not api_key:
        return gr.update(choices=[])
    async with sse_client(MCP_SSE_URL) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            try:
                res = await session.call_tool("fetch_nebius_models_tool", {"api_key": api_key})
                models = json.loads(res.content[0].text)
                return gr.update(choices=models, value=models[0] if models else None)
            except Exception:
                return gr.update(choices=["meta-llama/Meta-Llama-3.3-70B-Instruct"])


async def fetch_gemini_ui(api_key):
    if not api_key:
        return gr.update(choices=[])
    async with sse_client(MCP_SSE_URL) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            try:
                res = await session.call_tool("fetch_gemini_models_tool", {"api_key": api_key})
                models = json.loads(res.content[0].text)
                return gr.update(choices=models, value=models[0] if models else None)
            except Exception:
                return gr.update(choices=["gemini-2.5-flash"])


async def fetch_claude_ui(api_key):
    if not api_key:
        return gr.update(choices=[])
    async with sse_client(MCP_SSE_URL) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            try:
                res = await session.call_tool("fetch_anthropic_models_tool", {"api_key": api_key})
                models = json.loads(res.content[0].text)
                return gr.update(choices=models, value=models[0] if models else None)
            except Exception:
                return gr.update(choices=["claude-4-5-sonnet"])

async def fetch_elevenlabs_data_ui(api_key):
    if not api_key:
        return gr.update(choices=[]), {}, gr.update(choices=[])
    async with sse_client(MCP_SSE_URL) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            try:
                res = await session.call_tool("fetch_elevenlabs_data_tool", {"api_key": api_key})
                data = json.loads(res.content[0].text)
                voices_map = data.get("voices", {})
                voice_names = list(voices_map.keys())
                models_list = data.get("models", [])
                return (
                    gr.update(choices=voice_names, value=voice_names[0] if voice_names else None),
                    voices_map,
                    gr.update(choices=models_list, value="eleven_multilingual_v2" if "eleven_multilingual_v2" in models_list else (models_list[0] if models_list else None)),
                )
            except Exception:
                return gr.update(choices=[]), {}, gr.update(choices=["eleven_monolingual_v2"])
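
# voices_map (the second return value above) maps a human-readable voice name to its ElevenLabs
# voice_id; it is kept in gr.State and looked up again in generate_custom_audio, so the
# dropdown only ever has to show names.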

# --- WRAPPER FOR SYNC CLICK EVENT (FIXES COROUTINE ERROR) ---
async def fetch_models_ui_wrapper(provider, k_n, k_g, k_c):
    """Route model fetching to the selected provider and await the result."""
    if provider == "Nebius":
        return await fetch_nebius_ui(k_n)
    elif provider == "Google Gemini":
        return await fetch_gemini_ui(k_g)
    elif provider == "Anthropic Claude":
        return await fetch_claude_ui(k_c)
    return gr.update(choices=[])

# --- ONE-CLICK AGENT FLOW ---
async def run_one_click_novel(seed, mys, rom, hor, sci, lit, format_type, writing_style, length, provider, model, voice_name, el_model, voices_map, neb_key, gem_key, claude_key, aud_key):
    if not seed:
        yield "Please enter a story seed.", None, None, None, None
        return
    active_txt_key = neb_key if provider == "Nebius" else (gem_key if provider == "Google Gemini" else claude_key)
    if not active_txt_key:
        yield f"Error: {provider} API Key is missing.", None, None, None, None
        return

    # Genre & style
    genres = {"Mystery": mys, "Romance": rom, "Horror": hor, "Sci-Fi": sci, "Literary": lit}
    sorted_genres = sorted(genres.items(), key=lambda x: x[1], reverse=True)
    genre_str = ", ".join([f"{v}% {k}" for k, v in sorted_genres if v > 0])

    status_log = f"🚀 Starting Engine ({provider} | {format_type})...\n"
    raw_text_for_pdf = ""
    raw_text_for_audio = ""
    generated_title = "Untitled"
    yield status_log, None, generated_title, raw_text_for_pdf, raw_text_for_audio

    async with sse_client(MCP_SSE_URL) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # PHASE 1: OUTLINE
            status_log += f"📝 Phase 1: Planning {format_type}...\n"
            yield status_log, None, generated_title, raw_text_for_pdf, raw_text_for_audio
            try:
                res = await session.call_tool("generate_story_plan", {
                    "seed": seed, "format_type": format_type, "genre_profile": genre_str,
                    "provider": provider, "model": model, "api_key": active_txt_key
                })
                outline_raw = res.content[0].text
                try:
                    plan_data = json.loads(outline_raw)
                    generated_title = plan_data.get("book_title", "Untitled")
                    parts = plan_data.get("parts", [])
                except Exception:
                    generated_title = "Generated Work"
                    parts = []
                status_log += f"✅ Structure Ready: '{generated_title}' ({len(parts)} Parts).\n"
                yield status_log, None, generated_title, raw_text_for_pdf, raw_text_for_audio
            except Exception as e:
                status_log += f"❌ Outline Error: {e}\n"
                yield status_log, None, generated_title, raw_text_for_pdf, raw_text_for_audio
                return

            # PHASE 2: WRITE
            voice_id = voices_map.get(voice_name) if voices_map else None  # reserved for narration; not used in this flow
            for i, part in enumerate(parts):
                title = part.get('title', f'Part {i+1}')
                description = part.get('description', '')
                status_log += f"✍️ Writing Part {i+1}: {title}...\n"
                yield status_log, None, generated_title, raw_text_for_pdf, raw_text_for_audio
                try:
                    w_res = await session.call_tool("write_content_segment", {
                        "title": title, "description": description,
                        "format_type": format_type,
                        "style_guide": f"{writing_style}. Genre: {genre_str}", "length": length,
                        "provider": provider, "model": model, "api_key": active_txt_key
                    })
                    text = w_res.content[0].text
                    raw_text_for_pdf += f"\n\n## {title}\n\n{text}"
                    raw_text_for_audio += f"{title}. {text}\n\n"
                    yield status_log, None, generated_title, raw_text_for_pdf, raw_text_for_audio
                except Exception as e:
                    status_log += f"❌ Error Part {i+1}: {e}\n"

    # PHASE 3: PDF
    status_log += "📖 Binding PDF...\n"
    final_pdf = create_typeset_pdf(generated_title, "Anonymous AI", raw_text_for_pdf)
    status_log += "🎉 Complete!"
    yield status_log, final_pdf, generated_title, raw_text_for_pdf, raw_text_for_audio

# --- AUDIO WRAPPER ---
async def generate_custom_audio(text, voice_name, el_model, voices_map, api_key):
    if not text:
        return None, "No text provided."
    if not api_key:
        return None, "No ElevenLabs Key."
    voice_id = voices_map.get(voice_name) if voices_map else None
    if not voice_id:
        return None, "Voice not found. Fetch voices first."
    async with sse_client(MCP_SSE_URL) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            try:
                res = await session.call_tool("generate_audio_narration", {
                    "text": text, "voice_id": voice_id, "model_id": el_model, "api_key": api_key
                })
                return res.content[0].text, "Audio Generated!"
            except Exception as e:
                return None, f"Error: {e}"

# --- UI WRAPPER ---
async def _ui_wrapper(*args):
    async for update in run_one_click_novel(*args):
        yield update


# --- SOCIAL FUNCTIONS ---
lib_mgr = LibraryManager()


def get_first_chapter(audio_text: str):
    """Extract the first chapter/segment suitable for narration.

    The generator appends each part as "{Title}. {text}\n\n", so splitting on the
    double-newline separator and returning the first non-empty chunk yields the
    first chapter (title + content). Returns an empty string if there is no text.
    """
    if not audio_text:
        return ""
    # Normalize line endings and split on blank-line separators
    chunks = [c.strip() for c in audio_text.replace('\r\n', '\n').split('\n\n') if c.strip()]
    return chunks[0] if chunks else audio_text
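
# Illustrative behavior (text is made up): for "Chapter One. It was quiet.\n\nChapter Two. Then it wasn't.",
# get_first_chapter returns "Chapter One. It was quiet." -- only the first title + content chunk.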

def submit_novel_to_lib(user_title, user_author, auto_title, raw_text, pdf_path):
    if not raw_text:
        return "Error: No content to publish."
    final_title = user_title if user_title.strip() else auto_title
    final_author = user_author if user_author.strip() else "Anonymous"
    return lib_mgr.save_novel(final_title, final_author, "Mixed", raw_text, pdf_path)


def refresh_library():
    return lib_mgr.get_leaderboard()


def select_book_from_leaderboard(evt: gr.SelectData, current_data, liked_session):
    # Use iloc for position-based access (the dataframe columns are labeled, so plain [n] would be label-based)
    row_data = current_data.iloc[evt.index[0]]
    novel_title = row_data.iloc[0]
    novel_author = row_data.iloc[1]
    novel_id = row_data.iloc[4]
    novel_content = row_data.iloc[5]   # hidden content column
    pdf_filename = row_data.iloc[6]    # hidden filename column
    # Try fetching an existing PDF first, else regenerate it locally
    pdf_path = lib_mgr.get_pdf_path(pdf_filename)
    if not pdf_path or not os.path.exists(pdf_path):
        pdf_path = create_typeset_pdf(novel_title, novel_author, novel_content)
    # Determine the like-button label from the session
    btn_label = "💔 Unlike" if novel_id in liked_session else "❤️ Like this Story"
    return pdf_path, novel_id, f"📖 Reading: {novel_title}", gr.update(value=btn_label)


def vote_current_book(novel_id, liked_session, request: gr.Request):
    # If no book is selected, return the current leaderboard, an error message,
    # and a safe default button label.
    if not novel_id:
        return lib_mgr.get_leaderboard(), "No book selected!", gr.update(value="❤️ Like this Story")
    # lib_mgr.like_novel returns (leaderboard, msg, updated_likes)
    leaderboard_data, msg, _ = lib_mgr.like_novel(novel_id, liked_session, request)
    # After toggling, check the session to decide the label
    btn_label = "💔 Unlike" if novel_id in liked_session else "❤️ Like this Story"
    return leaderboard_data, msg, gr.update(value=btn_label)

# --- LAYOUT ---
with gr.Blocks(title="Infinite Library") as demo:
    liked_session = gr.State([])
    current_book_id = gr.State(None)
    hidden_raw_text = gr.State("")
    hidden_audio_text = gr.State("")
    voices_map_state = gr.State({})

    gr.Markdown("# 🏛️ The Infinite Library")
    with gr.Row():
        # LEFT SIDEBAR
        with gr.Column(scale=1, variant="panel"):
            gr.Markdown("### 🔑 API Access")
            key_neb = gr.Textbox(label="Nebius API Key", type="password", placeholder="sk-...")
            key_gem = gr.Textbox(label="Google Gemini API Key", type="password", placeholder="AIza...")
            key_claude = gr.Textbox(label="Anthropic API Key", type="password", placeholder="sk-ant...")
            key_aud = gr.Textbox(label="ElevenLabs Key", type="password", placeholder="sk-...")

            gr.Markdown("### ⚙️ Engine")
            provider_radio = gr.Radio(["Nebius", "Google Gemini", "Anthropic Claude"], value="Nebius", label="Provider", info="Select a provider and click Fetch (make sure the matching API key is set above)")
            # Dynamic model fetching
            with gr.Row():
                model_drop = gr.Dropdown(["meta-llama/Meta-Llama-3.3-70B-Instruct"], label="Text Model", scale=2)
                fetch_models_btn = gr.Button("🔄 Fetch", scale=1)
            # Use the provider-specific wrapper to avoid the coroutine error
            fetch_models_btn.click(
                fetch_models_ui_wrapper,
                inputs=[provider_radio, key_neb, key_gem, key_claude], outputs=[model_drop]
            )

            gr.Markdown("### 🎙️ ElevenLabs Settings")
            with gr.Row():
                el_model_drop = gr.Dropdown(["eleven_multilingual_v2"], label="Audio Model", scale=1)
                voice_drop = gr.Dropdown([], label="Voice", scale=1)
                fetch_voices_btn = gr.Button("🔄 Fetch", scale=1)
            fetch_voices_btn.click(fetch_elevenlabs_data_ui, inputs=[key_aud], outputs=[voice_drop, voices_map_state, el_model_drop])

            gr.Markdown("### 🎨 Format & Style")
            format_drop = gr.Dropdown(["Novel", "Short Story", "Novella", "Poem", "Essay", "Screenplay"], value="Novel", label="Format")
            style_drop = gr.Dropdown(
                ["Cinematic Thriller", "Hemingway (Minimalist)", "Jane Austen (Regency)",
                 "Stephen King (Horror)", "Gen Z Internet Slang", "Shakespearean Drama",
                 "Douglas Adams (Absurdist)", "Lovecraftian (Eldritch)", "Hard Sci-Fi (Technical)"],
                value="Cinematic Thriller", label="Writing Style"
            )
            len_drop = gr.Dropdown(["Short", "Medium", "Long"], value="Medium", label="Segment Length")

            gr.Markdown("### 🧬 Genre Blender")
            s_mys = gr.Slider(0, 100, label="Mystery", value=20, info="Puzzles, clues, suspense")
            s_rom = gr.Slider(0, 100, label="Romance", value=10, info="Relationships, emotion, drama")
            s_hor = gr.Slider(0, 100, label="Horror", value=10, info="Fear, tension, supernatural")
            s_sci = gr.Slider(0, 100, label="Sci-Fi", value=60, info="Future, tech, space")
            s_lit = gr.Slider(0, 100, label="Literary", value=10, info="Prose focus, metaphor, depth")

        # RIGHT MAIN
        with gr.Column(scale=3):
            with gr.Tabs():
                # TAB 1: GENERATE
                with gr.TabItem("✍️ Studio"):
                    seed_input = gr.Textbox(label="Story Seed", placeholder="A robot discovers it has a soul...", lines=3)
                    gen_btn = gr.Button("🚀 Generate", variant="primary")
                    status_box = gr.Textbox(label="Live Log", interactive=False, lines=4)
                    pdf_display = PDF(label="eBook Preview", height=600)

                    with gr.Accordion("🎙️ Audio Studio", open=False):
                        gr.Markdown("Select text to narrate or play the full story.")
                        custom_audio_text = gr.Textbox(label="Text to Narrate", lines=3)
                        with gr.Row():
                            load_full_btn = gr.Button("Load Full Story")
                            play_custom_btn = gr.Button("▶️ Play Selection", variant="primary")
                        custom_audio_player = gr.Audio(label="Audio Output")
                        audio_status = gr.Textbox(label="Audio Status", interactive=False)

                    gr.Markdown("### 📚 Publish to Library")
                    with gr.Row():
                        pub_title = gr.Textbox(label="Title", placeholder="Auto-filled")
                        pub_author = gr.Textbox(label="Author", placeholder="Anonymous")
                    submit_btn = gr.Button("Publish")
                    submit_msg = gr.Textbox(label="Status", interactive=False)

                # TAB 2: SOCIAL
                with gr.TabItem("🌍 Social Library"):
                    gr.Markdown("### 📚 Community Bookshelf")
                    refresh_btn = gr.Button("🔄 Refresh Library")
                    with gr.Row():
                        with gr.Column(scale=1):
                            leaderboard = gr.Dataframe(
                                headers=["Title", "Author", "Genre", "Likes", "ID", "Content", "File"],
                                datatype=["str", "str", "str", "number", "str", "str", "str"],
                                interactive=False,
                                label="Click a book to read"
                            )
                        with gr.Column(scale=1):
                            social_status = gr.Markdown("### Select a book to read")
                            social_reader = PDF(label="Reader", height=600)
                            like_btn = gr.Button("❤️ Like this Story", variant="primary")
                            like_msg = gr.Textbox(label="Status", interactive=False)

    # --- WIRING ---
    gen_btn.click(
        _ui_wrapper,
        inputs=[seed_input, s_mys, s_rom, s_hor, s_sci, s_lit, format_drop, style_drop, len_drop, provider_radio, model_drop, voice_drop, el_model_drop, voices_map_state, key_neb, key_gem, key_claude, key_aud],
        outputs=[status_box, pdf_display, pub_title, hidden_raw_text, hidden_audio_text]
    )
    # Note: despite its label, "Load Full Story" loads only the first chapter (see get_first_chapter)
    load_full_btn.click(get_first_chapter, inputs=[hidden_audio_text], outputs=[custom_audio_text])
    play_custom_btn.click(
        generate_custom_audio,
        inputs=[custom_audio_text, voice_drop, el_model_drop, voices_map_state, key_aud],
        outputs=[custom_audio_player, audio_status]
    )
    submit_btn.click(
        submit_novel_to_lib,
        inputs=[pub_title, pub_author, pub_title, hidden_raw_text, pdf_display],
        outputs=[submit_msg]
    )
    refresh_btn.click(refresh_library, outputs=[leaderboard])
    # When a row is selected, also pass the session state so we can set the button label
    leaderboard.select(select_book_from_leaderboard, inputs=[leaderboard, liked_session], outputs=[social_reader, current_book_id, social_status, like_btn])
    # Update the leaderboard, status message, and the like-button label when toggling
    like_btn.click(vote_current_book, inputs=[current_book_id, liked_session], outputs=[leaderboard, like_msg, like_btn])

# if __name__ == "__main__":
#     demo.queue().launch(mcp_server=True,)