ccm committed on
Commit 7e34bee · 1 Parent(s): a2b9043

Reverting logging changes

Files changed (2):
  1. agent_server/agent_streaming.py +144 -300
  2. agent_server/std_tee.py +36 -251
agent_server/agent_streaming.py CHANGED
@@ -1,58 +1,152 @@
- # agent_server/agent_streaming.py
- from __future__ import annotations
-
  import asyncio
  import contextlib
  import os
  import threading
- import typing as t

  import fastapi
  import httpx

  from agent_server.helpers import sse_headers
  from agent_server.sanitizing_think_tags import scrub_think_tags
- from agent_server.std_tee import (
-     QueueWriter,
-     _format_reasoning_chunk,
- )

- # -----------------------------------------------------------------------------
- # Minimal agent streaming:
- #   • capture ONLY stdout/stderr during agent execution
- #   • stream normalized reasoning chunks derived from those lines
- #   • emit a single {"__final__": ...} with the agent's returned result
- # -----------------------------------------------------------------------------
- async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
-     """
-     Streams compact reasoning derived from stdout/stderr lines while the agent runs.
-     When the agent finishes, emits {"__final__": <agent_return_value>}.
-
-     Yields dict events:
-       - {"__reasoning__": "<chunk>"}  # normalized, compact text
-       - {"__error__": "<message>"}    # if the agent runner throws
-       - {"__final__": <any>}          # terminal event with the returned result
      """
      q: asyncio.Queue = asyncio.Queue()
      agent_to_use = agent_obj

-     # redirect stdout/stderr into this queue (line-buffered in QueueWriter)
      qwriter = QueueWriter(q)

-     # Background runner executes the agent synchronously so stdout flushes as it prints
      def run_agent():
          final_result = None
          try:
-             with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(qwriter):
-                 # Keep this simple: don't use stream=True or iterator-style APIs.
-                 if hasattr(agent_to_use, "run") and callable(getattr(agent_to_use, "run")):
-                     final_result = agent_to_use.run(task)
-                 elif hasattr(agent_to_use, "generate") and callable(getattr(agent_to_use, "generate")):
-                     final_result = agent_to_use.generate(task)
-                 elif callable(agent_to_use):
-                     final_result = agent_to_use(task)
-                 else:
-                     raise RuntimeError("Agent object is not callable and exposes no run()/generate()")
          except Exception as e:
              try:
                  qwriter.flush()
@@ -71,31 +165,22 @@ async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
                  q.put_nowait({"__final__": final_result})
              except Exception:
                  pass

-     # Kick off the worker thread
-     run_thread = threading.Thread(target=run_agent, name="agent-runner", daemon=True)
      run_thread.start()

-     # Async consumer: convert raw stdout lines -> compact reasoning chunks
      while True:
          item = await q.get()
-
-         if isinstance(item, dict) and "__stdout__" in item:
-             # Normalize/clean the line (strips ANSI, think tags, box drawing, system prompts)
-             chunk = _format_reasoning_chunk(item["__stdout__"], tag="stdout", idx=0)
-             if chunk:
-                 yield {"__reasoning__": chunk}
-             continue
-
-         # Pass through non-stdout items (errors, final)
          yield item
          if isinstance(item, dict) and "__final__" in item:
              break


- # -----------------------------------------------------------------------------
- # Utilities: scrub nested structures of <think> tags when proxying upstream
- # -----------------------------------------------------------------------------
  def _recursively_scrub(obj):
      if isinstance(obj, str):
          return scrub_think_tags(obj)
@@ -106,10 +191,9 @@ def _recursively_scrub(obj):
      return obj


- # -----------------------------------------------------------------------------
- # Upstream proxy (OpenAI-compatible) with optional think-tag scrubbing
- # -----------------------------------------------------------------------------
- async def proxy_upstream_chat_completions(body: dict, stream: bool, scrub_think: bool = False):
      HF_TOKEN = os.getenv("OPENAI_API_KEY")
      headers = {
          "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
@@ -122,9 +206,12 @@ async def proxy_upstream_chat_completions(body: dict, stream: bool, scrub_think:
      async def proxy_stream():
          async with httpx.AsyncClient(timeout=None) as client:
-             async with client.stream("POST", url, headers=headers, json=body) as resp:
                  resp.raise_for_status()
                  if scrub_think:
                      async for txt in resp.aiter_text():
                          try:
                              cleaned = scrub_think_tags(txt)
@@ -152,249 +239,6 @@ async def proxy_upstream_chat_completions(body: dict, stream: bool, scrub_think:
          except Exception:
              pass

-     return fastapi.responses.JSONResponse(status_code=r.status_code, content=payload)
-
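The minimal run_agent_stream removed above only yields the three dict event types listed in its docstring. As a rough, hedged sketch of how a caller might forward that contract as Server-Sent Events (the FastAPI app, route path, and payload shape below are illustrative assumptions, not code from this commit):

```python
# Hedged sketch: one way a FastAPI route could consume run_agent_stream's
# dict events and forward them as SSE. Route path and payload shape are
# assumptions; normally a real agent object would be passed as agent_obj.
import json

import fastapi

from agent_server.agent_streaming import run_agent_stream

app = fastapi.FastAPI()


@app.post("/agent/stream")
async def agent_stream_endpoint(payload: dict):
    task = payload.get("task", "")

    async def event_source():
        # Each event is a small dict: __reasoning__, __error__, or __final__.
        async for event in run_agent_stream(task):
            yield f"data: {json.dumps(event, default=str)}\n\n"
            if "__final__" in event:
                break

    return fastapi.responses.StreamingResponse(
        event_source(), media_type="text/event-stream"
    )
```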
  import asyncio
  import contextlib
  import os
  import threading
+ import time
+ import typing

  import fastapi
  import httpx

  from agent_server.helpers import sse_headers
  from agent_server.sanitizing_think_tags import scrub_think_tags
+ from agent_server.std_tee import QueueWriter, _serialize_step


+ async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
+     """
+     Start the agent in a worker thread.
+     Stream THREE sources of incremental data into the async generator:
+       (1) live stdout/stderr lines,
+       (2) newly appended memory steps (polled),
+       (3) any iterable the agent may yield (if supported).
+     Finally emit a __final__ item with the last answer.
      """
+     loop = asyncio.get_running_loop()
      q: asyncio.Queue = asyncio.Queue()
      agent_to_use = agent_obj

+     stop_evt = threading.Event()
+
+     # 1) stdout/stderr live tee
      qwriter = QueueWriter(q)

+     # 2) memory poller
+     def poll_memory():
+         last_len = 0
+         while not stop_evt.is_set():
+             try:
+                 steps = []
+                 try:
+                     # Common API: agent.memory.get_full_steps()
+                     steps = agent_to_use.memory.get_full_steps()  # type: ignore[attr-defined]
+                 except Exception:
+                     # Fallbacks: different names across versions
+                     steps = (
+                         getattr(agent_to_use, "steps", [])
+                         or getattr(agent_to_use, "memory", [])
+                         or []
+                     )
+                 if steps is None:
+                     steps = []
+                 curr_len = len(steps)
+                 if curr_len > last_len:
+                     new = steps[last_len:curr_len]
+                     last_len = curr_len
+                     for s in new:
+                         s_text = _serialize_step(s)
+                         if s_text:
+                             try:
+                                 q.put_nowait({"__step__": s_text})
+                             except Exception:
+                                 pass
+             except Exception:
+                 pass
+             time.sleep(0.10)  # 100 ms cadence
+
+     # 3) agent runner (may or may not yield)
      def run_agent():
          final_result = None
          try:
+             with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(
+                 qwriter
+             ):
+                 used_iterable = False
+                 if hasattr(agent_to_use, "run") and callable(
+                     getattr(agent_to_use, "run")
+                 ):
+                     try:
+                         res = agent_to_use.run(task, stream=True)
+                         if hasattr(res, "__iter__") and not isinstance(
+                             res, (str, bytes)
+                         ):
+                             used_iterable = True
+                             for it in res:
+                                 try:
+                                     q.put_nowait(it)
+                                 except Exception:
+                                     pass
+                             final_result = (
+                                 None  # iterable may already contain the answer
+                             )
+                         else:
+                             final_result = res
+                     except TypeError:
+                         # run(stream=True) not supported -> fall back
+                         pass
+
+                 if final_result is None and not used_iterable:
+                     # Try other common streaming signatures
+                     for name in (
+                         "run_stream",
+                         "stream",
+                         "stream_run",
+                         "run_with_callback",
+                     ):
+                         fn = getattr(agent_to_use, name, None)
+                         if callable(fn):
+                             try:
+                                 res = fn(task)
+                                 if hasattr(res, "__iter__") and not isinstance(
+                                     res, (str, bytes)
+                                 ):
+                                     for it in res:
+                                         q.put_nowait(it)
+                                     final_result = None
+                                 else:
+                                     final_result = res
+                                 break
+                             except TypeError:
+                                 # maybe callback signature
+                                 def cb(item):
+                                     try:
+                                         q.put_nowait(item)
+                                     except Exception:
+                                         pass
+
+                                 try:
+                                     fn(task, cb)
+                                     final_result = None
+                                     break
+                                 except Exception:
+                                     continue
+
+                 if final_result is None and not used_iterable:
+                     pass  # (typo guard removed below)
+
+                 if final_result is None and not used_iterable:
+                     # Last resort: synchronous run()/generate()/callable
+                     if hasattr(agent_to_use, "run") and callable(
+                         getattr(agent_to_use, "run")
+                     ):
+                         final_result = agent_to_use.run(task)
+                     elif hasattr(agent_to_use, "generate") and callable(
+                         getattr(agent_to_use, "generate")
+                     ):
+                         final_result = agent_to_use.generate(task)
+                     elif callable(agent_to_use):
+                         final_result = agent_to_use(task)
+
          except Exception as e:
              try:
                  qwriter.flush()

                  q.put_nowait({"__final__": final_result})
              except Exception:
                  pass
+         stop_evt.set()

+     # Kick off threads
+     mem_thread = threading.Thread(target=poll_memory, daemon=True)
+     run_thread = threading.Thread(target=run_agent, daemon=True)
+     mem_thread.start()
      run_thread.start()

+     # Async consumer
      while True:
          item = await q.get()

          yield item
          if isinstance(item, dict) and "__final__" in item:
              break


  def _recursively_scrub(obj):
      if isinstance(obj, str):
          return scrub_think_tags(obj)

      return obj


+ async def proxy_upstream_chat_completions(
+     body: dict, stream: bool, scrub_think: bool = False
+ ):
      HF_TOKEN = os.getenv("OPENAI_API_KEY")
      headers = {
          "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",

      async def proxy_stream():
          async with httpx.AsyncClient(timeout=None) as client:
+             async with client.stream(
+                 "POST", url, headers=headers, json=body
+             ) as resp:
                  resp.raise_for_status()
                  if scrub_think:
+                     # Pull text segments, scrub tags, and yield bytes
                      async for txt in resp.aiter_text():
                          try:
                              cleaned = scrub_think_tags(txt)

          except Exception:
              pass

+     return fastapi.responses.JSONResponse(
+         status_code=r.status_code, content=payload
+     )
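The restored proxy reads OPENAI_API_KEY and UPSTREAM_OPENAI_BASE itself; wiring it into an OpenAI-compatible endpoint could look roughly like the sketch below. The route path and the scrub_think query flag are illustrative assumptions, not code from this commit.

```python
# Hedged sketch: exposing proxy_upstream_chat_completions behind an
# OpenAI-style route. The app and route path are assumptions; the proxy
# itself decides between StreamingResponse and JSONResponse via `stream`.
import fastapi

from agent_server.agent_streaming import proxy_upstream_chat_completions

app = fastapi.FastAPI()


@app.post("/v1/chat/completions")
async def chat_completions(request: fastapi.Request, scrub_think: bool = False):
    body = await request.json()
    # "stream" mirrors the OpenAI request field.
    return await proxy_upstream_chat_completions(
        body, stream=bool(body.get("stream", False)), scrub_think=scrub_think
    )
```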
agent_server/std_tee.py CHANGED
@@ -1,317 +1,102 @@
- # agent_server/std_tee.py
- from __future__ import annotations
-
  import asyncio
  import io
  import json
  import re
  import threading
- import typing as t

  from agent_server.sanitizing_think_tags import scrub_think_tags

- # ---------------------------------------------------------------------------
- # Cleaning / formatting helpers (used by the streaming layer)
- # ---------------------------------------------------------------------------
-
- # Strip ANSI escape sequences (common from rich/logging)
- _ANSI_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")
-
- # Lines that should never be surfaced (system prompt safety + obvious boilerplate)
- _NOISY_PREFIXES = (
-     "OpenAIServerModel",
-     "Output message of the LLM",
-     "New run",
-     "─ Executing parsed code",
-     "╭", "╰", "│", "━", "─",
-     "System prompt", "SYSTEM PROMPT", "System Prompt",
- )
-
- # Very long single lines without enough alphanumerics are dropped
- _MIN_SIG_CHARS = re.compile(r"[A-Za-z0-9]{3,}")
-
- def _strip_ansi_and_think(s: str) -> str:
-     s = scrub_think_tags(s)
-     s = _ANSI_RE.sub("", s)
-     return s
-
- def _truncate(s: str, n: int) -> str:
-     s = s.strip()
-     if len(s) <= n:
-         return s
-     return s[:n] + "\n… [truncated]"
-
- def _format_reasoning_chunk(text: str, tag: str, idx: int) -> str:
-     """
-     Lightweight formatter for reasoning stream from stdout.
-       - scrubs <think>…</think>
-       - strips ANSI
-       - drops banners/box drawing and 'System prompt …'
-       - drops very-long low-signal lines
-     Returns a small readable chunk with a trailing blank line.
-     """
-     stripped = _strip_ansi_and_think(text).rstrip("\n").strip()
-     if not stripped:
-         return ""
-     if any(stripped.startswith(p) for p in _NOISY_PREFIXES):
-         return ""
-     # Lines made mostly of box drawing/separators
-     if all(ch in " ─━╭╮╰╯│═·—-_=+•" for ch in stripped):
-         return ""
-     # Excessively long lines with little signal (no alphanumerics)
-     if len(stripped) > 240 and not _MIN_SIG_CHARS.search(stripped):
-         return ""
-     return f"{stripped}\n\n"
-
- # Optional helper if you ever want to sniff final answer from stdout
- _FINAL_RE = re.compile(r"(?:^|\b)Final\s+answer:\s*(.+)$", flags=re.IGNORECASE)
- def _maybe_parse_final_from_stdout(line: str) -> t.Optional[str]:
-     if not isinstance(line, str):
-         return None
-     m = _FINAL_RE.search(_strip_ansi_and_think(line))
-     if not m:
-         return None
-     return _strip_ansi_and_think(m.group(1)).strip() or None
-
-
- # ---------------------------------------------------------------------------
- # QueueWriter: tee stdout/stderr into an asyncio.Queue line-by-line
- # ---------------------------------------------------------------------------

  class QueueWriter(io.TextIOBase):
      """
-     File-like object that pushes lines to an asyncio.Queue.
-     Each complete line is enqueued as {"__stdout__": "<line>"}.
-     The last partial line (if any) is flushed on flush().
      """

-     def __init__(self, q: "asyncio.Queue[dict]"):
          self.q = q
          self._lock = threading.Lock()
-         self._buf: list[str] = []

-     def write(self, s: str) -> int:
          if not s:
              return 0
-         if not isinstance(s, str):
-             s = str(s)
-
          with self._lock:
              self._buf.append(s)
-             text = "".join(self._buf)
-             if "\n" in text:
-                 lines = text.splitlines(keepends=True)
-                 # keep last partial (no newline) in buffer
-                 tail = "" if text.endswith("\n") else lines.pop()
-                 for ln in lines:
-                     if ln:  # include newlines; consumer will trim/format
-                         try:
-                             self.q.put_nowait({"__stdout__": ln})
-                         except Exception:
-                             pass
-                 self._buf = [tail]
          return len(s)

-     def flush(self) -> None:
          with self._lock:
              if self._buf:
-                 text = "".join(self._buf)
                  self._buf.clear()
-                 if text:
-                     try:
-                         self.q.put_nowait({"__stdout__": text})
-                     except Exception:
-                         pass
-
-     def isatty(self) -> bool:  # some libs check this
-         return False


- # ---------------------------------------------------------------------------
- # (Optional / future) Compact serializer for step objects from various agents
- # ---------------------------------------------------------------------------
-
  def _serialize_step(step) -> str:
      """
-     Compact, uniform serializer for 'step' objects from different agent libs.
-     Produces:
-       Step N
-       🧠 Thought: …
-       🛠️ Tool: …
-       📥 Args: …
-       📤 Observation: …
-       💥 Error: …
-     (plus code fences when code is present)
-     With truncation to keep the reveal parsimonious.
      """
-     parts: list[str] = []
-
-     # Step number (best-effort)
      sn = getattr(step, "step_number", None)
      if sn is not None:
          parts.append(f"Step {sn}")
-
-     # Thought
      thought_val = getattr(step, "thought", None)
      if thought_val:
-         parts.append(f"🧠 Thought: {_truncate(_strip_ansi_and_think(str(thought_val)), 600)}")
-
-     # Tool
      tool_val = getattr(step, "tool", None)
      if tool_val:
-         parts.append(f"🛠️ Tool: {_truncate(_strip_ansi_and_think(str(tool_val)), 240)}")
-
-     # Code (if any)
      code_val = getattr(step, "code", None)
      if code_val:
-         code_str = _truncate(_strip_ansi_and_think(str(code_val)), 1600)
-         if code_str:
-             parts.append("```python\n" + code_str + "\n```")
-
-     # Args
      args = getattr(step, "args", None)
      if args:
          try:
-             arg_s = _truncate(_strip_ansi_and_think(json.dumps(args, ensure_ascii=False)), 800)
          except Exception:
-             arg_s = _truncate(_strip_ansi_and_think(str(args)), 800)
-         parts.append("📥 Args: " + arg_s)
-
-     # Error
      error = getattr(step, "error", None)
      if error:
-         parts.append(f"💥 Error: {_truncate(_strip_ansi_and_think(str(error)), 600)}")
-
-     # Observations
      obs = getattr(step, "observations", None)
      if obs is not None:
          if isinstance(obs, (list, tuple)):
              obs_str = "\n".join(map(str, obs))
          else:
              obs_str = str(obs)
-         parts.append("📤 Observation:\n" + _truncate(_strip_ansi_and_think(obs_str), 1600))
-
      # If this looks like a FinalAnswer step object, surface a clean final answer
      try:
          tname = type(step).__name__
      except Exception:
          tname = ""
-     if isinstance(tname, str) and tname.lower().startswith("finalanswer"):
          out = getattr(step, "output", None)
          if out is not None:
-             return f"Final answer: {_strip_ansi_and_think(str(out)).strip()}"
-         # Fallback: parse from string repr "FinalAnswerStep(output=...)"
-         s = _strip_ansi_and_think(str(step))
          m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
          if m:
              return f"Final answer: {m.group(1).strip()}"
-
      # If the only content would be an object repr like FinalAnswerStep(...), drop it;
      # a cleaner "Final answer: ..." will come from the rule above or stdout.
      joined = "\n".join(parts).strip()
      if re.match(r"^FinalAnswer[^\n]+\)$", joined):
          return ""
-     return joined or _strip_ansi_and_think(str(step))
-
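For reference, the helpers removed above implemented a line-buffered tee: QueueWriter collects redirected stdout into {"__stdout__": ...} queue items, and _format_reasoning_chunk drops ANSI noise and banner lines. A minimal self-contained sketch of that behaviour, assuming the pre-revert std_tee module (with _format_reasoning_chunk still present) is importable:

```python
# Hedged sketch of the removed tee + formatter pair. The printed "agent"
# output is illustrative; the noisy line is expected to be filtered out.
import asyncio
import contextlib

from agent_server.std_tee import QueueWriter, _format_reasoning_chunk


def demo() -> None:
    q: asyncio.Queue = asyncio.Queue()
    writer = QueueWriter(q)

    # Anything printed inside this block is enqueued line by line.
    with contextlib.redirect_stdout(writer):
        print("Step 1: calling search tool")
        print("\x1b[31mSystem prompt: you are a helpful agent\x1b[0m")  # dropped as noise
    writer.flush()  # push any trailing partial line

    while not q.empty():
        item = q.get_nowait()
        chunk = _format_reasoning_chunk(item["__stdout__"], tag="stdout", idx=0)
        if chunk:
            print(chunk, end="")  # only the first line survives the filters


if __name__ == "__main__":
    demo()
```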
  import asyncio
  import io
  import json
  import re
  import threading

  from agent_server.sanitizing_think_tags import scrub_think_tags


  class QueueWriter(io.TextIOBase):
      """
+     File-like object that pushes each write to an asyncio.Queue immediately.
      """

+     def __init__(self, q: "asyncio.Queue"):
          self.q = q
          self._lock = threading.Lock()
+         self._buf = []  # accumulate until newline to reduce spam

+     def write(self, s: str):
          if not s:
              return 0
          with self._lock:
              self._buf.append(s)
+             # flush on newline to keep granularity reasonable
+             if "\n" in s:
+                 chunk = "".join(self._buf)
+                 self._buf.clear()
+                 try:
+                     self.q.put_nowait({"__stdout__": chunk})
+                 except Exception:
+                     pass
          return len(s)

+     def flush(self):
          with self._lock:
              if self._buf:
+                 chunk = "".join(self._buf)
                  self._buf.clear()
+                 try:
+                     self.q.put_nowait({"__stdout__": chunk})
+                 except Exception:
+                     pass


  def _serialize_step(step) -> str:
      """
+     Best-effort pretty string for a smolagents MemoryStep / ActionStep.
+     Works even if attributes are missing on some versions.
      """
+     parts = []
      sn = getattr(step, "step_number", None)
      if sn is not None:
          parts.append(f"Step {sn}")
      thought_val = getattr(step, "thought", None)
      if thought_val:
+         parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
      tool_val = getattr(step, "tool", None)
      if tool_val:
+         parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
      code_val = getattr(step, "code", None)
      if code_val:
+         code_str = scrub_think_tags(str(code_val)).strip()
+         parts.append("```python\n" + code_str + "\n```")
      args = getattr(step, "args", None)
      if args:
          try:
+             parts.append(
+                 "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
+             )
          except Exception:
+             parts.append("Args: " + scrub_think_tags(str(args)))
      error = getattr(step, "error", None)
      if error:
+         parts.append(f"Error: {scrub_think_tags(str(error))}")
      obs = getattr(step, "observations", None)
      if obs is not None:
          if isinstance(obs, (list, tuple)):
              obs_str = "\n".join(map(str, obs))
          else:
              obs_str = str(obs)
+         parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
      # If this looks like a FinalAnswer step object, surface a clean final answer
      try:
          tname = type(step).__name__
      except Exception:
          tname = ""
+     if tname.lower().startswith("finalanswer"):
          out = getattr(step, "output", None)
          if out is not None:
+             return f"Final answer: {scrub_think_tags(str(out)).strip()}"
+         # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
+         s = scrub_think_tags(str(step))
          m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
          if m:
              return f"Final answer: {m.group(1).strip()}"
      # If the only content would be an object repr like FinalAnswerStep(...), drop it;
      # a cleaner "Final answer: ..." will come from the rule above or stdout.
      joined = "\n".join(parts).strip()
      if re.match(r"^FinalAnswer[^\n]+\)$", joined):
          return ""
+     return joined or scrub_think_tags(str(step))
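The restored _serialize_step only reads attributes via getattr, so any object carrying the same field names can be previewed. A hedged sketch with a stand-in dataclass (ToyStep is illustrative, not a smolagents class):

```python
# Hedged sketch: exercising the restored _serialize_step with a toy object.
from dataclasses import dataclass, field

from agent_server.std_tee import _serialize_step


@dataclass
class ToyStep:
    step_number: int = 1
    thought: str = "I should look up the answer."
    tool: str = "web_search"
    args: dict = field(default_factory=lambda: {"query": "capital of France"})
    observations: str = "Paris is the capital of France."


print(_serialize_step(ToyStep()))
# Expected shape (best effort):
#   Step 1
#   Thought: I should look up the answer.
#   Tool: web_search
#   Args: {"query": "capital of France"}
#   Observation:
#   Paris is the capital of France.
```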