import asyncio
import json
import os
import typing
from agent_server.agent_streaming import run_agent_stream
from agent_server.formatting_reasoning import (
_extract_final_text,
_maybe_parse_final_from_stdout,
_format_reasoning_chunk,
)
from agent_server.helpers import normalize_content_to_text, now_ts
from agent_server.openai_schemas import ChatCompletionRequest, ChatMessage
from agent_server.sanitizing_think_tags import scrub_think_tags
from agents.code_writing_agents import (
generate_code_writing_agent_without_tools,
generate_code_writing_agent_with_search,
)
from agents.generator_and_critic import generate_generator_with_managed_critic
from agents.json_tool_calling_agents import (
generate_tool_calling_agent_with_search_and_code,
)
from agents.agent_with_custom_beam_design_tools import generate_beam_agent
from agents.manager_with_heterogeneous_agents import (
generate_manager_with_heterogeneous_agents,
)

# Upstream model name for passthrough requests, configurable via the
# MODEL_NAME environment variable.
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen3-1.7B")


def normalize_model_name(raw_model: typing.Union[str, dict, None]) -> str:
"""
Accepts either a bare model string or {"id": "..."} form; default to the
local code-writing agent if unspecified.
"""
if isinstance(raw_model, dict):
return typing.cast(str, raw_model.get("id", "code-writing-agent-without-tools"))
if isinstance(raw_model, str) and raw_model.strip():
return raw_model
return "code-writing-agent-without-tools"
def is_upstream_passthrough(model_name: str) -> bool:
return model_name == MODEL_NAME


def is_upstream_passthrough_nothink(model_name: str) -> bool:
return model_name == f"{MODEL_NAME}-nothink"


def apply_nothink_to_body(
    body: ChatCompletionRequest, messages: typing.List[ChatMessage]
) -> ChatCompletionRequest:
    """
    Returns a shallow copy of the request body rewritten for 'no-think'
    behavior upstream:
    - sets body["model"] to MODEL_NAME (i.e. strips the "-nothink" suffix)
    - appends '/nothink' to the content of each user message
    """
new_body: ChatCompletionRequest = dict(body) # shallow copy is fine
new_body["model"] = MODEL_NAME
new_messages: typing.List[ChatMessage] = []
for msg in messages:
if msg.get("role") == "user":
content = normalize_content_to_text(msg.get("content", ""))
new_messages.append({"role": "user", "content": content + "\n/nothink"})
else:
new_messages.append(msg)
new_body["messages"] = new_messages
return new_body
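
# A minimal before/after sketch (field values are illustrative):
#   body     = {"model": "Qwen/Qwen3-1.7B-nothink", "messages": [...]}
#   messages = [{"role": "user", "content": "Size this beam"}]
#   apply_nothink_to_body(body, messages) ->
#     {"model": "Qwen/Qwen3-1.7B",
#      "messages": [{"role": "user", "content": "Size this beam\n/nothink"}]}
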
def agent_for_model(model_name: str):
"""
Returns an instantiated agent for the given local model id.
Raises ValueError on unknown local ids.
"""
if model_name == "code-writing-agent-without-tools":
return generate_code_writing_agent_without_tools()
if model_name == "code-writing-agent-with-search":
return generate_code_writing_agent_with_search()
if model_name == "tool-calling-agent-with-search-and-code":
return generate_tool_calling_agent_with_search_and_code()
if model_name == "generator-with-managed-critic":
return generate_generator_with_managed_critic()
if model_name == "custom-agent-with-beam-design-tools":
return generate_beam_agent()
if model_name == "manager-with-heterogeneous-agents":
return generate_manager_with_heterogeneous_agents()
raise ValueError(f"Unknown model id: {model_name}")
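
# Dispatch sketch: unknown ids surface as ValueError, which a caller can map
# to an HTTP error (the handling below is illustrative):
#   try:
#       agent = agent_for_model(requested_id)
#   except ValueError:
#       ...  # e.g. respond with a 4xx instead of crashing the stream
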
def _openai_stream_base(model_name: str) -> dict:
"""
The base chunk used for all SSE deltas in streaming mode.
"""
return {
"id": f"chatcmpl-smol-{now_ts()}",
"object": "chat.completion.chunk",
"created": now_ts(),
"model": model_name,
"choices": [
{
"index": 0,
"delta": {"role": "assistant"},
"finish_reason": None,
}
],
}
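
# Example of the serialized role-header chunk as sent on the wire (timestamp
# and model id are illustrative):
#   data: {"id": "chatcmpl-smol-1700000000", "object": "chat.completion.chunk",
#          "created": 1700000000, "model": "code-writing-agent-without-tools",
#          "choices": [{"index": 0, "delta": {"role": "assistant"},
#                       "finish_reason": null}]}
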
def _safe_extract_candidate(val: typing.Any) -> typing.Optional[str]:
"""
Extracts a candidate final text string if present and non-empty.
"""
cand = _extract_final_text(val)
if cand and cand.strip().lower() != "none":
return cand
return None


def _truncate_reasoning_blob(reasoning: str, limit: int = 24000) -> str:
    """Caps accumulated reasoning text so the non-streaming <think> block stays bounded."""
if len(reasoning) > limit:
return reasoning[:limit] + "\n… [truncated]"
return reasoning


def make_sse_generator(
task: str,
agent_for_request: typing.Any,
model_name: str,
):
"""
Returns an async generator that yields SSE 'data:' lines for FastAPI StreamingResponse.
"""
async def _gen():
base = _openai_stream_base(model_name)
# initial role header
yield f"data: {json.dumps(base)}\n\n"
reasoning_idx = 0
final_candidate: typing.Optional[str] = None
async for item in run_agent_stream(task, agent_for_request):
# Short-circuit on explicit error signaled by the runner
if isinstance(item, dict) and "__error__" in item:
error_chunk = {
**base,
"choices": [{"index": 0, "delta": {}, "finish_reason": "error"}],
}
yield f"data: {json.dumps(error_chunk)}\n\n"
yield f"data: {json.dumps({'error': item['__error__']})}\n\n"
break
# Explicit final (do not emit yet; keep last candidate)
if isinstance(item, dict) and "__final__" in item:
cand = _safe_extract_candidate(item["__final__"])
if cand:
final_candidate = cand
continue
# Live stdout -> reasoning_content
if (
isinstance(item, dict)
and "__stdout__" in item
and isinstance(item["__stdout__"], str)
):
for line in item["__stdout__"].splitlines():
parsed = _maybe_parse_final_from_stdout(line)
if parsed:
final_candidate = parsed
rt = _format_reasoning_chunk(
line, "stdout", reasoning_idx := reasoning_idx + 1
)
if rt:
r_chunk = {
**base,
"choices": [
{"index": 0, "delta": {"reasoning_content": rt}}
],
}
yield f"data: {json.dumps(r_chunk, ensure_ascii=False)}\n\n"
continue
# Observed step -> reasoning_content
if (
isinstance(item, dict)
and "__step__" in item
and isinstance(item["__step__"], str)
):
for line in item["__step__"].splitlines():
parsed = _maybe_parse_final_from_stdout(line)
if parsed:
final_candidate = parsed
rt = _format_reasoning_chunk(
line, "step", reasoning_idx := reasoning_idx + 1
)
if rt:
r_chunk = {
**base,
"choices": [
{"index": 0, "delta": {"reasoning_content": rt}}
],
}
yield f"data: {json.dumps(r_chunk, ensure_ascii=False)}\n\n"
continue
            # Any other payload from the agent -> keep it as the latest candidate answer
cand = _safe_extract_candidate(item)
if cand:
final_candidate = cand
# Cooperative scheduling
await asyncio.sleep(0)
# Emit visible answer once at the end (scrub any stray tags)
visible = scrub_think_tags(final_candidate or "")
if not visible or visible.strip().lower() == "none":
visible = "Done."
final_chunk = {**base, "choices": [{"index": 0, "delta": {"content": visible}}]}
yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
stop_chunk = {
**base,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(stop_chunk)}\n\n"
yield "data: [DONE]\n\n"
return _gen
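
# Minimal FastAPI wiring sketch for the streaming path (the route path,
# request parsing, and `app` object are illustrative, not part of this module):
#
#   from fastapi import FastAPI
#   from fastapi.responses import StreamingResponse
#
#   app = FastAPI()
#
#   @app.post("/v1/chat/completions")
#   async def chat_completions(body: dict):
#       model_name = normalize_model_name(body.get("model"))
#       agent = agent_for_model(model_name)
#       task = normalize_content_to_text(body["messages"][-1]["content"])
#       gen = make_sse_generator(task, agent, model_name)
#       return StreamingResponse(gen(), media_type="text/event-stream")

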
async def run_non_streaming(task: str, agent_for_request: typing.Any) -> str:
"""
Runs the agent and returns a single OpenAI-style text (with optional <think> block).
"""
reasoning_lines: typing.List[str] = []
final_candidate: typing.Optional[str] = None
async for item in run_agent_stream(task, agent_for_request):
if isinstance(item, dict) and "__error__" in item:
            raise RuntimeError(item["__error__"])
if isinstance(item, dict) and "__final__" in item:
cand = _safe_extract_candidate(item["__final__"])
if cand:
final_candidate = cand
continue
if isinstance(item, dict) and "__stdout__" in item:
lines = scrub_think_tags(item["__stdout__"]).rstrip("\n").splitlines()
for line in lines:
parsed = _maybe_parse_final_from_stdout(line)
if parsed:
final_candidate = parsed
rt = _format_reasoning_chunk(line, "stdout", len(reasoning_lines) + 1)
if rt:
reasoning_lines.append(rt)
continue
if isinstance(item, dict) and "__step__" in item:
lines = scrub_think_tags(item["__step__"]).rstrip("\n").splitlines()
for line in lines:
parsed = _maybe_parse_final_from_stdout(line)
if parsed:
final_candidate = parsed
rt = _format_reasoning_chunk(line, "step", len(reasoning_lines) + 1)
if rt:
reasoning_lines.append(rt)
continue
cand = _safe_extract_candidate(item)
if cand:
final_candidate = cand
reasoning_blob = _truncate_reasoning_blob("\n".join(reasoning_lines).strip())
think_block = f"<think>\n{reasoning_blob}\n</think>\n" if reasoning_blob else ""
final_text = scrub_think_tags(final_candidate or "")
if not final_text or final_text.strip().lower() == "none":
final_text = "Done."
return f"{think_block}{final_text}"