import asyncio
import json
import os
import typing

from agent_server.agent_streaming import run_agent_stream
from agent_server.formatting_reasoning import (
    _extract_final_text,
    _maybe_parse_final_from_stdout,
    _format_reasoning_chunk,
)
from agent_server.helpers import normalize_content_to_text, now_ts
from agent_server.openai_schemas import ChatCompletionRequest, ChatMessage
from agent_server.sanitizing_think_tags import scrub_think_tags
from agents.code_writing_agents import (
    generate_code_writing_agent_without_tools,
    generate_code_writing_agent_with_search,
)
from agents.generator_and_critic import generate_generator_with_managed_critic
from agents.json_tool_calling_agents import (
    generate_tool_calling_agent_with_search_and_code,
)
from agents.agent_with_custom_beam_design_tools import generate_beam_agent
from agents.manager_with_heterogeneous_agents import (
    generate_manager_with_heterogeneous_agents,
)

# Model name from env
MODEL_NAME = os.getenv("MODEL_NAME", "Qwen/Qwen3-1.7B")


def normalize_model_name(raw_model: typing.Union[str, dict, None]) -> str:
    """
    Accepts either a bare model string or {"id": "..."} form; defaults to the
    local code-writing agent if unspecified.
    """
    if isinstance(raw_model, dict):
        return typing.cast(str, raw_model.get("id", "code-writing-agent-without-tools"))
    if isinstance(raw_model, str) and raw_model.strip():
        return raw_model
    return "code-writing-agent-without-tools"


def is_upstream_passthrough(model_name: str) -> bool:
    return model_name == MODEL_NAME


def is_upstream_passthrough_nothink(model_name: str) -> bool:
    return model_name == f"{MODEL_NAME}-nothink"


def apply_nothink_to_body(
    body: ChatCompletionRequest, messages: typing.List[ChatMessage]
) -> ChatCompletionRequest:
    """
    Builds a shallow-copied request that asks for 'no-think' behavior upstream.
    - Sets body["model"] to MODEL_NAME (i.e. strips the -nothink suffix)
    - Appends '/nothink' to each user message's content
    """
    new_body: ChatCompletionRequest = dict(body)  # shallow copy is fine
    new_body["model"] = MODEL_NAME
    new_messages: typing.List[ChatMessage] = []
    for msg in messages:
        if msg.get("role") == "user":
            content = normalize_content_to_text(msg.get("content", ""))
            new_messages.append({"role": "user", "content": content + "\n/nothink"})
        else:
            new_messages.append(msg)
    new_body["messages"] = new_messages
    return new_body
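
# Transformation sketch (values illustrative, assuming MODEL_NAME == "Qwen/Qwen3-1.7B"):
# a request for "Qwen/Qwen3-1.7B-nothink" carrying
#     messages = [{"role": "user", "content": "Hello"}]
# comes back with
#     new_body["model"]    == "Qwen/Qwen3-1.7B"
#     new_body["messages"] == [{"role": "user", "content": "Hello\n/nothink"}]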


def agent_for_model(model_name: str):
    """
    Returns an instantiated agent for the given local model id.
    Raises ValueError on unknown local ids.
    """
    if model_name == "code-writing-agent-without-tools":
        return generate_code_writing_agent_without_tools()
    if model_name == "code-writing-agent-with-search":
        return generate_code_writing_agent_with_search()
    if model_name == "tool-calling-agent-with-search-and-code":
        return generate_tool_calling_agent_with_search_and_code()
    if model_name == "generator-with-managed-critic":
        return generate_generator_with_managed_critic()
    if model_name == "custom-agent-with-beam-design-tools":
        return generate_beam_agent()
    if model_name == "manager-with-heterogeneous-agents":
        return generate_manager_with_heterogeneous_agents()
    raise ValueError(f"Unknown model id: {model_name}")
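
# Routing sketch (illustrative): how a request handler might combine the helpers
# above. `body` is assumed to be the parsed request; it is not defined here.
#
#     model = normalize_model_name(body.get("model"))
#     if is_upstream_passthrough(model) or is_upstream_passthrough_nothink(model):
#         ...  # proxy to the upstream model (apply_nothink_to_body for -nothink)
#     else:
#         agent = agent_for_model(model)  # raises ValueError on unknown ids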


def _openai_stream_base(model_name: str) -> dict:
    """
    The base chunk used for all SSE deltas in streaming mode.
    """
    return {
        "id": f"chatcmpl-smol-{now_ts()}",
        "object": "chat.completion.chunk",
        "created": now_ts(),
        "model": model_name,
        "choices": [
            {
                "index": 0,
                "delta": {"role": "assistant"},
                "finish_reason": None,
            }
        ],
    }


def _safe_extract_candidate(val: typing.Any) -> typing.Optional[str]:
    """
    Extracts a candidate final text string if present and non-empty.
    """
    cand = _extract_final_text(val)
    if cand and cand.strip().lower() != "none":
        return cand
    return None


def _truncate_reasoning_blob(reasoning: str, limit: int = 24000) -> str:
    if len(reasoning) > limit:
        return reasoning[:limit] + "\n… [truncated]"
    return reasoning


def make_sse_generator(
    task: str,
    agent_for_request: typing.Any,
    model_name: str,
):
    """
    Returns an async generator that yields SSE 'data:' lines for FastAPI StreamingResponse.
    """

    async def _gen():
        base = _openai_stream_base(model_name)
        # initial role header
        yield f"data: {json.dumps(base)}\n\n"
        reasoning_idx = 0
        final_candidate: typing.Optional[str] = None
        async for item in run_agent_stream(task, agent_for_request):
            # Short-circuit on explicit error signaled by the runner
            if isinstance(item, dict) and "__error__" in item:
                error_chunk = {
                    **base,
                    "choices": [{"index": 0, "delta": {}, "finish_reason": "error"}],
                }
                yield f"data: {json.dumps(error_chunk)}\n\n"
                yield f"data: {json.dumps({'error': item['__error__']})}\n\n"
                break
            # Explicit final (do not emit yet; keep last candidate)
            if isinstance(item, dict) and "__final__" in item:
                cand = _safe_extract_candidate(item["__final__"])
                if cand:
                    final_candidate = cand
                continue
            # Live stdout -> reasoning_content
            if (
                isinstance(item, dict)
                and "__stdout__" in item
                and isinstance(item["__stdout__"], str)
            ):
                for line in item["__stdout__"].splitlines():
                    parsed = _maybe_parse_final_from_stdout(line)
                    if parsed:
                        final_candidate = parsed
                    reasoning_idx += 1
                    rt = _format_reasoning_chunk(line, "stdout", reasoning_idx)
                    if rt:
                        r_chunk = {
                            **base,
                            "choices": [
                                {"index": 0, "delta": {"reasoning_content": rt}}
                            ],
                        }
                        yield f"data: {json.dumps(r_chunk, ensure_ascii=False)}\n\n"
                continue
            # Observed step -> reasoning_content
            if (
                isinstance(item, dict)
                and "__step__" in item
                and isinstance(item["__step__"], str)
            ):
                for line in item["__step__"].splitlines():
                    parsed = _maybe_parse_final_from_stdout(line)
                    if parsed:
                        final_candidate = parsed
                    reasoning_idx += 1
                    rt = _format_reasoning_chunk(line, "step", reasoning_idx)
                    if rt:
                        r_chunk = {
                            **base,
                            "choices": [
                                {"index": 0, "delta": {"reasoning_content": rt}}
                            ],
                        }
                        yield f"data: {json.dumps(r_chunk, ensure_ascii=False)}\n\n"
                continue
            # Any other iterable/text from agent -> candidate answer
            cand = _safe_extract_candidate(item)
            if cand:
                final_candidate = cand
            # Cooperative scheduling
            await asyncio.sleep(0)
        # Emit visible answer once at the end (scrub any stray tags)
        visible = scrub_think_tags(final_candidate or "")
        if not visible or visible.strip().lower() == "none":
            visible = "Done."
        final_chunk = {**base, "choices": [{"index": 0, "delta": {"content": visible}}]}
        yield f"data: {json.dumps(final_chunk, ensure_ascii=False)}\n\n"
        stop_chunk = {
            **base,
            "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
        }
        yield f"data: {json.dumps(stop_chunk)}\n\n"
        yield "data: [DONE]\n\n"

    return _gen
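
# Endpoint sketch (illustrative): wiring the generator into the FastAPI
# StreamingResponse mentioned in the docstring. The route path and request
# parsing below are assumptions; only the names defined in this file are real.
#
#     from fastapi import FastAPI
#     from fastapi.responses import StreamingResponse
#
#     app = FastAPI()
#
#     @app.post("/v1/chat/completions")
#     async def chat_completions(body: dict):
#         model = normalize_model_name(body.get("model"))
#         task = normalize_content_to_text(body["messages"][-1].get("content", ""))
#         gen = make_sse_generator(task, agent_for_model(model), model)
#         return StreamingResponse(gen(), media_type="text/event-stream")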


async def run_non_streaming(task: str, agent_for_request: typing.Any) -> str:
    """
    Runs the agent and returns a single OpenAI-style text (with optional <think> block).
    """
    reasoning_lines: typing.List[str] = []
    final_candidate: typing.Optional[str] = None
    async for item in run_agent_stream(task, agent_for_request):
        if isinstance(item, dict) and "__error__" in item:
            raise Exception(item["__error__"])
        if isinstance(item, dict) and "__final__" in item:
            cand = _safe_extract_candidate(item["__final__"])
            if cand:
                final_candidate = cand
            continue
        if isinstance(item, dict) and "__stdout__" in item:
            lines = scrub_think_tags(item["__stdout__"]).rstrip("\n").splitlines()
            for line in lines:
                parsed = _maybe_parse_final_from_stdout(line)
                if parsed:
                    final_candidate = parsed
                rt = _format_reasoning_chunk(line, "stdout", len(reasoning_lines) + 1)
                if rt:
                    reasoning_lines.append(rt)
            continue
        if isinstance(item, dict) and "__step__" in item:
            lines = scrub_think_tags(item["__step__"]).rstrip("\n").splitlines()
            for line in lines:
                parsed = _maybe_parse_final_from_stdout(line)
                if parsed:
                    final_candidate = parsed
                rt = _format_reasoning_chunk(line, "step", len(reasoning_lines) + 1)
                if rt:
                    reasoning_lines.append(rt)
            continue
        cand = _safe_extract_candidate(item)
        if cand:
            final_candidate = cand
    reasoning_blob = _truncate_reasoning_blob("\n".join(reasoning_lines).strip())
    think_block = f"<think>\n{reasoning_blob}\n</think>\n" if reasoning_blob else ""
    final_text = scrub_think_tags(final_candidate or "")
    if not final_text or final_text.strip().lower() == "none":
        final_text = "Done."
    return f"{think_block}{final_text}"