import functools import json from typing import Literal, Dict, Callable from langchain_core.messages import AIMessage from langgraph.types import Command from cuga.backend.activity_tracker.tracker import ActivityTracker, Step from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.final_answer_agent import FinalAnswerAgent from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.prompts.load_prompt import FinalAnswerOutput from cuga.backend.cuga_graph.nodes.shared.base_node import BaseNode from cuga.backend.cuga_graph.nodes.human_in_the_loop.followup_model import ( create_save_reuse_action, create_get_more_utterances, ) from cuga.backend.cuga_graph.state.agent_state import AgentState from cuga.config import settings from cuga.backend.cuga_graph.utils.nodes_names import NodeNames, ActionIds, MessagePrefixes tracker = ActivityTracker() # Feature flag for human-in-the-loop functionality ENABLE_SAVE_REUSE = settings.features.save_reuse class HumanInTheLoopHandler: """Simple handler for human-in-the-loop interactions""" def __init__(self): self._action_handlers: Dict[str, Callable] = { ActionIds.SAVE_REUSE: self._handle_save_reuse, ActionIds.SAVE_REUSE_INTENT: self._handle_save_reuse_intent, } def handle_human_response(self, state: AgentState, node_name: str) -> Command: """Handle any human response based on action_id""" action_id = state.hitl_response.action_id if action_id in self._action_handlers: return self._action_handlers[action_id](state, node_name) # Default fallback return Command(update=state.model_dump(), goto=NodeNames.END) def add_action_handler(self, action_id: str, handler: Callable): """Add a custom action handler""" self._action_handlers[action_id] = handler def _handle_save_reuse(self, state: AgentState, node_name: str) -> Command: """Handle save/reuse action - get more utterances""" state.hitl_action = create_get_more_utterances() state.sender = node_name return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS) def _handle_save_reuse_intent(self, state: AgentState, node_name: str) -> Command: """Handle save/reuse intent - go to reuse agent""" state.sender = node_name return Command(update=state.model_dump(), goto=NodeNames.REUSE_AGENT) class FinalAnswerNode(BaseNode): def __init__(self, final_answer_agent: FinalAnswerAgent): super().__init__() self.final_answer_agent = final_answer_agent self.hitl_handler = HumanInTheLoopHandler() self.node = functools.partial( FinalAnswerNode.node_handler, agent=self.final_answer_agent, name=self.final_answer_agent.name, hitl_handler=self.hitl_handler, ) @staticmethod async def node_handler( state: AgentState, agent: FinalAnswerAgent, name: str, hitl_handler: HumanInTheLoopHandler ) -> Command[Literal["__end__", "SuggestHumanActions", "ReuseAgent"]]: # Handle human responses (only if HITL is enabled) if ENABLE_SAVE_REUSE and state.sender == NodeNames.WAIT_FOR_RESPONSE: return hitl_handler.handle_human_response(state, name) # Handle direct chat calls (no processing needed) if state.sender == NodeNames.CHAT_AGENT: state.sender = name final_answer_content = state.chat_agent_messages[-1].content state.final_answer = final_answer_content final_answer_output = FinalAnswerOutput( thoughts=["Chat response provided directly."], final_answer=final_answer_content ) state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name)) tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json())) return Command(update=state.model_dump(), goto=NodeNames.END) # Handle TaskAnalyzerAgent when final_answer is already set (no apps matched) if state.sender == NodeNames.TASK_ANALYZER_AGENT and state.final_answer: state.sender = name final_answer_output = FinalAnswerOutput( thoughts=[ "No applications matched the request. Providing available applications information." ], final_answer=state.final_answer, ) state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name)) tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json())) return Command(update=state.model_dump(), goto=NodeNames.END) if state.sender == NodeNames.CUGA_LITE: state.sender = name state.final_answer = state.final_answer state.sender = name final_answer_output = FinalAnswerOutput( thoughts=[], final_answer=state.final_answer, ) state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name)) tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json())) return Command(update=state.model_dump(), goto=NodeNames.END) # Main processing: generate final answer await FinalAnswerNode._generate_final_answer(state, agent, name) # Route based on sender (only suggest human actions if HITL is enabled) # Allow save/reuse from both PlanControllerAgent (task decomposition mode) and ChatAgent (chat mode) if ENABLE_SAVE_REUSE and state.sender == NodeNames.PLAN_CONTROLLER_AGENT: state.hitl_action = create_save_reuse_action() state.sender = name return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS) else: return Command(update=state.model_dump(), goto=NodeNames.END) @staticmethod async def _generate_final_answer(state: AgentState, agent: FinalAnswerAgent, name: str): """Generate and process the final answer""" # Run the agent response = await agent.run(state) state.messages.append(response) # Parse and process output final_answer_output = FinalAnswerOutput(**json.loads(response.content)) # Add to chat if enabled if settings.features.chat: chat_message = f"{MessagePrefixes.ANSWER_PREFIX}{final_answer_output.final_answer}" state.append_to_last_chat_message(chat_message) # Track the step tracker.collect_step(Step(name=name, data=final_answer_output.model_dump_json())) # Replace variables and update state final_answer_output.final_answer = state.variables_manager.replace_variables_placeholders( final_answer_output.final_answer ) state.final_answer = final_answer_output.final_answer