Spaces:
Sleeping
Sleeping
| """ | |
| Emoji AI Avatar - Main Gradio Application | |
| Real-time emoji avatars based on chat sentiment analysis | |
| With MCP (Model Context Protocol) server integration | |
| """ | |
| import gradio as gr | |
| import os | |
| import socket | |
| import sys | |
| import time | |
| import importlib.util | |
| from pathlib import Path | |
# Add this file's own directory to sys.path so the local packages below can be imported
| ROOT_DIR = Path(__file__).parent | |
| sys.path.insert(0, str(ROOT_DIR)) | |
| # Import from modular avatar structure | |
| from avatar import SentimentAnalyzer, EmojiMapper | |
def _load_module_from_path(module_name: str, file_path: str):
    """Load and return a module from an explicit file path.

    Used for source files that live in directories whose names contain
    hyphens and therefore cannot be reached with a normal ``import``.

    Args:
        module_name: Name to give the loaded module (also its ``sys.modules`` key).
        file_path: Path to the Python source file.

    Returns:
        The executed module object.

    Raises:
        ImportError: If no import spec/loader can be built for ``file_path``.
        Exception: Anything raised while executing the module body propagates
            unchanged.
    """
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not load {module_name} from {file_path}")

    module = importlib.util.module_from_spec(spec)
    # Register before executing, as in the importlib "import a source file
    # directly" recipe, so code that looks itself up via sys.modules during
    # import can find the module.
    previous = sys.modules.get(module_name)
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module registered.
        if previous is not None:
            sys.modules[module_name] = previous
        else:
            sys.modules.pop(module_name, None)
        raise
    return module
# Load GeminiClient from the hyphenated "llm-inference" directory
_gemini_module = _load_module_from_path(
    "gemini_client",
    str(ROOT_DIR / "llm-inference" / "gemini_client.py"),
)
GeminiClient = _gemini_module.GeminiClient

# Load MCPClient from the hyphenated "mcp-client" directory
_mcp_module = _load_module_from_path(
    "mcp_client",
    str(ROOT_DIR / "mcp-client" / "mcp_client.py"),
)
MCPClient = _mcp_module.MCPClient

# Load environment variables from .env file
from dotenv import load_dotenv

load_dotenv()

# Initialize components
api_key = os.environ.get("GEMINI_API_KEY")
# Report only whether a key is present; never echo any part of the secret,
# because start-up output ends up in application logs.
print(f"🔑 API Key loaded: {'Yes' if api_key else 'No'}")
gemini = GeminiClient(api_key=api_key)
sentiment_analyzer = SentimentAnalyzer()
emoji_mapper = EmojiMapper()
mcp_client = MCPClient()

# Streaming velocity control (seconds between yields)
STREAM_DELAY = 0.05  # 50ms delay for smooth, readable streaming
def get_emoji_html(emoji: str, label: str, size: int = 64) -> str:
    """Return the HTML snippet for one emoji avatar.

    Args:
        emoji: Emoji character(s) shown as the avatar.
        label: Caption rendered underneath the emoji.
        size: Emoji font size in pixels.

    Returns:
        A centred ``<div>`` containing the emoji and its caption, with a
        leading and trailing newline.
    """
    markup_lines = (
        "",  # leading newline
        '<div style="text-align: center; padding: 10px; min-height: 90px;">',
        f'<div style="font-size: {size}px; line-height: 1;">{emoji}</div>',
        f'<div style="font-size: 12px; color: #666; margin-top: 5px;">{label}</div>',
        "</div>",
        "",  # trailing newline
    )
    return "\n".join(markup_lines)
