Spaces:
Running
Running
File size: 6,999 Bytes
0646b18 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 |
import functools
import json
from typing import Literal, Dict, Callable
from langchain_core.messages import AIMessage
from langgraph.types import Command
from cuga.backend.activity_tracker.tracker import ActivityTracker, Step
from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.final_answer_agent import FinalAnswerAgent
from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.prompts.load_prompt import FinalAnswerOutput
from cuga.backend.cuga_graph.nodes.shared.base_node import BaseNode
from cuga.backend.cuga_graph.nodes.human_in_the_loop.followup_model import (
create_save_reuse_action,
create_get_more_utterances,
)
from cuga.backend.cuga_graph.state.agent_state import AgentState
from cuga.config import settings
from cuga.backend.cuga_graph.utils.nodes_names import NodeNames, ActionIds, MessagePrefixes
tracker = ActivityTracker()
# Feature flag for human-in-the-loop functionality
ENABLE_SAVE_REUSE = settings.features.save_reuse
class HumanInTheLoopHandler:
"""Simple handler for human-in-the-loop interactions"""
def __init__(self):
self._action_handlers: Dict[str, Callable] = {
ActionIds.SAVE_REUSE: self._handle_save_reuse,
ActionIds.SAVE_REUSE_INTENT: self._handle_save_reuse_intent,
}
def handle_human_response(self, state: AgentState, node_name: str) -> Command:
"""Handle any human response based on action_id"""
action_id = state.hitl_response.action_id
if action_id in self._action_handlers:
return self._action_handlers[action_id](state, node_name)
# Default fallback
return Command(update=state.model_dump(), goto=NodeNames.END)
def add_action_handler(self, action_id: str, handler: Callable):
"""Add a custom action handler"""
self._action_handlers[action_id] = handler
def _handle_save_reuse(self, state: AgentState, node_name: str) -> Command:
"""Handle save/reuse action - get more utterances"""
state.hitl_action = create_get_more_utterances()
state.sender = node_name
return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)
def _handle_save_reuse_intent(self, state: AgentState, node_name: str) -> Command:
"""Handle save/reuse intent - go to reuse agent"""
state.sender = node_name
return Command(update=state.model_dump(), goto=NodeNames.REUSE_AGENT)
class FinalAnswerNode(BaseNode):
def __init__(self, final_answer_agent: FinalAnswerAgent):
super().__init__()
self.final_answer_agent = final_answer_agent
self.hitl_handler = HumanInTheLoopHandler()
self.node = functools.partial(
FinalAnswerNode.node_handler,
agent=self.final_answer_agent,
name=self.final_answer_agent.name,
hitl_handler=self.hitl_handler,
)
@staticmethod
async def node_handler(
state: AgentState, agent: FinalAnswerAgent, name: str, hitl_handler: HumanInTheLoopHandler
) -> Command[Literal["__end__", "SuggestHumanActions", "ReuseAgent"]]:
# Handle human responses (only if HITL is enabled)
if ENABLE_SAVE_REUSE and state.sender == NodeNames.WAIT_FOR_RESPONSE:
return hitl_handler.handle_human_response(state, name)
# Handle direct chat calls (no processing needed)
if state.sender == NodeNames.CHAT_AGENT:
state.sender = name
final_answer_content = state.chat_agent_messages[-1].content
state.final_answer = final_answer_content
final_answer_output = FinalAnswerOutput(
thoughts=["Chat response provided directly."], final_answer=final_answer_content
)
state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name))
tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json()))
return Command(update=state.model_dump(), goto=NodeNames.END)
# Handle TaskAnalyzerAgent when final_answer is already set (no apps matched)
if state.sender == NodeNames.TASK_ANALYZER_AGENT and state.final_answer:
state.sender = name
final_answer_output = FinalAnswerOutput(
thoughts=[
"No applications matched the request. Providing available applications information."
],
final_answer=state.final_answer,
)
state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name))
tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json()))
return Command(update=state.model_dump(), goto=NodeNames.END)
if state.sender == NodeNames.CUGA_LITE:
state.sender = name
state.final_answer = state.final_answer
state.sender = name
final_answer_output = FinalAnswerOutput(
thoughts=[],
final_answer=state.final_answer,
)
state.messages.append(AIMessage(content=final_answer_output.model_dump_json(), name=name))
tracker.collect_step(step=Step(name=name, data=final_answer_output.model_dump_json()))
return Command(update=state.model_dump(), goto=NodeNames.END)
# Main processing: generate final answer
await FinalAnswerNode._generate_final_answer(state, agent, name)
# Route based on sender (only suggest human actions if HITL is enabled)
# Allow save/reuse from both PlanControllerAgent (task decomposition mode) and ChatAgent (chat mode)
if ENABLE_SAVE_REUSE and state.sender == NodeNames.PLAN_CONTROLLER_AGENT:
state.hitl_action = create_save_reuse_action()
state.sender = name
return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)
else:
return Command(update=state.model_dump(), goto=NodeNames.END)
@staticmethod
async def _generate_final_answer(state: AgentState, agent: FinalAnswerAgent, name: str):
"""Generate and process the final answer"""
# Run the agent
response = await agent.run(state)
state.messages.append(response)
# Parse and process output
final_answer_output = FinalAnswerOutput(**json.loads(response.content))
# Add to chat if enabled
if settings.features.chat:
chat_message = f"{MessagePrefixes.ANSWER_PREFIX}{final_answer_output.final_answer}"
state.append_to_last_chat_message(chat_message)
# Track the step
tracker.collect_step(Step(name=name, data=final_answer_output.model_dump_json()))
# Replace variables and update state
final_answer_output.final_answer = state.variables_manager.replace_variables_placeholders(
final_answer_output.final_answer
)
state.final_answer = final_answer_output.final_answer
|