#
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
#

import json
from config import REMINDERS
from ..api.client import client
from ..tools.executor import tool_execution
from ..utils.time import get_current_time
from ..media.message_adapter import adapt_message_format


def _is_blank_message(user_message):
    """Return True when *user_message* carries neither text nor files."""
    if not user_message:
        return True
    if isinstance(user_message, dict):
        return not (user_message.get("text") or user_message.get("files"))
    if isinstance(user_message, str):
        return not user_message.strip()
    return False


def _merge_tool_call_deltas(tool_calls, tool_deltas):
    """Fold one chunk's streamed tool-call fragments into *tool_calls* in place.

    Each fragment addresses a slot through its ``index``; missing slots are
    created on demand, ``id`` and ``name`` overwrite the stored value, and
    ``arguments`` fragments are concatenated.
    """
    for tool_delta in tool_deltas:
        # A missing or null index falls back to the first slot.
        tool_index = tool_delta.get("index") or 0
        while len(tool_calls) <= tool_index:
            tool_calls.append({
                "id": "",
                "type": "function",
                "function": {
                    "name": "",
                    "arguments": ""
                }
            })

        slot = tool_calls[tool_index]
        if tool_delta.get("id"):
            slot["id"] = tool_delta["id"]

        function_delta = tool_delta.get("function")
        if function_delta:
            if function_delta.get("name"):
                slot["function"]["name"] = function_delta["name"]
            if function_delta.get("arguments"):
                slot["function"]["arguments"] += function_delta["arguments"]


def _parse_tool_arguments(raw_arguments):
    """Decode the JSON argument string accumulated for a tool call.

    An empty or whitespace-only string is treated as "no arguments" and
    decodes to an empty dict instead of raising ``json.JSONDecodeError``.
    """
    if not raw_arguments or not raw_arguments.strip():
        return {}
    return json.loads(raw_arguments)


async def chat(user_message, history):
    """Stream an assistant reply for *user_message*, running requested tools.

    This is an async generator. For a blank message it yields a single empty
    list and stops. Otherwise it builds the message list (system prompt,
    adapted history, adapted user message), streams completions from
    ``client`` and yields the accumulated response text after every content
    fragment. When the model requests tool calls, each one is executed via
    ``tool_execution`` and its result (or an ``"Error: ..."`` string) is
    appended as a ``tool`` message before the model is queried again. The
    full accumulated text is yielded once more when the loop ends.

    Args:
        user_message: A string, or a dict with ``"text"`` and/or ``"files"``.
        history: Iterable of dicts with ``"role"`` and ``"content"`` keys;
            entries whose role is neither ``"user"`` nor ``"assistant"``
            are ignored.

    Yields:
        ``[]`` for blank input, otherwise the response text accumulated so
        far across all model rounds.
    """
    if _is_blank_message(user_message):
        yield []
        return

    messages = [
        {"role": "system", "content": f"Today is: {get_current_time()}\n\n{REMINDERS}"}
    ]

    for history_entry in history:
        entry_role = history_entry.get("role")
        entry_content = history_entry.get("content")
        if entry_role == "user":
            adapted_content = await adapt_message_format(entry_content)
            messages.append({"role": "user", "content": adapted_content})
        elif entry_role == "assistant":
            messages.append({"role": "assistant", "content": entry_content})

    adapted_user_message = await adapt_message_format(user_message)
    messages.append({"role": "user", "content": adapted_user_message})

    # Text shown to the caller; it keeps growing across tool-call rounds.
    normal_response = ""

    while True:
        tools_mapping = []
        # Text produced in the current round only.
        final_response = ""
        finish_reason = None

        async for chunk in client(messages):
            choices = chunk.get("choices")
            if not choices:
                continue

            choice = choices[0]
            # ``delta`` may be absent or explicitly null.
            delta = choice.get("delta") or {}

            if choice.get("finish_reason"):
                finish_reason = choice["finish_reason"]

            content = delta.get("content")
            if content is not None:
                final_response += content
                normal_response += content
                yield normal_response

            if delta.get("tool_calls"):
                _merge_tool_call_deltas(tools_mapping, delta["tool_calls"])

        if tools_mapping:
            messages.append({
                "role": "assistant",
                "content": final_response if final_response else None,
                "tool_calls": tools_mapping
            })

            for tool_call in tools_mapping:
                # Any failure (bad JSON, tool error) is reported back to the
                # model as the tool's output rather than aborting the chat.
                try:
                    tool_name = tool_call["function"]["name"]
                    tool_args = _parse_tool_arguments(tool_call["function"]["arguments"])
                    tool_content = await tool_execution(tool_name, tool_args)
                except Exception as error:
                    tool_content = f"Error: {str(error)}"

                messages.append({
                    "role": "tool",
                    "tool_call_id": tool_call["id"],
                    "content": tool_content
                })

            # Query the model again now that tool results are available.
            continue

        if final_response:
            messages.append({"role": "assistant", "content": final_response})
            break

        if finish_reason:
            break

        # NOTE(review): reaching this point means the round produced no
        # content, no tool calls and no finish reason, so the same request
        # is issued again. Confirm this unbounded retry is intended.

    yield normal_response