# Stylesheet handed to demo.launch(css=...) in the __main__ block.
# .emoji-container / .emoji-box match the elem_classes set on the emoji row
# and its two columns; .chat-container is not referenced by any component
# visible in this file.
custom_css = """
.emoji-container {
display: flex;
justify-content: space-around;
padding: 20px;
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
border-radius: 15px;
margin-bottom: 20px;
}
.emoji-box {
background: white;
border-radius: 15px;
padding: 15px 30px;
box-shadow: 0 4px 15px rgba(0,0,0,0.1);
min-width: 120px;
}
/* Prevent fades/transitions and enforce full opacity for emoji displays */
.emoji-box, .emoji-box * {
transition: none !important;
opacity: 1 !important;
}
.chat-container {
border-radius: 15px;
overflow: hidden;
}
input[type="text"] {
border-radius: 8px;
border: 1px solid #ddd;
}
button {
border-radius: 8px;
}
"""
# Build Gradio Interface
# Layout only: components that the event handlers need are bound to names
# here; the listeners themselves are registered later in the file.
# NOTE(review): the nesting below was reconstructed from the component order
# and the scale/elem_classes hints - confirm against the intended layout.
with gr.Blocks(title="Emoji AI Avatar") as demo:
    gr.Markdown("""
# 😊 Emoji AI Avatar Chat 🤖
Watch the emojis change based on the **sentiment** of your conversation!
Both your messages and AI responses are analyzed in real-time.
Connect to **MCP servers** on Hugging Face to add new skills!
""")
    # Tabs for Chat and MCP Configuration
    with gr.Tabs():
        # ========== CHAT TAB ==========
        with gr.TabItem("💬 Chat", id="chat-tab"):
            # Emoji Display Row
            with gr.Row(elem_classes="emoji-container"):
                with gr.Column(elem_classes="emoji-box"):
                    # Avatar reflecting the sentiment of the user's text
                    user_emoji_display = gr.HTML(
                        get_emoji_html("😐", "You"),
                        label="Your Emoji"
                    )
                with gr.Column(elem_classes="emoji-box"):
                    # Avatar reflecting the sentiment of the AI response
                    ai_emoji_display = gr.HTML(
                        get_emoji_html("😐", "AI: neutral"),
                        label="AI Emoji"
                    )
            # MCP Status indicator
            mcp_status_display = gr.HTML(
                value='<div style="text-align: center; padding: 5px; color: #666; font-size: 12px;">🔌 MCP: Not connected</div>',
                label="MCP Status"
            )
            # Chat Interface
            chatbot = gr.Chatbot(
                label="Chat History",
                height=400,
            )
            # Message input and send button in row
            with gr.Row():
                msg = gr.Textbox(
                    placeholder="Type your message here... Try expressing different emotions!",
                    label="Message",
                    scale=9,
                )
                submit_btn = gr.Button("Send", variant="primary", scale=1)
            # Timer for live emoji updates
            timer = gr.Timer(0.1)  # Update every 100ms
            # Use MCP checkbox
            use_mcp_checkbox = gr.Checkbox(
                label="🔌 Use MCP Context (enhances AI with MCP knowledge)",
                value=False,
                info="When enabled, MCP provides grounding context to enhance Gemini's responses"
            )
            # Example messages - 2 examples as requested
            gr.Examples(
                examples=[
                    "Hello! How are you?",  # Short text example
                    "I've been working on this complex machine learning project for weeks now, and I'm really excited about the progress we've made. The neural network is finally converging and the accuracy metrics are looking promising. Can you help me understand how to further optimize the hyperparameters?",  # Long text example
                ],
                inputs=msg,
                label="Try these examples:"
            )
            # Sentiment Legend
            with gr.Accordion("Emoji Legend", open=False):
                gr.Markdown("""
### Emotion → Emoji Mapping (Unified for User & AI)
**Positive Emotions:**
| 😄 Joy | 😊 Happiness | 🤩 Excitement | 🥰 Love | 🥹 Gratitude |
|--------|--------------|---------------|---------|--------------|
| 😌 Contentment | 🤗 Hope | 😎 Pride | 😆 Amusement | 😮💨 Relief |
**Curious/Surprise:**
| 🧐 Curiosity | 😲 Surprise | 😯 Anticipation | 🤯 Wonder | 🙃 Playful |
|--------------|-------------|-----------------|-----------|------------|
**Negative Emotions:**
| 😠 Anger | 😤 Frustration | 😒 Annoyance | 🤢 Disgust | 😏 Contempt |
|----------|----------------|--------------|------------|-------------|
| 😢 Sadness | 😭 Grief | 😞 Disappointment | 🥺 Hurt | 😨 Fear |
**Other Emotions:**
| 😰 Anxiety | 😟 Worry | 😬 Nervousness | 😕 Confusion | 😳 Embarrassment |
|------------|---------|----------------|--------------|------------------|
| 😔 Shame | 🥱 Boredom | 😶 Loneliness | 🤨 Skepticism | 😐 Neutral |
""")
        # ========== MCP CONFIGURATION TAB ==========
        with gr.TabItem("🔌 MCP Configuration", id="mcp-tab"):
            gr.Markdown("""
## MCP Server Configuration
Connect to **MCP (Model Context Protocol)** servers on Hugging Face Spaces
to add new skills and capabilities to your chatbot!
MCP servers provide specialized tools and functions that can be used during chat.
""")
            with gr.Row():
                with gr.Column(scale=3):
                    mcp_url_input = gr.Textbox(
                        label="MCP Server URL",
                        placeholder="e.g., MCP-1st-Birthday/QuantumArchitect-MCP",
                        value="https://huggingface.co/spaces/MCP-1st-Birthday/QuantumArchitect-MCP",
                        info="Enter the Hugging Face Space URL or just the space name (owner/repo)"
                    )
                with gr.Column(scale=1):
                    connect_btn = gr.Button("🔌 Connect", variant="primary")
                    disconnect_btn = gr.Button("❌ Disconnect", variant="secondary")
            # Connection status
            mcp_connection_status = gr.HTML(
                value='<div style="padding: 15px; background: #f0f0f0; border-radius: 8px; margin: 10px 0;"><b>Status:</b> Not connected</div>'
            )
            # Available tools/endpoints
            mcp_tools_display = gr.Textbox(
                label="Available Tools/Endpoints",
                lines=8,
                interactive=False,
                placeholder="Connect to an MCP server to see available tools..."
            )
            # Test MCP
            gr.Markdown("### Test MCP Connection")
            with gr.Row():
                test_message = gr.Textbox(
                    label="Test Message",
                    placeholder="Enter a test message for the MCP server...",
                    scale=4
                )
                test_btn = gr.Button("🧪 Test", variant="secondary", scale=1)
            test_result = gr.Textbox(
                label="Test Result",
                lines=5,
                interactive=False
            )
            # Example MCP servers
            gr.Markdown("""
### Example MCP Servers on Hugging Face
| Server | Description |
|--------|-------------|
| `MCP-1st-Birthday/QuantumArchitect-MCP` | Quantum computing architecture assistant |
| `gradio/tool-mcp` | General purpose tool server |
Click on a server name above and paste it into the URL field to connect.
""")
# ========== EVENT HANDLERS ==========
# Live typing sentiment update - refreshes the user's avatar while they type
def update_user_emoji_live(text: str):
    """Return the user-avatar HTML for the current contents of the textbox.

    Blank or whitespace-only input yields the neutral "You" avatar; any other
    text is run through the sentiment analyzer and mapped to an emoji.
    """
    has_content = bool(text) and bool(text.strip())
    if has_content:
        # Re-analysed on every call so the avatar tracks the text as it changes.
        label = sentiment_analyzer.analyze(text)["label"]
        return get_emoji_html(emoji_mapper.get_emoji(label), f"You: {label}")
    return get_emoji_html("😐", "You")
