File size: 6,999 Bytes
0646b18
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
import functools
import json
from typing import Literal, Dict, Callable

from langchain_core.messages import AIMessage
from langgraph.types import Command

from cuga.backend.activity_tracker.tracker import ActivityTracker, Step
from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.final_answer_agent import FinalAnswerAgent
from cuga.backend.cuga_graph.nodes.answer.final_answer_agent.prompts.load_prompt import FinalAnswerOutput
from cuga.backend.cuga_graph.nodes.shared.base_node import BaseNode
from cuga.backend.cuga_graph.nodes.human_in_the_loop.followup_model import (
    create_save_reuse_action,
    create_get_more_utterances,
)
from cuga.backend.cuga_graph.state.agent_state import AgentState
from cuga.config import settings
from cuga.backend.cuga_graph.utils.nodes_names import NodeNames, ActionIds, MessagePrefixes

# Records each agent step (via collect_step calls below) for activity tracking.
tracker = ActivityTracker()

# Feature flag for human-in-the-loop functionality. When disabled, the node
# never routes to SuggestHumanActions and skips WAIT_FOR_RESPONSE handling.
ENABLE_SAVE_REUSE = settings.features.save_reuse


class HumanInTheLoopHandler:
    """Dispatches human-in-the-loop responses to per-action handlers.

    Handlers are registered by action id; any unrecognized action id falls
    through to a default Command that routes the graph to END.
    """

    def __init__(self):
        # Registry mapping an action id to the bound method that handles it.
        self._action_handlers: Dict[str, Callable] = {
            ActionIds.SAVE_REUSE: self._handle_save_reuse,
            ActionIds.SAVE_REUSE_INTENT: self._handle_save_reuse_intent,
        }

    def handle_human_response(self, state: AgentState, node_name: str) -> Command:
        """Dispatch the pending human response based on its action_id."""
        handler = self._action_handlers.get(state.hitl_response.action_id)
        if handler is not None:
            return handler(state, node_name)
        # Unknown action id: end the run with the state as-is.
        return Command(update=state.model_dump(), goto=NodeNames.END)

    def add_action_handler(self, action_id: str, handler: Callable):
        """Register (or replace) the handler for *action_id*."""
        self._action_handlers[action_id] = handler

    def _handle_save_reuse(self, state: AgentState, node_name: str) -> Command:
        """Save/reuse accepted: prompt the user for more example utterances."""
        state.hitl_action = create_get_more_utterances()
        state.sender = node_name
        return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)

    def _handle_save_reuse_intent(self, state: AgentState, node_name: str) -> Command:
        """Save/reuse intent confirmed: hand control to the reuse agent."""
        state.sender = node_name
        return Command(update=state.model_dump(), goto=NodeNames.REUSE_AGENT)


