import random
import nltk
import os
import json
import yaml
from dotenv import load_dotenv
import logging
import requests
from litellm import completion
from datetime import datetime
from enum import Enum

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load environment variables from .env file
load_dotenv()


# Load model configuration from YAML
def load_model_config(config_path="models.yaml"):
    """Load model configuration from a YAML file"""
    try:
        if os.path.exists(config_path):
            with open(config_path, 'r', encoding='utf-8') as f:
                config = yaml.safe_load(f)
            logging.info(f"✓ Model configuration loaded from {config_path}")
            return config
        else:
            logging.warning(f"⚠ Model configuration file {config_path} not found, using defaults")
            return None
    except Exception as e:
        logging.error(f"✗ Error loading model configuration: {e}")
        return None


# Load configuration at module level
MODEL_CONFIG = load_model_config()
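# For reference, a minimal sketch of the models.yaml layout this module reads.
# This is an illustration inferred from the config lookups below, not a file
# shipped with the project; every key falls back to a default when absent:
#
#   memory:
#     json_path: ./memory.json
#     retrieval:
#       max_retrieved_memories: 5
#   conversation:
#     max_history_length: 20
#   sentiment:
#     primary_model: distilbert/distilbert-base-uncased-finetuned-sst-2-english
#   gemini:
#     thinking_models: ["gemini/gemini-2.0-flash-exp", "gemini/gemini-2.0-flash"]
#     thinking: {temperature: 0.5, max_tokens: 200}
#     fallback_model: gemini/gemini-1.5-flash
#     fallback:
#       max_history_exchanges: 8
#       temperature_base: 0.7
#       temperature_variation_range: [0.0, 0.3]
#       max_tokens: 150
#   inflection_ai:
#     api_endpoint: https://api.inflection.ai/external/api/inference
#     model_config: Pi-3.1
#   quantum:
#     api_endpoint: https://api.quantumnumbers.anu.edu.au
#     default_length: 128
#     default_type: uint8
#   system_prompts:
#     galatea_identity: "You are Galatea, an AI assistant ..."
#     response_style: "Respond in character, keeping responses concise ..."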
# Download NLTK data (only needs to be done once)
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download('punkt', quiet=True)

# Import transformers with error handling
try:
    from transformers import pipeline
    transformers_available = True
except ImportError:
    logging.warning("Transformers library not available. Using fallback sentiment analysis.")
    transformers_available = False

# ChromaDB removed - using JSON-only memory


# --- Memory System (JSON only) ---
class MemorySystem:
    """Memory system using JSON for simple key-value storage"""

    def __init__(self, json_db_path=None, config=None):
        self.config = config or MODEL_CONFIG or {}

        # Get paths from config or use defaults
        memory_config = self.config.get('memory', {}) if self.config else {}
        self.json_db_path = json_db_path or memory_config.get('json_path', './memory.json')
        self.json_memory = {}

        # Initialize JSON database
        self.load_json_memory()

    def is_ready(self):
        """Check if the memory system is fully initialized"""
        return self.json_memory is not None

    def load_json_memory(self):
        """Load the JSON memory database"""
        try:
            if os.path.exists(self.json_db_path):
                with open(self.json_db_path, 'r', encoding='utf-8') as f:
                    self.json_memory = json.load(f)
                logging.info(f"Loaded JSON memory with {len(self.json_memory)} entries")
            else:
                self.json_memory = {}
                logging.info("Created new JSON memory database")
        except Exception as e:
            logging.error(f"Error loading JSON memory: {e}")
            self.json_memory = {}

    def save_json_memory(self):
        """Save the JSON memory database"""
        try:
            with open(self.json_db_path, 'w', encoding='utf-8') as f:
                json.dump(self.json_memory, f, indent=2, ensure_ascii=False)
        except Exception as e:
            logging.error(f"Error saving JSON memory: {e}")

    def store_memory(self, text, metadata=None, memory_type="conversation"):
        """Store a memory in JSON"""
        timestamp = datetime.now().isoformat()

        # Store in JSON
        memory_id = f"{memory_type}_{timestamp}"
        self.json_memory[memory_id] = {
            "text": text,
            "metadata": metadata or {},
            "type": memory_type,
            "timestamp": timestamp
        }
        self.save_json_memory()
        logging.info(f"Stored memory in JSON: {memory_id[:20]}...")

    def retrieve_relevant_memories(self, query, n_results=5):
        """Retrieve relevant memories using keyword search in JSON"""
        relevant_memories = []

        # Simple keyword search in JSON
        if self.json_memory:
            query_lower = query.lower()
            query_words = set(query_lower.split())

            for memory_id, memory_data in self.json_memory.items():
                text_lower = memory_data.get("text", "").lower()
                text_words = set(text_lower.split())

                # Simple overlap check
                overlap = len(query_words & text_words)
                if overlap > 0:
                    relevant_memories.append({
                        "text": memory_data["text"],
                        "metadata": memory_data.get("metadata", {}),
                        "distance": 1.0 - (overlap / max(len(query_words), len(text_words)))
                    })

        # Sort by relevance (lower distance = more relevant)
        relevant_memories.sort(key=lambda x: x.get("distance", 1.0))
        relevant_memories = relevant_memories[:n_results]

        logging.info(f"Retrieved {len(relevant_memories)} relevant memories from JSON DB")
        return relevant_memories

    def get_json_memory(self, key):
        """Get a specific memory by key from the JSON database"""
        return self.json_memory.get(key)

    def set_json_memory(self, key, value, metadata=None):
        """Set a key-value memory in the JSON database"""
        self.json_memory[key] = {
            "value": value,
            "metadata": metadata or {},
            "timestamp": datetime.now().isoformat()
        }
        self.save_json_memory()

    def get_all_json_memories(self):
        """Get all JSON memories"""
        return self.json_memory.copy()
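# Worked example of the keyword-overlap distance above: for a query of 5 words
# and a stored text of 12 distinct words sharing 2 of them, overlap = 2 and
# distance = 1.0 - 2 / max(5, 12) ≈ 0.83; a stored text sharing all 5 query
# words would score 1.0 - 5/12 ≈ 0.58 and therefore sort ahead of it.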
# --- Agent Classes ---
class MemoryAgent:
    """Agent responsible for memory retrieval and storage"""

    def __init__(self, memory_system, config=None):
        self.memory_system = memory_system
        self.config = config or MODEL_CONFIG or {}

    def retrieve_memories(self, query, n_results=None):
        """Retrieve relevant memories for a query"""
        if n_results is None:
            max_memories = self.config.get('memory', {}).get('retrieval', {}).get('max_retrieved_memories', 5) if self.config else 5
        else:
            max_memories = n_results

        try:
            memories = self.memory_system.retrieve_relevant_memories(query, n_results=max_memories)
            if memories:
                logging.info(f"[MemoryAgent] Retrieved {len(memories)} relevant memories")
            return memories
        except Exception as e:
            logging.error(f"[MemoryAgent] Error retrieving memories: {e}")
            return []

    def store_memory(self, text, metadata=None, memory_type="conversation"):
        """Store a memory"""
        try:
            self.memory_system.store_memory(text, metadata, memory_type)
            logging.info(f"[MemoryAgent] Stored memory: {memory_type}")
        except Exception as e:
            logging.error(f"[MemoryAgent] Error storing memory: {e}")

    def smoke_test(self):
        """Perform a smoke test to verify the memory system is working"""
        try:
            # Test storing
            test_text = "Smoke test memory entry"
            self.store_memory(test_text, {"test": True}, "test")

            # Test retrieving
            memories = self.retrieve_memories("smoke test", n_results=1)
            if memories is not None:
                logging.info("[MemoryAgent] ✓ Smoke test passed")
                return True
            else:
                logging.warning("[MemoryAgent] ⚠ Smoke test failed - retrieve returned None")
                return False
        except Exception as e:
            logging.error(f"[MemoryAgent] ✗ Smoke test failed: {e}")
            return False

    def is_ready(self):
        """Check if the memory agent is ready"""
        return self.memory_system.is_ready() if self.memory_system else False


class GeminiThinkingAgent:
    """Agent responsible for thinking and analysis using Gemini"""

    def __init__(self, config=None):
        self.config = config or MODEL_CONFIG or {}
        self.gemini_available = False
        self._initialize()

    def _initialize(self):
        """Initialize Gemini API availability"""
        gemini_key = os.getenv("GEMINI_API_KEY")
        if gemini_key:
            os.environ["GEMINI_API_KEY"] = gemini_key
            self.gemini_available = True
            logging.info("[GeminiThinkingAgent] ✓ Initialized and ready")
        else:
            logging.warning("[GeminiThinkingAgent] ✗ GEMINI_API_KEY not found")

    def think(self, user_input, emotional_state, conversation_history, retrieved_memories=None):
        """Think about and analyze the conversation context"""
        if not self.gemini_available:
            logging.warning("[GeminiThinkingAgent] Not available")
            return None

        try:
            # Build thinking prompt with conversation context
            emotions_text = ", ".join([f"{emotion}: {value:.2f}" for emotion, value in emotional_state.items()])

            # Prepare conversation context for thinking
            context_summary = ""
            if conversation_history:
                recent_history = conversation_history[-6:]  # Last 3 exchanges
                context_summary = "\nRecent conversation:\n"
                for msg in recent_history:
                    role = "User" if msg["role"] == "user" else "Galatea"
                    context_summary += f"{role}: {msg['content']}\n"

            # Add retrieved memories if available
            memory_context = ""
            if retrieved_memories and len(retrieved_memories) > 0:
                memory_context = "\n\nRelevant memories from past conversations:\n"
                for i, memory in enumerate(retrieved_memories[:3], 1):  # Top 3 most relevant
                    memory_context += f"{i}. {memory['text'][:200]}...\n"

            thinking_prompt = f"""You are the internal reasoning system for Galatea, an AI assistant.

Current emotional state: {emotions_text}
{context_summary}
{memory_context}

Current user message: "{user_input}"

Analyze this conversation and provide:
1. Key insights about what the user is asking or discussing
2. Important context from the conversation history and retrieved memories
3. How Galatea should respond emotionally and contextually
4. Any important details to remember or reference

Keep your analysis concise (2-3 sentences). Focus on what matters for crafting an appropriate response."""

            messages = [
                {"role": "system", "content": "You are an internal reasoning system. Analyze conversations and provide insights."},
                {"role": "user", "content": thinking_prompt}
            ]

            logging.info("[GeminiThinkingAgent] Processing thinking request...")

            # Get Gemini models from config
            gemini_config = self.config.get('gemini', {}) if self.config else {}
            gemini_models = gemini_config.get('thinking_models', [
                "gemini/gemini-2.0-flash-exp",
                "gemini/gemini-2.0-flash",
                "gemini/gemini-1.5-flash-latest",
                "gemini/gemini-1.5-flash"
            ])

            # Get thinking settings from config
            thinking_config = gemini_config.get('thinking', {})
            thinking_temp = thinking_config.get('temperature', 0.5)
            thinking_max_tokens = thinking_config.get('max_tokens', 200)

            for model in gemini_models:
                try:
                    response = completion(
                        model=model,
                        messages=messages,
                        temperature=thinking_temp,
                        max_tokens=thinking_max_tokens
                    )

                    if response and 'choices' in response and len(response['choices']) > 0:
                        thinking_result = response['choices'][0]['message']['content']
                        logging.info("[GeminiThinkingAgent] ✓ Thinking completed")
                        return thinking_result.strip()
                except Exception as e:
                    logging.warning(f"[GeminiThinkingAgent] Model {model} failed: {e}, trying next...")
                    continue

            logging.error("[GeminiThinkingAgent] All models failed")
            return None
        except Exception as e:
            logging.error(f"[GeminiThinkingAgent] Error: {e}")
            return None

    def smoke_test(self):
        """Perform a smoke test to verify Gemini is working"""
        if not self.gemini_available:
            return False
        try:
            test_result = self.think(
                "test",
                {"joy": 0.5, "sadness": 0.3, "anger": 0.1, "fear": 0.1, "curiosity": 0.5},
                [],
                retrieved_memories=None
            )
            if test_result and len(test_result) > 0:
                logging.info("[GeminiThinkingAgent] ✓ Smoke test passed")
                return True
            else:
                logging.warning("[GeminiThinkingAgent] ⚠ Smoke test failed - no result")
                return False
        except Exception as e:
            logging.error(f"[GeminiThinkingAgent] ✗ Smoke test failed: {e}")
            return False

    def is_ready(self):
        """Check if the agent is ready"""
        return self.gemini_available
class PiResponseAgent:
    """Agent responsible for generating human-facing responses using Pi-3.1"""

    def __init__(self, config=None):
        self.config = config or MODEL_CONFIG or {}
        self.inflection_ai_available = False
        self.inflection_ai_api_key = None
        self._initialize()

    def _initialize(self):
        """Initialize Inflection AI API availability"""
        inflection_key = os.getenv("INFLECTION_AI_API_KEY")
        if inflection_key:
            self.inflection_ai_api_key = inflection_key
            self.inflection_ai_available = True
            logging.info("[PiResponseAgent] ✓ Initialized and ready")
        else:
            logging.warning("[PiResponseAgent] ✗ INFLECTION_AI_API_KEY not found")

    def respond(self, user_input, emotional_state, thinking_context=None, conversation_history=None, retrieved_memories=None):
        """Generate a response using Pi-3.1 with thinking context and emotional state"""
        if not self.inflection_ai_available:
            logging.warning("[PiResponseAgent] Not available")
            return None

        try:
            # Create context with emotional state
            emotions_text = ", ".join([f"{emotion}: {value:.2f}" for emotion, value in emotional_state.items()])

            # Get endpoint and config from YAML
            inflection_config = self.config.get('inflection_ai', {}) if self.config else {}
            url = inflection_config.get('api_endpoint', 'https://api.inflection.ai/external/api/inference')
            model_config = inflection_config.get('model_config', 'Pi-3.1')

            headers = {
                "Authorization": f"Bearer {self.inflection_ai_api_key}",
                "Content-Type": "application/json"
            }

            # Build comprehensive context with thinking insights, conversation history, and retrieved memories
            context_parts = []

            # Base system context
            base_context = f"You are Galatea, an AI assistant with the following emotional state: {emotions_text}. Respond in character as Galatea. Keep your response concise (under 50 words) and reflect your emotional state in your tone."

            # Add thinking context from Gemini if available
            if thinking_context:
                base_context += f"\n\nInternal analysis: {thinking_context}"

            # Add retrieved memories if available
            if retrieved_memories and len(retrieved_memories) > 0:
                memory_text = "\n\nRelevant context from past conversations:\n"
                for i, memory in enumerate(retrieved_memories[:3], 1):  # Top 3 most relevant
                    memory_text += f"{i}. {memory['text'][:150]}...\n"
                base_context += memory_text

            # Add conversation history context
            if conversation_history and len(conversation_history) > 0:
                recent_history = conversation_history[-4:]  # Last 2 exchanges
                history_text = "\n\nRecent conversation context:\n"
                for msg in recent_history:
                    role = "User" if msg["role"] == "user" else "You (Galatea)"
                    history_text += f"{role}: {msg['content']}\n"
                base_context += history_text

            context_parts.append({
                "text": base_context,
                "type": "System"
            })

            # Add conversation history as context messages
            if conversation_history and len(conversation_history) > 4:
                # Add older messages as context (but not the most recent ones we already included)
                for msg in conversation_history[-8:-4]:
                    context_parts.append({
                        "text": msg["content"],
                        "type": "Human" if msg["role"] == "user" else "Assistant"
                    })

            # Add current user input
            context_parts.append({
                "text": user_input,
                "type": "Human"
            })

            data = {
                "context": context_parts,
                "config": model_config
            }

            logging.info("[PiResponseAgent] Sending request to Pi-3.1 API...")
            response = requests.post(url, headers=headers, json=data, timeout=30)

            if response.status_code == 200:
                result = response.json()

                # Extract the response text from the API response
                if isinstance(result, dict):
                    if 'output' in result:
                        text = result['output']
                    elif 'text' in result:
                        text = result['text']
                    elif 'response' in result:
                        text = result['response']
                    elif 'message' in result:
                        text = result['message']
                    else:
                        text = str(result)
                elif isinstance(result, str):
                    text = result
                else:
                    text = str(result)

                logging.info("[PiResponseAgent] ✓ Response received")
                return text.strip()
            else:
                logging.error(f"[PiResponseAgent] API returned status code {response.status_code}: {response.text}")
                return None
        except Exception as e:
            logging.error(f"[PiResponseAgent] Error: {e}")
            return None
    def smoke_test(self):
        """Perform a smoke test to verify Pi-3.1 is working"""
        if not self.inflection_ai_available:
            return False
        try:
            test_result = self.respond(
                "Hello",
                {"joy": 0.5, "sadness": 0.3, "anger": 0.1, "fear": 0.1, "curiosity": 0.5},
                thinking_context="Test thinking context",
                conversation_history=[],
                retrieved_memories=None
            )
            if test_result and len(test_result) > 0:
                logging.info("[PiResponseAgent] ✓ Smoke test passed")
                return True
            else:
                logging.warning("[PiResponseAgent] ⚠ Smoke test failed - no result")
                return False
        except Exception as e:
            logging.error(f"[PiResponseAgent] ✗ Smoke test failed: {e}")
            return False

    def is_ready(self):
        """Check if the agent is ready"""
        return self.inflection_ai_available
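# For orientation, the request body respond() posts looks roughly like this
# (a shape inferred from the code above, not the authoritative Inflection AI
# wire format - which is also why the response parser probes several keys):
#
#   {
#     "context": [
#       {"text": "You are Galatea, ... Internal analysis: ...", "type": "System"},
#       {"text": "older user message", "type": "Human"},
#       {"text": "older assistant message", "type": "Assistant"},
#       {"text": "current user message", "type": "Human"}
#     ],
#     "config": "Pi-3.1"
#   }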
class EmotionalStateAgent:
    """Agent responsible for managing and updating emotional state"""

    def __init__(self, initial_state=None, config=None):
        self.config = config or MODEL_CONFIG or {}
        self.emotional_state = initial_state or {"joy": 0.2, "sadness": 0.2, "anger": 0.2, "fear": 0.2, "curiosity": 0.2}
        self.learning_rate = 0.05
        self.quantum_random_available = False
        self.quantum_api_key = None
        self._initialize_quantum()

    def _initialize_quantum(self):
        """Initialize quantum randomness availability"""
        quantum_key = os.getenv("ANU_QUANTUM_API_KEY")
        if quantum_key:
            self.quantum_api_key = quantum_key
            self.quantum_random_available = True
            logging.info("[EmotionalStateAgent] ✓ Quantum randomness available")
        else:
            logging.warning("[EmotionalStateAgent] Quantum randomness unavailable")

    def get_quantum_random_float(self, min_val=0.0, max_val=1.0):
        """Get a quantum random float between min_val and max_val"""
        if not self.quantum_random_available:
            return random.uniform(min_val, max_val)

        try:
            quantum_config = self.config.get('quantum', {}) if self.config else {}
            url = quantum_config.get('api_endpoint', 'https://api.quantumnumbers.anu.edu.au')
            headers = {"x-api-key": self.quantum_api_key}
            params = {"length": 1, "type": "uint8"}

            response = requests.get(url, headers=headers, params=params, timeout=10)
            if response.status_code == 200:
                result = response.json()
                if result.get('success') and 'data' in result and len(result['data']) > 0:
                    normalized = result['data'][0] / 255.0
                    return min_val + (max_val - min_val) * normalized
        except Exception as e:
            logging.warning(f"[EmotionalStateAgent] Quantum API failed: {e}")

        return random.uniform(min_val, max_val)

    def update_with_sentiment(self, sentiment_score):
        """Update emotional state based on sentiment"""
        # Enhanced emotion update (decay and normalization with quantum randomness)
        decay_factor = 0.9
        if self.quantum_random_available:
            quantum_decay_variation = self.get_quantum_random_float(0.85, 0.95)
            decay_factor = quantum_decay_variation

        for emotion in self.emotional_state:
            # Decay emotions (more realistic fading with quantum variation)
            self.emotional_state[emotion] *= decay_factor
            # Normalize
            self.emotional_state[emotion] = max(0.0, min(1.0, self.emotional_state[emotion]))

        # Apply sentiment with quantum-enhanced learning rate variation
        learning_rate = self.learning_rate
        if self.quantum_random_available:
            quantum_lr_variation = self.get_quantum_random_float(0.03, 0.07)
            learning_rate = quantum_lr_variation

        self.emotional_state["joy"] += sentiment_score * learning_rate
        self.emotional_state["sadness"] -= sentiment_score * learning_rate

        # Add quantum randomness to curiosity (making responses more unpredictable)
        if self.quantum_random_available:
            quantum_curiosity_boost = self.get_quantum_random_float(-0.05, 0.05)
            self.emotional_state["curiosity"] = max(0.0, min(1.0, self.emotional_state["curiosity"] + quantum_curiosity_boost))

        # Re-normalize
        total_emotion = sum(self.emotional_state.values())
        for emotion in self.emotional_state:
            self.emotional_state[emotion] = self.emotional_state[emotion] / total_emotion if total_emotion > 0 else 0.2

        logging.info(f"[EmotionalStateAgent] Updated emotional state: {self.emotional_state}")
        return self.emotional_state

    def get_state(self):
        """Get the current emotional state"""
        return self.emotional_state.copy()

    def smoke_test(self):
        """Perform a smoke test to verify the emotional state system is working"""
        try:
            # Test quantum randomness if available
            if self.quantum_random_available:
                test_float = self.get_quantum_random_float(0.0, 1.0)
                if not isinstance(test_float, float) or test_float < 0.0 or test_float > 1.0:
                    logging.warning("[EmotionalStateAgent] ⚠ Smoke test failed - invalid quantum random")
                    return False

            # Test state update
            updated_state = self.update_with_sentiment(0.5)
            if updated_state and isinstance(updated_state, dict):
                logging.info("[EmotionalStateAgent] ✓ Smoke test passed")
                return True
            else:
                logging.warning("[EmotionalStateAgent] ⚠ Smoke test failed - invalid state")
                return False
        except Exception as e:
            logging.error(f"[EmotionalStateAgent] ✗ Smoke test failed: {e}")
            return False

    def is_ready(self):
        """Check if the agent is ready"""
        return True  # Emotional state is always ready
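# Worked example of update_with_sentiment with quantum randomness disabled:
# starting from joy = sadness = anger = fear = curiosity = 0.2, a sentiment
# score of +1.0 first decays every emotion to 0.18, then applies the 0.05
# learning rate (joy -> 0.23, sadness -> 0.13), and finally re-normalizes by
# the total of 0.90, giving joy ≈ 0.256, sadness ≈ 0.144, and 0.2 for the rest.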
class AzureTextAnalyticsAgent:
    """Agent responsible for Azure Text Analytics sentiment analysis"""

    def __init__(self, config=None):
        self.config = config or MODEL_CONFIG or {}
        self.azure_available = False
        self.client = None
        self._initialize()

    def _initialize(self):
        """Initialize the Azure Text Analytics client"""
        try:
            from azure.ai.textanalytics import TextAnalyticsClient
            from azure.core.credentials import AzureKeyCredential

            key = os.getenv("AZURE_TEXT_ANALYTICS_KEY")
            endpoint = os.getenv("AZURE_TEXT_ANALYTICS_ENDPOINT")

            if key and endpoint:
                try:
                    credential = AzureKeyCredential(key)
                    self.client = TextAnalyticsClient(endpoint=endpoint, credential=credential)
                    self.azure_available = True
                    logging.info("[AzureTextAnalyticsAgent] ✓ Initialized and ready")
                except Exception as e:
                    logging.warning(f"[AzureTextAnalyticsAgent] Failed to create client: {e}")
                    self.azure_available = False
            else:
                logging.warning("[AzureTextAnalyticsAgent] ✗ Azure credentials not found")
                self.azure_available = False
        except ImportError:
            logging.warning("[AzureTextAnalyticsAgent] ✗ Azure SDK not installed")
            self.azure_available = False

    def analyze(self, text):
        """Analyze sentiment using Azure Text Analytics"""
        if not self.azure_available or not self.client:
            return None

        try:
            result = self.client.analyze_sentiment(documents=[text])[0]
            if result.sentiment == 'positive':
                return result.confidence_scores.positive
            elif result.sentiment == 'negative':
                return -result.confidence_scores.negative
            else:
                return 0.0
        except Exception as e:
            logging.error(f"[AzureTextAnalyticsAgent] Error: {e}")
            return None

    def smoke_test(self):
        """Perform a smoke test to verify Azure Text Analytics is working"""
        if not self.azure_available:
            return False
        try:
            test_text = "This is a test message for sentiment analysis."
            result = self.analyze(test_text)
            if result is not None:
                logging.info("[AzureTextAnalyticsAgent] ✓ Smoke test passed")
                return True
            else:
                logging.warning("[AzureTextAnalyticsAgent] ⚠ Smoke test failed - analyze returned None")
                return False
        except Exception as e:
            logging.error(f"[AzureTextAnalyticsAgent] ✗ Smoke test failed: {e}")
            return False

    def is_ready(self):
        """Check if the agent is ready"""
        return self.azure_available
class SentimentAgent:
    """Agent responsible for sentiment analysis (uses Azure, Hugging Face, or NLTK fallback)"""

    def __init__(self, config=None):
        self.config = config or MODEL_CONFIG or {}
        self.azure_agent = AzureTextAnalyticsAgent(config=self.config)
        self.sentiment_analyzer = None
        self.ready = False
        self._initialize()

    def _initialize(self):
        """Initialize the sentiment analyzer"""
        # Try Azure first
        if self.azure_agent.is_ready():
            self.ready = True
            logging.info("[SentimentAgent] Using Azure Text Analytics")
            return

        # Fallback to Hugging Face
        sentiment_model = self.config.get('sentiment', {}).get('primary_model', 'distilbert/distilbert-base-uncased-finetuned-sst-2-english') if self.config else 'distilbert/distilbert-base-uncased-finetuned-sst-2-english'

        if transformers_available:
            try:
                logging.info("[SentimentAgent] Initializing Hugging Face sentiment analyzer...")
                self.sentiment_analyzer = pipeline("sentiment-analysis", model=sentiment_model)
                self.ready = True
                logging.info("[SentimentAgent] ✓ Initialized successfully")
            except Exception as e:
                logging.warning(f"[SentimentAgent] Hugging Face model failed: {e}, using fallback")
                self.sentiment_analyzer = None
                self.ready = True  # Fallback available
        else:
            self.ready = True  # Fallback available

    def analyze(self, text):
        """Analyze sentiment of text (tries Azure, then Hugging Face, then NLTK)"""
        # Try Azure first
        if self.azure_agent.is_ready():
            result = self.azure_agent.analyze(text)
            if result is not None:
                return result

        # Fallback to Hugging Face
        if self.sentiment_analyzer:
            try:
                result = self.sentiment_analyzer(text)[0]
                label = result['label'].lower()
                score = result['score']
                if 'positive' in label:
                    return score
                elif 'negative' in label:
                    return -score
                else:
                    return 0.0
            except Exception as e:
                logging.error(f"[SentimentAgent] Error: {e}")
                return self._fallback_analyze(text)
        else:
            return self._fallback_analyze(text)

    def _fallback_analyze(self, text):
        """Fallback sentiment analysis using NLTK VADER"""
        try:
            from nltk.sentiment import SentimentIntensityAnalyzer
            analyzer = SentimentIntensityAnalyzer()
            scores = analyzer.polarity_scores(text)
            return scores['compound']  # Returns a value between -1 and 1
        except Exception as e:
            logging.error(f"[SentimentAgent] Fallback failed: {e}")
            return 0.0

    def smoke_test(self):
        """Perform a smoke test to verify sentiment analysis is working"""
        try:
            test_text = "I am happy and excited!"
            result = self.analyze(test_text)
            if result is not None and isinstance(result, (int, float)):
                logging.info("[SentimentAgent] ✓ Smoke test passed")
                return True
            else:
                logging.warning("[SentimentAgent] ⚠ Smoke test failed - invalid result")
                return False
        except Exception as e:
            logging.error(f"[SentimentAgent] ✗ Smoke test failed: {e}")
            return False

    def is_ready(self):
        """Check if the agent is ready"""
        return self.ready
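# Whichever backend answers, SentimentAgent.analyze() returns a single score in
# [-1.0, 1.0]: e.g. "I am happy and excited!" comes back strongly positive
# (close to +1) and "This is awful" strongly negative (close to -1); the exact
# value is illustrative and depends on which backend handled the request.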
# --- 1. AI Core ---
class GalateaAI:
    def __init__(self):
        # Load model configuration first
        self.config = MODEL_CONFIG or {}

        self.knowledge_base = {}
        self.response_model = "A generic response"  # Placeholder for the ML model

        # Conversation history for context
        self.conversation_history = []  # List of {"role": "user"/"assistant", "content": "..."}

        # Get max history length from config or use default
        self.max_history_length = self.config.get('conversation', {}).get('max_history_length', 20)

        # Initialize memory system
        logging.info("Initializing memory system (JSON)...")
        try:
            self.memory_system = MemorySystem(config=self.config)
            self.memory_system_ready = self.memory_system.is_ready()
            if not self.memory_system_ready:
                raise Exception("Memory system failed to initialize")
            logging.info("✓ Memory system initialized")
        except Exception as e:
            logging.error(f"Failed to initialize memory system: {e}")
            self.memory_system_ready = False
            raise

        # Initialize agents
        logging.info("Initializing agents...")
        self.memory_agent = MemoryAgent(self.memory_system, config=self.config)
        self.gemini_agent = GeminiThinkingAgent(config=self.config)
        self.pi_agent = PiResponseAgent(config=self.config)
        self.emotional_agent = EmotionalStateAgent(config=self.config)
        self.sentiment_agent = SentimentAgent(config=self.config)

        # Track initialization status
        self.memory_system_ready = self.memory_agent.is_ready()
        self.sentiment_analyzer_ready = self.sentiment_agent.is_ready()
        self.models_ready = self.gemini_agent.is_ready() or self.pi_agent.is_ready()
        self.api_keys_valid = self.gemini_agent.is_ready() or self.pi_agent.is_ready()

        # Legacy compatibility
        self.gemini_available = self.gemini_agent.is_ready()
        self.inflection_ai_available = self.pi_agent.is_ready()
        self.quantum_random_available = self.emotional_agent.quantum_random_available

        logging.info("✓ All agents initialized")

    def _check_pre_initialization(self):
        """Check if components were pre-initialized by initialize_galatea.py"""
        # Check if a ChromaDB directory exists and has the collection
        chromadb_path = "./chroma_db"
        if os.path.exists(chromadb_path):
            try:
                import chromadb
                from chromadb.config import Settings
                vector_db = chromadb.PersistentClient(
                    path=chromadb_path,
                    settings=Settings(anonymized_telemetry=False)
                )
                collection = vector_db.get_collection("galatea_memory")
                if collection:
                    logging.info("✓ Pre-initialized ChromaDB detected")
                    return True
            except Exception:
                pass

        # Check if JSON memory exists
        if os.path.exists("./memory.json"):
            logging.info("✓ Pre-initialized JSON memory detected")
            return True

        return False

    def is_fully_initialized(self):
        """Check if all components are fully initialized"""
        return (
            self.memory_system_ready and
            self.sentiment_analyzer_ready and
            self.models_ready and
            self.api_keys_valid
        )

    def get_initialization_status(self):
        """Get detailed initialization status"""
        smoke_tests = getattr(self, 'smoke_test_results', {})
        return {
            "memory_system": self.memory_system_ready,
            "sentiment_analyzer": self.sentiment_analyzer_ready,
            "models": self.models_ready,
            "api_keys": self.api_keys_valid,
            "gemini_available": self.gemini_agent.is_ready() if hasattr(self, 'gemini_agent') else False,
            "inflection_ai_available": self.pi_agent.is_ready() if hasattr(self, 'pi_agent') else False,
            "azure_text_analytics_available": self.sentiment_agent.azure_agent.is_ready() if hasattr(self, 'sentiment_agent') else False,
            "smoke_tests": smoke_tests,
            "fully_initialized": self.is_fully_initialized()
        }

    @property
    def emotional_state(self):
        """Get the current emotional state from the EmotionalStateAgent"""
        return self.emotional_agent.get_state() if hasattr(self, 'emotional_agent') else {"joy": 0.2, "sadness": 0.2, "anger": 0.2, "fear": 0.2, "curiosity": 0.2}

    def initialize_sentiment_analyzer(self):
        """Initialize sentiment analysis with fallback options"""
        self.sentiment_analyzer_ready = False

        # Get sentiment model from config
        sentiment_model = self.config.get('sentiment', {}).get('primary_model', 'distilbert/distilbert-base-uncased-finetuned-sst-2-english') if self.config else 'distilbert/distilbert-base-uncased-finetuned-sst-2-english'

        if transformers_available:
            try:
                logging.info("Attempting to initialize Hugging Face sentiment analyzer")
                # Try to initialize the pipeline with specific parameters
                self.sentiment_analyzer = pipeline(
                    "sentiment-analysis",
                    model=sentiment_model
                )
                self.sentiment_analyzer_ready = True
                logging.info("✓ Hugging Face sentiment analyzer loaded successfully")
            except Exception as e:
                logging.error(f"Failed to initialize Hugging Face sentiment analyzer: {e}")
                self.sentiment_analyzer = None
                # Still mark as ready since we have a fallback
                self.sentiment_analyzer_ready = True
                logging.info("✓ Using fallback sentiment analyzer")
        else:
            self.sentiment_analyzer = None
            self.sentiment_analyzer_ready = True  # Fallback available
            logging.info("✓ Using fallback sentiment analyzer")

    def analyze_sentiment(self, text):
        # Use Hugging Face if available; guarded with getattr because
        # self.sentiment_analyzer only exists after initialize_sentiment_analyzer() runs
        analyzer = getattr(self, 'sentiment_analyzer', None)
        if analyzer is not None:
            try:
                result = analyzer(text)[0]
                sentiment = result['label']
                score = result['score']
                if sentiment == 'POSITIVE':
                    return score
                else:
                    return -score
            except Exception as e:
                logging.error(f"Error in sentiment analysis: {e}")
                # Fall back to simple analysis

        # Simple fallback sentiment analysis
        positive_words = ['good', 'great', 'excellent', 'happy', 'joy', 'love', 'like', 'wonderful']
        negative_words = ['bad', 'terrible', 'sad', 'hate', 'dislike', 'awful', 'poor', 'angry']

        words = text.lower().split()
        sentiment_score = 0.0

        for word in words:
            if word in positive_words:
                sentiment_score += 0.2
            elif word in negative_words:
                sentiment_score -= 0.2

        return max(-1.0, min(1.0, sentiment_score))  # Clamp between -1 and 1
os.getenv("GEMINI_API_KEY") if gemini_key: os.environ["GEMINI_API_KEY"] = gemini_key self.gemini_available = True logging.info("✓ Gemini API key found - Gemini models available via LiteLLM") else: logging.warning("GEMINI_API_KEY not found - Gemini models unavailable") # Check for Inflection AI API key inflection_key = os.getenv("INFLECTION_AI_API_KEY") if inflection_key: self.inflection_ai_api_key = inflection_key self.inflection_ai_available = True logging.info("✓ Inflection AI API key found - Pi-3.1 model available") else: logging.warning("INFLECTION_AI_API_KEY not found - Pi-3.1 model unavailable") # Check for Quantum Random Numbers API key quantum_key = os.getenv("ANU_QUANTUM_API_KEY") if quantum_key: self.quantum_api_key = quantum_key self.quantum_random_available = True logging.info("✓ ANU Quantum Numbers API key found - Quantum randomness available") else: logging.warning("ANU_QUANTUM_API_KEY not found - Quantum randomness unavailable") # Verify API keys are valid (at least one model API key must be present) self.api_keys_valid = self.gemini_available or self.inflection_ai_available if self.api_keys_valid: logging.info("✓ API keys validated - at least one model API key is available") else: logging.error("✗ No valid API keys found - models unavailable") # Models are ready if at least one is available self.models_ready = self.gemini_available or self.inflection_ai_available if self.models_ready: logging.info("✓ Models ready for use") else: logging.warning("⚠ No models available") def get_quantum_random_numbers(self, length=None, number_type=None): """Fetch quantum random numbers from ANU Quantum Numbers API""" if not self.quantum_random_available: logging.warning("Quantum random numbers unavailable, using fallback") return None # Get defaults from config quantum_config = self.config.get('quantum', {}) if self.config else {} if length is None: length = quantum_config.get('default_length', 128) if number_type is None: number_type = quantum_config.get('default_type', 'uint8') try: url = quantum_config.get('api_endpoint', 'https://api.quantumnumbers.anu.edu.au') headers = { "x-api-key": self.quantum_api_key } params = { "length": length, "type": number_type } response = requests.get(url, headers=headers, params=params, timeout=10) if response.status_code == 200: result = response.json() if result.get('success') and 'data' in result: logging.info(f"✓ Retrieved {len(result['data'])} quantum random numbers") return result['data'] else: logging.warning("Quantum API returned success but no data") return None else: logging.error(f"Quantum API returned status code {response.status_code}: {response.text}") return None except Exception as e: logging.error(f"Error fetching quantum random numbers: {e}") return None def get_quantum_random_float(self, min_val=0.0, max_val=1.0): """Get a quantum random float between min_val and max_val""" quantum_nums = self.get_quantum_random_numbers(length=1, number_type='uint8') if quantum_nums and len(quantum_nums) > 0: # Normalize uint8 (0-255) to float range normalized = quantum_nums[0] / 255.0 return min_val + (max_val - min_val) * normalized # Fallback to regular random return random.uniform(min_val, max_val) def call_inflection_ai(self, user_input, emotional_state, thinking_context=None, conversation_history=None, retrieved_memories=None): """Call Inflection AI Pi-3.1 model API with conversation context, thinking insights, and retrieved memories""" if not self.inflection_ai_available: return None try: # Create context with emotional state emotions_text = ", 
".join([f"{emotion}: {value:.2f}" for emotion, value in emotional_state.items()]) # Format the request according to Inflection AI API # Get endpoint and config from YAML inflection_config = self.config.get('inflection_ai', {}) if self.config else {} url = inflection_config.get('api_endpoint', 'https://api.inflection.ai/external/api/inference') model_config = inflection_config.get('model_config', 'Pi-3.1') headers = { "Authorization": f"Bearer {self.inflection_ai_api_key}", "Content-Type": "application/json" } # Build comprehensive context with thinking insights, conversation history, and retrieved memories context_parts = [] # Base system context base_context = f"You are Galatea, an AI assistant with the following emotional state: {emotions_text}. Respond in character as Galatea. Keep your response concise (under 50 words) and reflect your emotional state in your tone." # Add thinking context from Gemini if available if thinking_context: base_context += f"\n\nInternal analysis: {thinking_context}" # Add retrieved memories if available if retrieved_memories and len(retrieved_memories) > 0: memory_text = "\n\nRelevant context from past conversations:\n" for i, memory in enumerate(retrieved_memories[:3], 1): # Top 3 most relevant memory_text += f"{i}. {memory['text'][:150]}...\n" base_context += memory_text # Add conversation history context if conversation_history and len(conversation_history) > 0: recent_history = conversation_history[-4:] # Last 2 exchanges history_text = "\n\nRecent conversation context:\n" for msg in recent_history: role = "User" if msg["role"] == "user" else "You (Galatea)" history_text += f"{role}: {msg['content']}\n" base_context += history_text context_parts.append({ "text": base_context, "type": "System" }) # Add conversation history as context messages if conversation_history and len(conversation_history) > 4: # Add older messages as context (but not the most recent ones we already included) for msg in conversation_history[-8:-4]: context_parts.append({ "text": msg["content"], "type": "Human" if msg["role"] == "user" else "Assistant" }) # Add current user input context_parts.append({ "text": user_input, "type": "Human" }) data = { "context": context_parts, "config": model_config } logging.info("Sending request to Inflection AI Pi-3.1 API") response = requests.post(url, headers=headers, json=data, timeout=30) if response.status_code == 200: result = response.json() # Extract the response text from the API response if isinstance(result, dict): if 'output' in result: text = result['output'] elif 'text' in result: text = result['text'] elif 'response' in result: text = result['response'] elif 'message' in result: text = result['message'] else: text = str(result) elif isinstance(result, str): text = result else: text = str(result) logging.info("Inflection AI response received successfully") return text.strip() else: logging.error(f"Inflection AI API returned status code {response.status_code}: {response.text}") return None except Exception as e: logging.error(f"Error calling Inflection AI API: {e}") logging.error(f"Full error details: {type(e).__name__}: {str(e)}") return None def gemini_think(self, user_input, emotional_state, conversation_history, retrieved_memories=None): """Use Gemini to think about and analyze the conversation context with retrieved memories""" if not self.gemini_available: return None try: # Build thinking prompt with conversation context emotions_text = ", ".join([f"{emotion}: {value:.2f}" for emotion, value in emotional_state.items()]) # Prepare 
    def gemini_think(self, user_input, emotional_state, conversation_history, retrieved_memories=None):
        """Use Gemini to think about and analyze the conversation context with retrieved memories"""
        if not self.gemini_available:
            return None

        try:
            # Build thinking prompt with conversation context
            emotions_text = ", ".join([f"{emotion}: {value:.2f}" for emotion, value in emotional_state.items()])

            # Prepare conversation context for thinking
            context_summary = ""
            if conversation_history:
                recent_history = conversation_history[-6:]  # Last 3 exchanges
                context_summary = "\nRecent conversation:\n"
                for msg in recent_history:
                    role = "User" if msg["role"] == "user" else "Galatea"
                    context_summary += f"{role}: {msg['content']}\n"

            # Add retrieved memories if available
            memory_context = ""
            if retrieved_memories and len(retrieved_memories) > 0:
                memory_context = "\n\nRelevant memories from past conversations:\n"
                for i, memory in enumerate(retrieved_memories[:3], 1):  # Top 3 most relevant
                    memory_context += f"{i}. {memory['text'][:200]}...\n"

            thinking_prompt = f"""You are the internal reasoning system for Galatea, an AI assistant.

Current emotional state: {emotions_text}
{context_summary}
{memory_context}

Current user message: "{user_input}"

Analyze this conversation and provide:
1. Key insights about what the user is asking or discussing
2. Important context from the conversation history and retrieved memories
3. How Galatea should respond emotionally and contextually
4. Any important details to remember or reference

Keep your analysis concise (2-3 sentences). Focus on what matters for crafting an appropriate response."""

            messages = [
                {"role": "system", "content": "You are an internal reasoning system. Analyze conversations and provide insights."},
                {"role": "user", "content": thinking_prompt}
            ]

            logging.info("Using Gemini for thinking/analysis")

            # Get Gemini models from config
            gemini_config = self.config.get('gemini', {}) if self.config else {}
            gemini_models = gemini_config.get('thinking_models', [
                "gemini/gemini-2.0-flash-exp",
                "gemini/gemini-2.0-flash",
                "gemini/gemini-1.5-flash-latest",
                "gemini/gemini-1.5-flash"
            ])

            # Get thinking settings from config
            thinking_config = gemini_config.get('thinking', {})
            thinking_temp = thinking_config.get('temperature', 0.5)
            thinking_max_tokens = thinking_config.get('max_tokens', 200)

            for model in gemini_models:
                try:
                    response = completion(
                        model=model,
                        messages=messages,
                        temperature=thinking_temp,
                        max_tokens=thinking_max_tokens
                    )

                    if response and 'choices' in response and len(response['choices']) > 0:
                        thinking_result = response['choices'][0]['message']['content']
                        logging.info("✓ Gemini thinking completed")
                        return thinking_result.strip()
                except Exception as e:
                    logging.warning(f"Gemini model {model} failed for thinking: {e}, trying next...")
                    continue

            logging.error("All Gemini models failed for thinking")
            return None
        except Exception as e:
            logging.error(f"Error in Gemini thinking: {e}")
            return None

    def update_conversation_history(self, user_input, assistant_response):
        """Update conversation history, maintaining the max length"""
        # Add user message
        self.conversation_history.append({"role": "user", "content": user_input})

        # Add assistant response
        self.conversation_history.append({"role": "assistant", "content": assistant_response})

        # Trim history if too long
        if len(self.conversation_history) > self.max_history_length:
            # Keep the most recent messages
            self.conversation_history = self.conversation_history[-self.max_history_length:]
    def store_important_memory(self, user_input, assistant_response, intent, keywords):
        """Store important conversation snippets in the memory system"""
        try:
            # Determine if this conversation is worth storing.
            # Store if: it is a question, contains important keywords, or is a significant exchange
            should_store = False
            memory_type = "conversation"

            if intent == "question":
                should_store = True
                memory_type = "question"
            elif len(keywords) > 3:  # Substantial conversation
                should_store = True
            elif any(keyword in ["remember", "important", "note", "save"] for keyword in keywords):
                should_store = True
                memory_type = "important"

            if should_store:
                # Create a memory entry combining user input and response
                memory_text = f"User: {user_input}\nGalatea: {assistant_response}"
                metadata = {
                    "intent": intent,
                    "keywords": keywords[:5],  # Top 5 keywords
                    "emotions": {k: round(v, 2) for k, v in self.emotional_state.items()}
                }

                # Store in the JSON memory system
                self.memory_system.store_memory(
                    text=memory_text,
                    metadata=metadata,
                    memory_type=memory_type
                )
                logging.info(f"Stored important memory: {memory_type} - {user_input[:50]}...")
        except Exception as e:
            logging.error(f"Error storing memory: {e}")

    def is_thinking_mode(self, intent, user_input, keywords):
        """Determine if the request requires thinking mode (use Gemini for complex reasoning)"""
        # Always use thinking mode now - Gemini always thinks, Pi-3.1 always responds
        return True

    def process_input(self, user_input):
        """Process user input through the agent chain workflow:
        Pi(GEMINI(user input, read with past memory), emotional state)"""
        # Step 1: Analyze sentiment
        sentiment_score = self.sentiment_agent.analyze(user_input)

        # Step 2: Extract keywords and determine intent
        keywords = self.extract_keywords(user_input)
        intent = self.determine_intent(user_input)

        # Step 3: Update emotional state based on sentiment
        self.emotional_agent.update_with_sentiment(sentiment_score)
        current_emotional_state = self.emotional_agent.get_state()

        # Step 4: Retrieve memories
        retrieved_memories = self.memory_agent.retrieve_memories(user_input)

        # Step 5: Chain workflow: Pi(GEMINI(user input, read with past memory), emotional state)
        # Step 5a: GEMINI(user input, read with past memory)
        thinking_context = self.gemini_agent.think(
            user_input,
            current_emotional_state,
            self.conversation_history,
            retrieved_memories=retrieved_memories
        )

        # Step 5b: Pi(GEMINI result, emotional state)
        response = self.pi_agent.respond(
            user_input,
            current_emotional_state,
            thinking_context=thinking_context,
            conversation_history=self.conversation_history,
            retrieved_memories=retrieved_memories
        )

        # Fallback if Pi-3.1 is not available
        if not response and self.gemini_agent.is_ready():
            response = self._gemini_fallback_response(
                user_input,
                current_emotional_state,
                thinking_context,
                self.conversation_history
            )

        # If still no response, use the final fallback
        if not response:
            response = self._generate_fallback_response(intent, keywords, current_emotional_state, user_input)

        if response:
            # Update conversation history
            self.update_conversation_history(user_input, response)
            # Store important memories
            self.store_important_memory(user_input, response, intent, keywords)
            # Update knowledge base
            self.update_knowledge(keywords, user_input)

        return response

    def extract_keywords(self, text):
        try:
            # Try using NLTK's tokenizer
            tokens = nltk.word_tokenize(text)
            keywords = [word.lower() for word in tokens if word.isalnum()]
            return keywords
        except Exception:
            # Fall back to a simple split-based approach if NLTK fails
            words = text.split()
            # Clean up words (remove punctuation)
            keywords = [word.lower().strip('.,!?;:()[]{}""\'') for word in words]
            # Filter out empty strings
            keywords = [word for word in keywords if word and word.isalnum()]
            return keywords

    def determine_intent(self, text):
        # More comprehensive intent recognition (using keywords)
        text = text.lower()
        if "what" in text or "how" in text or "why" in text:
            return "question"
        elif "thank" in text:
            return "gratitude"
        elif "goodbye" in text or "bye" in text:
            return "farewell"
        else:
            return "statement"
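    # Illustrative behaviour of the two helpers above:
    #   extract_keywords("I love sunny days!") -> ['i', 'love', 'sunny', 'days']
    #   determine_intent("What time is it?")   -> "question"
    #   determine_intent("Thanks, goodbye!")   -> "gratitude" (the "thank" check
    #   fires before the "goodbye"/"bye" check, so branch order matters)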
text: return "farewell" else: return "statement" def _gemini_fallback_response(self, user_input, emotional_state, thinking_context, conversation_history): """Fallback response using Gemini directly""" try: logging.info("[GalateaAI] Using Gemini fallback for direct response") emotions_text = ", ".join([f"{emotion}: {value:.2f}" for emotion, value in emotional_state.items()]) # Build messages with conversation history messages = [] # Get system prompts from config system_prompts = self.config.get('system_prompts', {}) if self.config else {} identity = system_prompts.get('galatea_identity', 'You are Galatea, an AI assistant with emotional awareness and memory.') style = system_prompts.get('response_style', 'Respond in character, keeping responses concise (under 50 words).') messages.append({ "role": "system", "content": f"{identity} Your emotional state: {emotions_text}. {style}" }) # Get fallback settings from config gemini_config = self.config.get('gemini', {}) if self.config else {} fallback_config = gemini_config.get('fallback', {}) max_history_exchanges = fallback_config.get('max_history_exchanges', 8) fallback_model = gemini_config.get('fallback_model', 'gemini/gemini-1.5-flash') # Add conversation history if conversation_history: for msg in conversation_history[-max_history_exchanges:]: messages.append({ "role": msg["role"], "content": msg["content"] }) # Add current user input messages.append({ "role": "user", "content": user_input }) # Add thinking context if available if thinking_context: messages.append({ "role": "system", "content": f"Internal analysis: {thinking_context}" }) # Use quantum randomness for temperature base_temperature = fallback_config.get('temperature_base', 0.7) temp_range = fallback_config.get('temperature_variation_range', [0.0, 0.3]) quantum_temp_variation = self.emotional_agent.get_quantum_random_float(temp_range[0], temp_range[1]) temperature = base_temperature + quantum_temp_variation response = completion( model=fallback_model, messages=messages, temperature=temperature, max_tokens=fallback_config.get('max_tokens', 150) ) if response and 'choices' in response and len(response['choices']) > 0: text = response['choices'][0]['message']['content'] logging.info("[GalateaAI] ✓ Gemini fallback response received") return text.strip() except Exception as e: logging.error(f"[GalateaAI] Gemini fallback failed: {e}") return None def _generate_fallback_response(self, intent, keywords, emotional_state, original_input): """Generate final fallback response when all systems fail""" logging.info(f"[GalateaAI] Using final fallback response. 
    def _generate_fallback_response(self, intent, keywords, emotional_state, original_input):
        """Generate the final fallback response when all systems fail"""
        logging.info(f"[GalateaAI] Using final fallback response. Intent: {intent}, Keywords: {keywords[:5]}")

        # Determine which systems are not working
        unavailable_systems = []
        system_descriptions = {
            'inflection_ai': ('Pi-3.1', 'my conversation model'),
            'gemini': ('Gemini', 'my thinking model'),
            'quantum_random': ('Quantum Random Numbers API', 'my quantum randomness source'),
            'memory': ('Memory System', 'my memory system')
        }

        if not getattr(self, 'inflection_ai_available', False):
            unavailable_systems.append(system_descriptions['inflection_ai'])
        if not getattr(self, 'gemini_available', False):
            unavailable_systems.append(system_descriptions['gemini'])
        if not getattr(self, 'quantum_random_available', False):
            unavailable_systems.append(system_descriptions['quantum_random'])
        if not getattr(self, 'memory_system_ready', False):
            unavailable_systems.append(system_descriptions['memory'])

        # Generate a natural, conversational error message
        if unavailable_systems:
            if len(unavailable_systems) == 1:
                system_name, system_desc = unavailable_systems[0]
                system_msg = f"{system_desc} ({system_name}) is not working right now"
            elif len(unavailable_systems) == 2:
                sys1_name, sys1_desc = unavailable_systems[0]
                sys2_name, sys2_desc = unavailable_systems[1]
                system_msg = f"{sys1_desc} ({sys1_name}) and {sys2_desc} ({sys2_name}) are not working"
            else:
                # For 3+ systems, list them naturally
                system_list = []
                for sys_name, sys_desc in unavailable_systems[:-1]:
                    system_list.append(f"{sys_desc} ({sys_name})")
                last_name, last_desc = unavailable_systems[-1]
                system_msg = f"{', '.join(system_list)}, and {last_desc} ({last_name}) are not working"
        else:
            system_msg = "some of my systems encountered an error"

        fallback_response = None
        if intent == "question":
            if "you" in keywords:
                fallback_response = f"I'm still learning about myself, but I'm having technical difficulties. {system_msg.capitalize()}. I apologize for the inconvenience."
            else:
                fallback_response = f"I'd love to help with that, but {system_msg}. Please check my system status or try again in a moment."
        elif intent == "gratitude":
            fallback_response = "You're welcome!"
        else:
            if unavailable_systems:
                fallback_response = f"I hear you, but {system_msg}. This might be due to missing API keys or network issues. Please check my configuration."
            else:
                fallback_response = "I hear you, though my full AI capabilities aren't active right now. Please check if my API keys are configured."

        # Update conversation history even for the fallback
        if fallback_response:
            self.update_conversation_history(original_input, fallback_response)

        return fallback_response

    def update_knowledge(self, keywords, user_input):
        # Remember the first utterance seen for each new keyword
        for keyword in keywords:
            if keyword not in self.knowledge_base:
                self.knowledge_base[keyword] = user_input


# --- 2. Dialogue Engine ---
class DialogueEngine:
    def __init__(self, ai_core):
        self.ai_core = ai_core
        self.last_user_message = ""

    def get_response(self, user_input):
        # Store the last message for sentiment analysis
        self.last_user_message = user_input
        ai_response = self.ai_core.process_input(user_input)
        styled_response = self.apply_style(ai_response, self.ai_core.emotional_state)
        return styled_response

    def apply_style(self, text, emotional_state):
        # Select a style based on the emotional state; currently a pass-through,
        # since the style suffix was removed to keep responses clean
        style = self.get_style(emotional_state)
        return text

    def get_style(self, emotional_state):
        # Determine the style based on the state of the AI
        return "neutral"
# --- 3. Avatar Engine ---
class AvatarShape(Enum):
    # Shape types for the avatar
    CIRCLE = "Circle"
    TRIANGLE = "Triangle"
    SQUARE = "Square"


class AvatarEngine:
    def __init__(self):
        self.avatar_model = "Circle"  # Start with a basic shape
        self.expression_parameters = {}

    def update_avatar(self, emotional_state):
        # Map emotions to avatar parameters (facial expressions, color)
        joy_level = emotional_state["joy"]
        sadness_level = emotional_state["sadness"]

        # Simple mapping (placeholder)
        self.avatar_model = self.change_avatar_shape(joy_level, sadness_level)

    def change_avatar_shape(self, joy, sad):
        # Determine the shape based on feelings
        if joy > 0.5:
            return AvatarShape.CIRCLE.value
        elif sad > 0.5:
            return AvatarShape.TRIANGLE.value
        else:
            return AvatarShape.SQUARE.value

    def render_avatar(self):
        # Simple console rendering of the avatar state
        print(f"Avatar shape: {self.avatar_model}")


# Only run the interactive loop when this script is executed directly. Creating
# these instances at import time would block callers (e.g. the Flask app) that
# import this module, so everything below stays behind the __main__ guard.
if __name__ == "__main__":
    # Ensure NLTK data is available before starting the main loop
    try:
        nltk.data.find("tokenizers/punkt")
    except LookupError:
        nltk.download('punkt', quiet=True)

    # Create the core components
    galatea_ai = GalateaAI()
    dialogue_engine = DialogueEngine(galatea_ai)
    avatar_engine = AvatarEngine()
    avatar_engine.update_avatar(galatea_ai.emotional_state)

    # Initial avatar rendering
    avatar_engine.render_avatar()

    while True:
        user_input = input("You: ")
        response = dialogue_engine.get_response(user_input)
        print(f"Galatea: {response}")
        avatar_engine.update_avatar(galatea_ai.emotional_state)
        avatar_engine.render_avatar()