# MCP Connection handlers
def connect_to_mcp(url: str):
    """Connect to an MCP server and build the UI payloads describing the result.

    Args:
        url: Value of the "MCP Server URL" textbox.

    Returns:
        A 3-tuple ``(status_html, tools_text, indicator_html)`` for the
        connection-status panel, the tools textbox and the chat-tab MCP
        indicator respectively.
    """
    # Function-scope import keeps this handler self-contained.
    import html

    def esc(value) -> str:
        # Everything interpolated below is rendered through gr.HTML and comes
        # either from the user-typed URL or from the remote server, so it is
        # escaped instead of being trusted as markup.
        return html.escape(str(value))

    result = mcp_client.connect(url)

    if not result["success"]:
        status_html = f'''
<div style="padding: 15px; background: #f8d7da; border-radius: 8px; margin: 10px 0; border: 1px solid #f5c6cb;">
<b>❌ Status:</b> Connection failed<br>
<small>{esc(result["message"])}</small>
</div>
'''
        return status_html, "Connection failed", '<div style="text-align: center; padding: 5px; color: #666; font-size: 12px;">🔌 MCP: Not connected</div>'

    # Show capabilities (at most the first 10)
    caps_info = ""
    if mcp_client.mcp_capabilities:
        caps_list = "<br>".join(f"• {esc(c)}" for c in mcp_client.mcp_capabilities[:10])
        caps_info = f"<br><br><b>Capabilities:</b><br>{caps_list}"

    status_html = f'''
<div style="padding: 15px; background: #d4edda; border-radius: 8px; margin: 10px 0; border: 1px solid #c3e6cb;">
<b>✅ Status:</b> Connected to <code>{esc(mcp_client.space_name)}</code><br>
<small>URL: {esc(mcp_client.space_url)}</small><br>
<small>{esc(mcp_client.mcp_description)}</small>
{caps_info}
</div>
'''
    tools = mcp_client.list_tools()
    mcp_indicator = f'<div style="text-align: center; padding: 5px; color: #28a745; font-size: 12px;">🔌 MCP: {esc(mcp_client.space_name)} (provides context for AI)</div>'
    return status_html, tools, mcp_indicator