class FinalAnswerNode(BaseNode):
    """Graph node that produces the final answer and routes to END or a
    human-in-the-loop follow-up node.

    Fixes over the previous revision: the CUGA_LITE branch contained a no-op
    self-assignment of ``state.final_answer`` and assigned ``state.sender``
    twice; the append-message / track-step / goto-END sequence that was
    duplicated across three branches is factored into ``_emit_and_end``.
    """

    def __init__(self, final_answer_agent: FinalAnswerAgent):
        super().__init__()
        self.final_answer_agent = final_answer_agent
        self.hitl_handler = HumanInTheLoopHandler()

        # Pre-bind the static handler so the graph can invoke it with only `state`.
        self.node = functools.partial(
            FinalAnswerNode.node_handler,
            agent=self.final_answer_agent,
            name=self.final_answer_agent.name,
            hitl_handler=self.hitl_handler,
        )

    @staticmethod
    def _emit_and_end(state: AgentState, name: str, output: FinalAnswerOutput) -> Command:
        """Record *output* on the message list and tracker, then route to END."""
        serialized = output.model_dump_json()
        state.messages.append(AIMessage(content=serialized, name=name))
        tracker.collect_step(step=Step(name=name, data=serialized))
        return Command(update=state.model_dump(), goto=NodeNames.END)

    @staticmethod
    async def node_handler(
        state: AgentState, agent: FinalAnswerAgent, name: str, hitl_handler: HumanInTheLoopHandler
    ) -> Command[Literal["__end__", "SuggestHumanActions", "ReuseAgent"]]:
        """Produce the final answer for the current run and pick the next node.

        Routing:
        - WAIT_FOR_RESPONSE (HITL enabled): delegate to ``hitl_handler``.
        - CHAT_AGENT / TASK_ANALYZER_AGENT (answer set) / CUGA_LITE: the
          answer already exists; wrap it in a FinalAnswerOutput and end.
        - Otherwise: run the agent, then either suggest human actions
          (save/reuse) or end.
        """
        # Handle human responses (only if HITL is enabled).
        if ENABLE_SAVE_REUSE and state.sender == NodeNames.WAIT_FOR_RESPONSE:
            return hitl_handler.handle_human_response(state, name)

        # Direct chat call: the chat agent's last message is already the answer.
        if state.sender == NodeNames.CHAT_AGENT:
            state.sender = name
            final_answer_content = state.chat_agent_messages[-1].content
            state.final_answer = final_answer_content
            output = FinalAnswerOutput(
                thoughts=["Chat response provided directly."], final_answer=final_answer_content
            )
            return FinalAnswerNode._emit_and_end(state, name, output)

        # TaskAnalyzerAgent when final_answer is already set (no apps matched).
        if state.sender == NodeNames.TASK_ANALYZER_AGENT and state.final_answer:
            state.sender = name
            output = FinalAnswerOutput(
                thoughts=[
                    "No applications matched the request. Providing available applications information."
                ],
                final_answer=state.final_answer,
            )
            return FinalAnswerNode._emit_and_end(state, name, output)

        # CUGA lite: final_answer was computed upstream; pass it through.
        if state.sender == NodeNames.CUGA_LITE:
            state.sender = name
            output = FinalAnswerOutput(
                thoughts=[],
                final_answer=state.final_answer,
            )
            return FinalAnswerNode._emit_and_end(state, name, output)

        # Main processing: generate final answer via the agent.
        await FinalAnswerNode._generate_final_answer(state, agent, name)

        # Route based on sender (only suggest human actions if HITL is enabled).
        # NOTE(review): the old comment also mentioned ChatAgent here, but the
        # condition only matches PlanControllerAgent — confirm intended scope.
        if ENABLE_SAVE_REUSE and state.sender == NodeNames.PLAN_CONTROLLER_AGENT:
            state.hitl_action = create_save_reuse_action()
            state.sender = name
            return Command(update=state.model_dump(), goto=NodeNames.SUGGEST_HUMAN_ACTIONS)
        return Command(update=state.model_dump(), goto=NodeNames.END)

    @staticmethod
    async def _generate_final_answer(state: AgentState, agent: FinalAnswerAgent, name: str):
        """Run the agent, parse its FinalAnswerOutput, and update the state.

        Side effects: appends the raw agent response to ``state.messages``,
        optionally appends to the chat transcript, records a tracker step, and
        sets ``state.final_answer`` with variable placeholders resolved.
        """
        # Run the agent.
        response = await agent.run(state)
        state.messages.append(response)

        # Parse and process output (agent responses are JSON-encoded).
        final_answer_output = FinalAnswerOutput(**json.loads(response.content))

        # Add to chat if enabled.
        if settings.features.chat:
            chat_message = f"{MessagePrefixes.ANSWER_PREFIX}{final_answer_output.final_answer}"
            state.append_to_last_chat_message(chat_message)

        # Track the step.
        tracker.collect_step(Step(name=name, data=final_answer_output.model_dump_json()))

        # Replace variable placeholders, then persist the answer on the state.
        final_answer_output.final_answer = state.variables_manager.replace_variables_placeholders(
            final_answer_output.final_answer
        )
        state.final_answer = final_answer_output.final_answer