def disconnect_from_mcp():
    """Disconnect the MCP client and return the reset UI payloads.

    Returns:
        ``(status_html, tools_text, indicator_html)`` where the tools text is
        emptied and both HTML fragments show the disconnected state.
    """
    mcp_client.disconnect()
    return (
        '<div style="padding: 15px; background: #f0f0f0; border-radius: 8px; margin: 10px 0;"><b>Status:</b> Disconnected</div>',
        "",
        '<div style="text-align: center; padding: 5px; color: #666; font-size: 12px;">🔌 MCP: Not connected</div>',
    )
def test_mcp_connection(message: str):
    """Preview the MCP context that would accompany ``message``.

    Returns a human-readable string: an error when no server is connected, a
    prompt when the message is blank, otherwise the retrieved context (or a
    warning when the client returned nothing).
    """
    if not mcp_client.connected:
        return "❌ Not connected to any MCP server. Please connect first."
    if message.strip() == "":
        return "Please enter a test message."

    # Same call the chat handler uses to gather grounding context.
    context = mcp_client.get_context_for_llm(message)
    return (
        f"✅ MCP Context for this message:\n\n{context}"
        if context
        else "⚠️ No context retrieved from MCP for this message."
    )
# Chat with MCP integration - MCP provides CONTEXT for Gemini
def stream_chat_with_mcp(message: str, history: list, use_mcp: bool):
    """Stream a Gemini reply, optionally grounded with MCP context.

    Generator used as a Gradio event handler. Every yield is a 3-tuple
    ``(chat_history, user_emoji_html, ai_emoji_update)``.

    The AI emoji is kept stable: it is only replaced when a new emotion is
    detected in the partial response. "No change" is signalled with
    ``gr.skip()``; yielding ``None`` would reset the component instead of
    leaving it untouched.

    Args:
        message: Text submitted by the user.
        history: Existing chat history as a list of role/content dicts.
        use_mcp: Whether to request grounding context from the MCP client.
    """
    history = list(history or [])

    if not message or not message.strip():
        # No message - leave both emoji displays exactly as they are.
        yield history, gr.skip(), gr.skip()
        return

    # Analyze user message sentiment immediately
    user_label = sentiment_analyzer.analyze(message)["label"]
    user_emoji_html = get_emoji_html(
        emoji_mapper.get_emoji(user_label), f"You: {user_label}"
    )

    # Show the user's message right away; the AI emoji stays as it was.
    base_history = history + [{"role": "user", "content": message}]
    yield base_history, user_emoji_html, gr.skip()

    # Get MCP context if enabled (no emoji change during context gathering)
    mcp_context = ""
    if use_mcp and mcp_client.connected:
        mcp_context = mcp_client.get_context_for_llm(message)

    if mcp_context:
        enhanced_message = f"""You have access to MCP context. Use this information to provide a more informed response.
**MCP Context:**
{mcp_context}
**User Question:**
{message}
Please answer the user's question, incorporating the MCP context where relevant. Be conversational and helpful."""
        # Banner shown above the reply, and label prefix, when MCP was used.
        response_prefix = f"🔌 *Using {mcp_client.space_name} context*\n\n"
        label_prefix = "AI+MCP"
    else:
        enhanced_message = message
        response_prefix = ""
        label_prefix = "AI"

    full_response = ""
    chunk_count = 0
    last_emotion = ""
    last_yield_time = time.monotonic()

    for chunk in gemini.stream_chat(enhanced_message):
        full_response += chunk
        chunk_count += 1

        # Velocity control: keep at least STREAM_DELAY between yields.
        elapsed = time.monotonic() - last_yield_time
        if elapsed < STREAM_DELAY:
            time.sleep(STREAM_DELAY - elapsed)
        last_yield_time = time.monotonic()

        # Re-evaluate the AI emotion every 2 chunks; update only on change.
        ai_update = gr.skip()
        if chunk_count % 2 == 0:
            detected = sentiment_analyzer.analyze(full_response)["label"]
            is_new_emotion = bool(
                detected and detected != "neutral" and detected != last_emotion
            )
            # A neutral reading only counts when nothing was detected before,
            # so a later neutral never overrides a real emotion mid-stream.
            is_first_neutral = detected == "neutral" and last_emotion == ""
            if is_new_emotion or is_first_neutral:
                last_emotion = detected
                emoji = "😐" if is_first_neutral else emoji_mapper.get_emoji(detected)
                ai_update = get_emoji_html(emoji, f"{label_prefix}: {detected}")

        yield (
            base_history
            + [{"role": "assistant", "content": response_prefix + full_response}],
            user_emoji_html,
            ai_update,
        )

    # Final sentiment analysis over the complete response
    final_label = sentiment_analyzer.analyze(full_response)["label"]
    final_history = base_history + [
        {"role": "assistant", "content": response_prefix + full_response}
    ]
    yield (
        final_history,
        user_emoji_html,
        get_emoji_html(
            emoji_mapper.get_emoji(final_label), f"{label_prefix}: {final_label}"
        ),
    )
# Event wiring. These listeners reference components created in the
# `with gr.Blocks(...) as demo:` layout above.

# Live user-emoji preview: refreshed both on the timer tick and on every
# edit of the message box (.input(), Gradio 6 compatible).
for _live_trigger in (timer.tick, msg.input):
    _live_trigger(
        update_user_emoji_live,
        inputs=[msg],
        outputs=[user_emoji_display],
    )

# MCP connection buttons
connect_btn.click(
    connect_to_mcp,
    inputs=[mcp_url_input],
    outputs=[mcp_connection_status, mcp_tools_display, mcp_status_display],
)
disconnect_btn.click(
    disconnect_from_mcp,
    outputs=[mcp_connection_status, mcp_tools_display, mcp_status_display],
)

# Test MCP button
test_btn.click(
    test_mcp_connection,
    inputs=[test_message],
    outputs=[test_result],
)

# Message submission (Enter key or Send button): stream the reply, then
# empty the textbox once the stream has finished.
for _send_trigger in (msg.submit, submit_btn.click):
    _send_trigger(
        stream_chat_with_mcp,
        [msg, chatbot, use_mcp_checkbox],
        [chatbot, user_emoji_display, ai_emoji_display],
    ).then(
        lambda: "",
        None,
        msg,
    )
# Clear button
clear_btn = gr.Button("Clear Chat", variant="secondary")


def clear_chat():
    """Reset the Gemini chat session and return cleared UI values.

    Returns:
        ``(empty_history, neutral_user_emoji_html, neutral_ai_emoji_html)``.
    """
    gemini.reset_chat()  # Reset Gemini chat history too
    neutral_user = get_emoji_html("😐", "You")
    neutral_ai = get_emoji_html("😐", "AI: neutral")
    return [], neutral_user, neutral_ai


clear_btn.click(
    fn=clear_chat,
    inputs=None,
    outputs=[chatbot, user_emoji_display, ai_emoji_display],
)
def _is_port_free(port: int) -> bool:
    """Report whether a TCP socket can currently be bound to ``0.0.0.0:port``.

    A throw-away IPv4 socket (with SO_REUSEADDR set) is bound and closed
    immediately. Any ``OSError`` raised along the way is treated as
    "not available".
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as probe:
            probe.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            probe.bind(("0.0.0.0", port))
    except OSError:
        return False
    return True
| def _choose_port(preferred: int | None = None, start: int = 7861, end: int = 7870) -> int: | |
| """Choose an available port. | |
| Priority: | |
| 1. Environment variable GRADIO_SERVER_PORT or PORT | |
| 2. HuggingFace Spaces detection (use port 7860) | |
| 3. preferred argument | |
| 4. scan range start..end | |
| 5. Let Gradio auto-assign by returning None | |
| """ | |
| # 1. environment override | |
| env_port = os.environ.get("GRADIO_SERVER_PORT") or os.environ.get("PORT") | |
| if env_port: | |
| try: | |
| p = int(env_port) | |
| return p # Trust the environment variable | |
| except Exception: | |
| pass | |
| # 2. Detect HuggingFace Spaces | |
| if os.environ.get("SPACE_ID"): | |
| print("🤗 HuggingFace Space detected. Using default port 7860.") | |
| return 7860 | |
| # 3. preferred | |
| if preferred and _is_port_free(preferred): | |
| return preferred | |
| # 4. scan range | |
| for p in range(start, end + 1): | |
| if _is_port_free(p): | |
| return p | |
| # 5. Let Gradio handle it with auto-assignment | |
| print("⚠️ No port in preferred range available. Letting Gradio auto-assign.") | |
| return None | |
if __name__ == "__main__":
    # Local runs prefer 7861..7870; _choose_port returns 7860 on HuggingFace
    # Spaces and honours a port given through the environment.
    preferred_port = 7861
    port = _choose_port(preferred=preferred_port, start=7861, end=7870)

    print(
        f"🚀 Attempting to start Gradio on 0.0.0.0:{port}"
        if port
        else "🚀 Starting Gradio with auto-assigned port"
    )

    # Options shared by the first attempt and the fallback launch.
    launch_options = {
        "server_name": "0.0.0.0",
        "share": False,
        "css": custom_css,
    }
    try:
        demo.launch(server_port=port, **launch_options)
    except OSError as e:
        # The port may have been taken between the probe and the launch;
        # retry without a fixed port so Gradio picks one itself.
        print(f"⚠️ Failed to bind to port {port}: {e}. Falling back to auto-assigned port.")
        demo.launch(**launch_options)