ccm commited on
Commit
a6d1b6d
·
1 Parent(s): 1a514b0

Attempting better agent reasoning trace

Browse files
agent_server/agent_streaming.py CHANGED
@@ -1,19 +1,91 @@
 
 
 
1
  import asyncio
2
  import contextlib
3
  import os
4
  import threading
5
  import time
6
- import typing
7
 
8
  import fastapi
9
  import httpx
10
 
11
  from agent_server.helpers import sse_headers
12
  from agent_server.sanitizing_think_tags import scrub_think_tags
13
- from agent_server.std_tee import QueueWriter, _serialize_step
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
14
 
 
 
 
15
 
16
- async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
 
 
 
 
 
17
  """
18
  Start the agent in a worker thread.
19
  Stream THREE sources of incremental data into the async generator:
@@ -21,97 +93,62 @@ async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = N
21
  (2) newly appended memory steps (polled),
22
  (3) any iterable the agent may yield (if supported).
23
  Finally emit a __final__ item with the last answer.
 
 
 
 
 
 
 
24
  """
25
  loop = asyncio.get_running_loop()
26
  q: asyncio.Queue = asyncio.Queue()
27
  agent_to_use = agent_obj
28
-
29
  stop_evt = threading.Event()
30
 
31
- # 1) stdout/stderr live tee
32
  qwriter = QueueWriter(q)
33
 
34
  # 2) memory poller
35
- def poll_memory():
36
- last_len = 0
37
- while not stop_evt.is_set():
38
- try:
39
- steps = []
40
- try:
41
- # Common API: agent.memory.get_full_steps()
42
- steps = agent_to_use.memory.get_full_steps() # type: ignore[attr-defined]
43
- except Exception:
44
- # Fallbacks: different names across versions
45
- steps = (
46
- getattr(agent_to_use, "steps", [])
47
- or getattr(agent_to_use, "memory", [])
48
- or []
49
- )
50
- if steps is None:
51
- steps = []
52
- curr_len = len(steps)
53
- if curr_len > last_len:
54
- new = steps[last_len:curr_len]
55
- last_len = curr_len
56
- for s in new:
57
- s_text = _serialize_step(s)
58
- if s_text:
59
- try:
60
- q.put_nowait({"__step__": s_text})
61
- except Exception:
62
- pass
63
- except Exception:
64
- pass
65
- time.sleep(0.10) # 100 ms cadence
66
 
67
  # 3) agent runner (may or may not yield)
68
  def run_agent():
69
  final_result = None
70
  try:
71
- with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(
72
- qwriter
73
- ):
74
  used_iterable = False
75
- if hasattr(agent_to_use, "run") and callable(
76
- getattr(agent_to_use, "run")
77
- ):
78
  try:
79
  res = agent_to_use.run(task, stream=True)
80
- if hasattr(res, "__iter__") and not isinstance(
81
- res, (str, bytes)
82
- ):
83
  used_iterable = True
84
  for it in res:
85
  try:
86
- q.put_nowait(it)
87
  except Exception:
88
  pass
89
- final_result = (
90
- None # iterable may already contain the answer
91
- )
92
  else:
93
  final_result = res
94
  except TypeError:
95
- # run(stream=True) not supported -> fall back
96
  pass
97
 
98
  if final_result is None and not used_iterable:
99
- # Try other common streaming signatures
100
- for name in (
101
- "run_stream",
102
- "stream",
103
- "stream_run",
104
- "run_with_callback",
105
- ):
106
  fn = getattr(agent_to_use, name, None)
107
  if callable(fn):
108
  try:
109
  res = fn(task)
110
- if hasattr(res, "__iter__") and not isinstance(
111
- res, (str, bytes)
112
- ):
113
  for it in res:
114
- q.put_nowait(it)
 
 
 
115
  final_result = None
116
  else:
117
  final_result = res
@@ -120,7 +157,7 @@ async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = N
120
  # maybe callback signature
121
  def cb(item):
122
  try:
123
- q.put_nowait(item)
124
  except Exception:
125
  pass
126
 
@@ -132,17 +169,10 @@ async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = N
132
  continue
133
 
134
  if final_result is None and not used_iterable:
135
- pass # (typo guard removed below)
136
-
137
- if final_result is None and not used_iterable:
138
- # Last resort: synchronous run()/generate()/callable
139
- if hasattr(agent_to_use, "run") and callable(
140
- getattr(agent_to_use, "run")
141
- ):
142
  final_result = agent_to_use.run(task)
143
- elif hasattr(agent_to_use, "generate") and callable(
144
- getattr(agent_to_use, "generate")
145
- ):
146
  final_result = agent_to_use.generate(task)
147
  elif callable(agent_to_use):
148
  final_result = agent_to_use(task)
@@ -167,20 +197,45 @@ async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = N
167
  pass
168
  stop_evt.set()
169
 
170
- # Kick off threads
171
- mem_thread = threading.Thread(target=poll_memory, daemon=True)
172
- run_thread = threading.Thread(target=run_agent, daemon=True)
173
- mem_thread.start()
174
  run_thread.start()
175
 
176
- # Async consumer
177
  while True:
178
  item = await q.get()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
179
  yield item
 
180
  if isinstance(item, dict) and "__final__" in item:
181
  break
182
 
183
 
 
 
 
184
  def _recursively_scrub(obj):
185
  if isinstance(obj, str):
186
  return scrub_think_tags(obj)
@@ -191,9 +246,10 @@ def _recursively_scrub(obj):
191
  return obj
192
 
193
 
194
- async def proxy_upstream_chat_completions(
195
- body: dict, stream: bool, scrub_think: bool = False
196
- ):
 
197
  HF_TOKEN = os.getenv("OPENAI_API_KEY")
198
  headers = {
199
  "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
@@ -206,12 +262,9 @@ async def proxy_upstream_chat_completions(
206
 
207
  async def proxy_stream():
208
  async with httpx.AsyncClient(timeout=None) as client:
209
- async with client.stream(
210
- "POST", url, headers=headers, json=body
211
- ) as resp:
212
  resp.raise_for_status()
213
  if scrub_think:
214
- # Pull text segments, scrub tags, and yield bytes
215
  async for txt in resp.aiter_text():
216
  try:
217
  cleaned = scrub_think_tags(txt)
@@ -239,6 +292,250 @@ async def proxy_upstream_chat_completions(
239
  except Exception:
240
  pass
241
 
242
- return fastapi.responses.JSONResponse(
243
- status_code=r.status_code, content=payload
244
- )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # agent_server/agent_streaming.py
2
+ from __future__ import annotations
3
+
4
  import asyncio
5
  import contextlib
6
  import os
7
  import threading
8
  import time
9
+ import typing as t
10
 
11
  import fastapi
12
  import httpx
13
 
14
  from agent_server.helpers import sse_headers
15
  from agent_server.sanitizing_think_tags import scrub_think_tags
16
+ from agent_server.std_tee import (
17
+ QueueWriter,
18
+ _serialize_step,
19
+ _format_reasoning_chunk,
20
+ _maybe_parse_final_from_stdout,
21
+ )
22
+
23
+ # ---------------------------------------------------------------------------
24
+ # Memory poller: normalize all agent types to uniform step blocks.
25
+ # ---------------------------------------------------------------------------
26
def start_memory_poller(
    agent: t.Any,
    q: "asyncio.Queue[dict]",
    stop_evt: "threading.Event",
    interval: float = 0.10,
) -> threading.Thread:
    """Launch a daemon thread that watches *agent*'s memory for new steps.

    Every step appended since the previous poll is serialized with
    ``_serialize_step`` and enqueued as ``{"__step__": <text>}``.  Several
    attribute paths are probed so different agent implementations work.
    Polling ends when *stop_evt* is set; the started thread is returned.
    """
    seen = 0

    def _snapshot() -> list:
        # Prefer the canonical memory accessors, trying newest names first.
        try:
            mem = getattr(agent, "memory", None)
            if mem is not None:
                for attr in ("get_full_steps", "get_steps", "get_all_steps"):
                    fn = getattr(mem, attr, None)
                    if callable(fn):
                        return list(fn() or [])
        except Exception:
            pass
        # Fallback: some agents expose a bare ``steps`` list instead.
        try:
            raw = getattr(agent, "steps", None)
            if raw:
                return list(raw)
        except Exception:
            pass
        return []

    def _poll() -> None:
        nonlocal seen
        while not stop_evt.is_set():
            try:
                snapshot = _snapshot()
                if len(snapshot) > seen:
                    fresh, seen = snapshot[seen:], len(snapshot)
                    for step in fresh:
                        try:
                            text = _serialize_step(step)
                            if text:
                                q.put_nowait({"__step__": text})
                        except Exception:
                            # Formatting problems must never kill polling.
                            pass
            except Exception:
                pass
            time.sleep(interval)

    poller = threading.Thread(target=_poll, name="memory-poller", daemon=True)
    poller.start()
    return poller
82
 
83
+
84
+ # ---------------------------------------------------------------------------
85
+ # Unified agent streaming: stdout/stderr, memory steps, iterator yields.
86
+ # Adds normalized reasoning via __reasoning__ while preserving legacy keys.
87
+ # ---------------------------------------------------------------------------
88
+ async def run_agent_stream(task: str, agent_obj: t.Optional[t.Any] = None):
89
  """
90
  Start the agent in a worker thread.
91
  Stream THREE sources of incremental data into the async generator:
 
93
  (2) newly appended memory steps (polled),
94
  (3) any iterable the agent may yield (if supported).
95
  Finally emit a __final__ item with the last answer.
96
+
97
+ Emits dict items. For compatibility, raw shapes are preserved:
98
+ - {"__stdout__": "<line>"} (raw line)
99
+ - {"__step__": "<block>"} (uniform Step/Thought/Tool/Args/Obs/Error block)
100
+ - {"__reasoning__": "<chunk>"} (normalized reasoning derived from stdout)
101
+ - {"__error__": "<error>"} (if run errors)
102
+ - {"__final__": any} (final result)
103
  """
104
  loop = asyncio.get_running_loop()
105
  q: asyncio.Queue = asyncio.Queue()
106
  agent_to_use = agent_obj
 
107
  stop_evt = threading.Event()
108
 
109
+ # 1) stdout/stderr live tee (lines go in as {"__stdout__": ...})
110
  qwriter = QueueWriter(q)
111
 
112
  # 2) memory poller
113
+ mem_thread = start_memory_poller(agent_to_use, q, stop_evt)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
114
 
115
  # 3) agent runner (may or may not yield)
116
  def run_agent():
117
  final_result = None
118
  try:
119
+ with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(qwriter):
 
 
120
  used_iterable = False
121
+ # Preferred streaming signature
122
+ if hasattr(agent_to_use, "run") and callable(getattr(agent_to_use, "run")):
 
123
  try:
124
  res = agent_to_use.run(task, stream=True)
125
+ if hasattr(res, "__iter__") and not isinstance(res, (str, bytes)):
 
 
126
  used_iterable = True
127
  for it in res:
128
  try:
129
+ q.put_nowait(it if isinstance(it, dict) else {"__stdout__": str(it)})
130
  except Exception:
131
  pass
132
+ final_result = None # may be contained in the iterable
 
 
133
  else:
134
  final_result = res
135
  except TypeError:
136
+ # run(stream=True) not supported -> fall through to other signatures
137
  pass
138
 
139
  if final_result is None and not used_iterable:
140
+ # Try other common streaming variants
141
+ for name in ("run_stream", "stream", "stream_run", "run_with_callback"):
 
 
 
 
 
142
  fn = getattr(agent_to_use, name, None)
143
  if callable(fn):
144
  try:
145
  res = fn(task)
146
+ if hasattr(res, "__iter__") and not isinstance(res, (str, bytes)):
 
 
147
  for it in res:
148
+ try:
149
+ q.put_nowait(it if isinstance(it, dict) else {"__stdout__": str(it)})
150
+ except Exception:
151
+ pass
152
  final_result = None
153
  else:
154
  final_result = res
 
157
  # maybe callback signature
158
  def cb(item):
159
  try:
160
+ q.put_nowait(item if isinstance(item, dict) else {"__stdout__": str(item)})
161
  except Exception:
162
  pass
163
 
 
169
  continue
170
 
171
  if final_result is None and not used_iterable:
172
+ # Last resort: synchronous APIs
173
+ if hasattr(agent_to_use, "run") and callable(getattr(agent_to_use, "run")):
 
 
 
 
 
174
  final_result = agent_to_use.run(task)
175
+ elif hasattr(agent_to_use, "generate") and callable(getattr(agent_to_use, "generate")):
 
 
176
  final_result = agent_to_use.generate(task)
177
  elif callable(agent_to_use):
178
  final_result = agent_to_use(task)
 
197
  pass
198
  stop_evt.set()
199
 
200
+ run_thread = threading.Thread(target=run_agent, name="agent-runner", daemon=True)
 
 
 
201
  run_thread.start()
202
 
203
+ # Async consumer: normalize stdout -> reasoning chunk; forward steps & others
204
  while True:
205
  item = await q.get()
206
+
207
+ # Normalize stdout lines into compact reasoning chunks, and also
208
+ # opportunistically extract a "Final answer:" if the agent prints one.
209
+ if isinstance(item, dict) and "__stdout__" in item:
210
+ line = item["__stdout__"]
211
+ # Add compact, filtered reasoning chunk (drop banners, system prompts)
212
+ chunk = _format_reasoning_chunk(line, tag="stdout", idx=0)
213
+ if chunk:
214
+ yield {"__reasoning__": chunk}
215
+ # Keep legacy raw stdout for existing consumers
216
+ yield item
217
+ # Opportunistic final answer capture from stdout
218
+ maybe_final = _maybe_parse_final_from_stdout(line)
219
+ if maybe_final:
220
+ # Don't end the stream here; consumer can decide how to use it
221
+ yield {"__maybe_final__": maybe_final}
222
+ continue
223
+
224
+ # Steps already serialized uniformly in the poller
225
+ if isinstance(item, dict) and "__step__" in item:
226
+ yield item
227
+ continue
228
+
229
+ # Pass-through for other shapes (__error__, iterable events, etc.)
230
  yield item
231
+
232
  if isinstance(item, dict) and "__final__" in item:
233
  break
234
 
235
 
236
+ # ---------------------------------------------------------------------------
237
+ # Utilities: scrub nested structures of <think> tags when proxying upstream
238
+ # ---------------------------------------------------------------------------
239
def _recursively_scrub(obj):
    """Strip <think> tags from every string nested anywhere inside *obj*.

    Dicts and lists are rebuilt with scrubbed contents; strings are scrubbed
    directly; any other value is returned unchanged.
    """
    if isinstance(obj, dict):
        return {key: _recursively_scrub(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [_recursively_scrub(item) for item in obj]
    if isinstance(obj, str):
        return scrub_think_tags(obj)
    return obj
247
 
248
 
249
+ # ---------------------------------------------------------------------------
250
+ # Upstream proxy (OpenAI-compatible) with optional think-tag scrubbing
251
+ # ---------------------------------------------------------------------------
252
+ async def proxy_upstream_chat_completions(body: dict, stream: bool, scrub_think: bool = False):
253
  HF_TOKEN = os.getenv("OPENAI_API_KEY")
254
  headers = {
255
  "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
 
262
 
263
  async def proxy_stream():
264
  async with httpx.AsyncClient(timeout=None) as client:
265
+ async with client.stream("POST", url, headers=headers, json=body) as resp:
 
 
266
  resp.raise_for_status()
267
  if scrub_think:
 
268
  async for txt in resp.aiter_text():
269
  try:
270
  cleaned = scrub_think_tags(txt)
 
292
  except Exception:
293
  pass
294
 
295
+ return fastapi.responses.JSONResponse(status_code=r.status_code, content=payload)
296
+
297
+
298
+ # import asyncio
299
+ # import contextlib
300
+ # import os
301
+ # import threading
302
+ # import time
303
+ # import typing
304
+ #
305
+ # import fastapi
306
+ # import httpx
307
+ #
308
+ # from agent_server.helpers import sse_headers
309
+ # from agent_server.sanitizing_think_tags import scrub_think_tags
310
+ # from agent_server.std_tee import QueueWriter, _serialize_step
311
+ #
312
+ #
313
+ # async def run_agent_stream(task: str, agent_obj: typing.Optional[typing.Any] = None):
314
+ # """
315
+ # Start the agent in a worker thread.
316
+ # Stream THREE sources of incremental data into the async generator:
317
+ # (1) live stdout/stderr lines,
318
+ # (2) newly appended memory steps (polled),
319
+ # (3) any iterable the agent may yield (if supported).
320
+ # Finally emit a __final__ item with the last answer.
321
+ # """
322
+ # loop = asyncio.get_running_loop()
323
+ # q: asyncio.Queue = asyncio.Queue()
324
+ # agent_to_use = agent_obj
325
+ #
326
+ # stop_evt = threading.Event()
327
+ #
328
+ # # 1) stdout/stderr live tee
329
+ # qwriter = QueueWriter(q)
330
+ #
331
+ # # 2) memory poller
332
+ # def poll_memory():
333
+ # last_len = 0
334
+ # while not stop_evt.is_set():
335
+ # try:
336
+ # steps = []
337
+ # try:
338
+ # # Common API: agent.memory.get_full_steps()
339
+ # steps = agent_to_use.memory.get_full_steps() # type: ignore[attr-defined]
340
+ # except Exception:
341
+ # # Fallbacks: different names across versions
342
+ # steps = (
343
+ # getattr(agent_to_use, "steps", [])
344
+ # or getattr(agent_to_use, "memory", [])
345
+ # or []
346
+ # )
347
+ # if steps is None:
348
+ # steps = []
349
+ # curr_len = len(steps)
350
+ # if curr_len > last_len:
351
+ # new = steps[last_len:curr_len]
352
+ # last_len = curr_len
353
+ # for s in new:
354
+ # s_text = _serialize_step(s)
355
+ # if s_text:
356
+ # try:
357
+ # q.put_nowait({"__step__": s_text})
358
+ # except Exception:
359
+ # pass
360
+ # except Exception:
361
+ # pass
362
+ # time.sleep(0.10) # 100 ms cadence
363
+ #
364
+ # # 3) agent runner (may or may not yield)
365
+ # def run_agent():
366
+ # final_result = None
367
+ # try:
368
+ # with contextlib.redirect_stdout(qwriter), contextlib.redirect_stderr(
369
+ # qwriter
370
+ # ):
371
+ # used_iterable = False
372
+ # if hasattr(agent_to_use, "run") and callable(
373
+ # getattr(agent_to_use, "run")
374
+ # ):
375
+ # try:
376
+ # res = agent_to_use.run(task, stream=True)
377
+ # if hasattr(res, "__iter__") and not isinstance(
378
+ # res, (str, bytes)
379
+ # ):
380
+ # used_iterable = True
381
+ # for it in res:
382
+ # try:
383
+ # q.put_nowait(it)
384
+ # except Exception:
385
+ # pass
386
+ # final_result = (
387
+ # None # iterable may already contain the answer
388
+ # )
389
+ # else:
390
+ # final_result = res
391
+ # except TypeError:
392
+ # # run(stream=True) not supported -> fall back
393
+ # pass
394
+ #
395
+ # if final_result is None and not used_iterable:
396
+ # # Try other common streaming signatures
397
+ # for name in (
398
+ # "run_stream",
399
+ # "stream",
400
+ # "stream_run",
401
+ # "run_with_callback",
402
+ # ):
403
+ # fn = getattr(agent_to_use, name, None)
404
+ # if callable(fn):
405
+ # try:
406
+ # res = fn(task)
407
+ # if hasattr(res, "__iter__") and not isinstance(
408
+ # res, (str, bytes)
409
+ # ):
410
+ # for it in res:
411
+ # q.put_nowait(it)
412
+ # final_result = None
413
+ # else:
414
+ # final_result = res
415
+ # break
416
+ # except TypeError:
417
+ # # maybe callback signature
418
+ # def cb(item):
419
+ # try:
420
+ # q.put_nowait(item)
421
+ # except Exception:
422
+ # pass
423
+ #
424
+ # try:
425
+ # fn(task, cb)
426
+ # final_result = None
427
+ # break
428
+ # except Exception:
429
+ # continue
430
+ #
431
+ # if final_result is None and not used_iterable:
432
+ # pass # (typo guard removed below)
433
+ #
434
+ # if final_result is None and not used_iterable:
435
+ # # Last resort: synchronous run()/generate()/callable
436
+ # if hasattr(agent_to_use, "run") and callable(
437
+ # getattr(agent_to_use, "run")
438
+ # ):
439
+ # final_result = agent_to_use.run(task)
440
+ # elif hasattr(agent_to_use, "generate") and callable(
441
+ # getattr(agent_to_use, "generate")
442
+ # ):
443
+ # final_result = agent_to_use.generate(task)
444
+ # elif callable(agent_to_use):
445
+ # final_result = agent_to_use(task)
446
+ #
447
+ # except Exception as e:
448
+ # try:
449
+ # qwriter.flush()
450
+ # except Exception:
451
+ # pass
452
+ # try:
453
+ # q.put_nowait({"__error__": str(e)})
454
+ # except Exception:
455
+ # pass
456
+ # finally:
457
+ # try:
458
+ # qwriter.flush()
459
+ # except Exception:
460
+ # pass
461
+ # try:
462
+ # q.put_nowait({"__final__": final_result})
463
+ # except Exception:
464
+ # pass
465
+ # stop_evt.set()
466
+ #
467
+ # # Kick off threads
468
+ # mem_thread = threading.Thread(target=poll_memory, daemon=True)
469
+ # run_thread = threading.Thread(target=run_agent, daemon=True)
470
+ # mem_thread.start()
471
+ # run_thread.start()
472
+ #
473
+ # # Async consumer
474
+ # while True:
475
+ # item = await q.get()
476
+ # yield item
477
+ # if isinstance(item, dict) and "__final__" in item:
478
+ # break
479
+ #
480
+ #
481
+ # def _recursively_scrub(obj):
482
+ # if isinstance(obj, str):
483
+ # return scrub_think_tags(obj)
484
+ # if isinstance(obj, dict):
485
+ # return {k: _recursively_scrub(v) for k, v in obj.items()}
486
+ # if isinstance(obj, list):
487
+ # return [_recursively_scrub(v) for v in obj]
488
+ # return obj
489
+ #
490
+ #
491
+ # async def proxy_upstream_chat_completions(
492
+ # body: dict, stream: bool, scrub_think: bool = False
493
+ # ):
494
+ # HF_TOKEN = os.getenv("OPENAI_API_KEY")
495
+ # headers = {
496
+ # "Authorization": f"Bearer {HF_TOKEN}" if HF_TOKEN else "",
497
+ # "Content-Type": "application/json",
498
+ # }
499
+ # UPSTREAM_BASE = os.getenv("UPSTREAM_OPENAI_BASE", "").rstrip("/")
500
+ # url = f"{UPSTREAM_BASE}/chat/completions"
501
+ #
502
+ # if stream:
503
+ #
504
+ # async def proxy_stream():
505
+ # async with httpx.AsyncClient(timeout=None) as client:
506
+ # async with client.stream(
507
+ # "POST", url, headers=headers, json=body
508
+ # ) as resp:
509
+ # resp.raise_for_status()
510
+ # if scrub_think:
511
+ # # Pull text segments, scrub tags, and yield bytes
512
+ # async for txt in resp.aiter_text():
513
+ # try:
514
+ # cleaned = scrub_think_tags(txt)
515
+ # yield cleaned.encode("utf-8")
516
+ # except Exception:
517
+ # yield txt.encode("utf-8")
518
+ # else:
519
+ # async for chunk in resp.aiter_bytes():
520
+ # yield chunk
521
+ #
522
+ # return fastapi.responses.StreamingResponse(
523
+ # proxy_stream(), media_type="text/event-stream", headers=sse_headers()
524
+ # )
525
+ # else:
526
+ # async with httpx.AsyncClient(timeout=None) as client:
527
+ # r = await client.post(url, headers=headers, json=body)
528
+ # try:
529
+ # payload = r.json()
530
+ # except Exception:
531
+ # payload = {"status_code": r.status_code, "text": r.text}
532
+ #
533
+ # if scrub_think:
534
+ # try:
535
+ # payload = _recursively_scrub(payload)
536
+ # except Exception:
537
+ # pass
538
+ #
539
+ # return fastapi.responses.JSONResponse(
540
+ # status_code=r.status_code, content=payload
541
+ # )
agent_server/std_tee.py CHANGED
@@ -1,102 +1,305 @@
 
 
 
1
  import asyncio
2
  import io
3
  import json
4
  import re
5
  import threading
 
6
 
7
- from agent_server.sanitizing_think_tags import scrub_think_tags
 
 
 
 
 
 
 
8
 
 
 
9
 
10
- class QueueWriter(io.TextIOBase):
11
- """
12
- File-like object that pushes each write to an asyncio.Queue immediately.
13
- """
 
 
 
 
 
14
 
15
- def __init__(self, q: "asyncio.Queue"):
16
- self.q = q
17
- self._lock = threading.Lock()
18
- self._buf = [] # accumulate until newline to reduce spam
19
-
20
- def write(self, s: str):
21
- if not s:
22
- return 0
23
- with self._lock:
24
- self._buf.append(s)
25
- # flush on newline to keep granularity reasonable
26
- if "\n" in s:
27
- chunk = "".join(self._buf)
28
- self._buf.clear()
29
- try:
30
- self.q.put_nowait({"__stdout__": chunk})
31
- except Exception:
32
- pass
33
- return len(s)
34
 
35
- def flush(self):
36
- with self._lock:
37
- if self._buf:
38
- chunk = "".join(self._buf)
39
- self._buf.clear()
40
- try:
41
- self.q.put_nowait({"__stdout__": chunk})
42
- except Exception:
43
- pass
44
 
 
 
 
 
 
45
 
46
- def _serialize_step(step) -> str:
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
  """
48
- Best-effort pretty string for a smolagents MemoryStep / ActionStep.
49
- Works even if attributes are missing on some versions.
 
 
 
50
  """
51
- parts = []
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
52
  sn = getattr(step, "step_number", None)
53
  if sn is not None:
54
  parts.append(f"Step {sn}")
 
 
55
  thought_val = getattr(step, "thought", None)
56
  if thought_val:
57
- parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
 
 
58
  tool_val = getattr(step, "tool", None)
59
  if tool_val:
60
- parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
 
 
61
  code_val = getattr(step, "code", None)
62
  if code_val:
63
- code_str = scrub_think_tags(str(code_val)).strip()
64
- parts.append("```python\n" + code_str + "\n```")
 
 
 
65
  args = getattr(step, "args", None)
66
  if args:
67
  try:
68
- parts.append(
69
- "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
70
- )
71
  except Exception:
72
- parts.append("Args: " + scrub_think_tags(str(args)))
 
 
 
73
  error = getattr(step, "error", None)
74
  if error:
75
- parts.append(f"Error: {scrub_think_tags(str(error))}")
 
 
76
  obs = getattr(step, "observations", None)
77
  if obs is not None:
78
  if isinstance(obs, (list, tuple)):
79
  obs_str = "\n".join(map(str, obs))
80
  else:
81
  obs_str = str(obs)
82
- parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
83
- # If this looks like a FinalAnswer step object, surface a clean final answer
84
- try:
85
- tname = type(step).__name__
86
- except Exception:
87
- tname = ""
88
- if tname.lower().startswith("finalanswer"):
89
  out = getattr(step, "output", None)
90
  if out is not None:
91
- return f"Final answer: {scrub_think_tags(str(out)).strip()}"
92
- # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
93
- s = scrub_think_tags(str(step))
94
- m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
95
- if m:
96
- return f"Final answer: {m.group(1).strip()}"
97
- # If the only content would be an object repr like FinalAnswerStep(...), drop it;
98
- # a cleaner "Final answer: ..." will come from the rule above or stdout.
99
  joined = "\n".join(parts).strip()
100
  if re.match(r"^FinalAnswer[^\n]+\)$", joined):
101
  return ""
102
- return joined or scrub_think_tags(str(step))
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # agent_server/std_tee.py
2
+ from __future__ import annotations
3
+
4
  import asyncio
5
  import io
6
  import json
7
  import re
8
  import threading
9
+ import typing as t
10
 
11
# ---- Think-tag scrubber (import with safe fallback) -------------------------
# Prefer the same-package sanitizer; fall back to a no-op so this module stays
# importable under a different project layout.
try:
    from .sanitizing_think_tags import scrub_think_tags  # type: ignore
except Exception:  # pragma: no cover
    def scrub_think_tags(s: str) -> str:
        # Fallback: leave text untouched when the sanitizer is unavailable.
        return s


# ---- Formatting helpers (ANSI, noise, truncation) --------------------------
# Matches ANSI CSI escape sequences (colors, cursor movement) for stripping.
_ANSI_RE = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]")

# Line prefixes that are console decoration or prompt boilerplate and should
# never be surfaced in the reasoning stream.
_NOISY_PREFIXES = (
    "OpenAIServerModel",
    "Output message of the LLM",
    "New run",
    "─ Executing parsed code",
    "╭", "╰", "│", "━", "─",
    "System prompt", "SYSTEM PROMPT", "System Prompt",
)

# A line must contain at least one run of 3+ alphanumerics to count as signal;
# very long lines without one are dropped.
_MIN_SIG_CHARS = re.compile(r"[A-Za-z0-9]{3,}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
 
36
def _strip_ansi_and_think(s: str) -> str:
    """Scrub <think> tags, remove ANSI escapes, and trim surrounding space."""
    without_tags = scrub_think_tags(s)
    return _ANSI_RE.sub("", without_tags).strip()
 
 
 
 
 
40
 
41
def _truncate(s: str, n: int) -> str:
    """Trim *s* to at most *n* characters, appending a truncation marker."""
    trimmed = s.strip()
    if len(trimmed) > n:
        return trimmed[:n] + "\n… [truncated]"
    return trimmed
46
 
47
def _clean_line(s: str) -> str:
    """Normalize one console line for downstream formatting."""
    cleaned = _strip_ansi_and_think(s)
    return cleaned.rstrip("\n")
49
+
50
+ # ---- Public-ish helpers used by agent_streaming ----------------------------
51
# Matches "Final answer: <text>" (case-insensitive) at a word boundary.
# NOTE: the previous pattern used doubled backslashes inside a raw string
# (r"\\b", r"\\s"), which matches a literal backslash character and could
# therefore never match real agent output; single escapes are intended.
_FINAL_RE = re.compile(r"(?:^|\b)Final\s+answer:\s*(.+)$", flags=re.IGNORECASE)

def _maybe_parse_final_from_stdout(line: str) -> t.Optional[str]:
    """Best-effort extraction of a printed "Final answer: ..." from stdout.

    Returns the cleaned answer text, or None when *line* is not a string,
    cleans to nothing, or carries no final-answer marker.
    """
    if not isinstance(line, str):
        return None
    cleaned = _clean_line(line)
    m = _FINAL_RE.search(cleaned)
    if not m:
        return None
    return _clean_line(m.group(1)) or None
61
+
62
def _format_reasoning_chunk(text: str, tag: str, idx: int) -> str:
    """
    Filter one stdout line into a reasoning-stream chunk.

    Scrubs <think> tags and ANSI codes, then drops console banners,
    box-drawing rows, and very long low-signal lines.  Returns the chunk
    followed by a blank line, or "" when the line should be suppressed.
    (*tag* and *idx* are accepted for interface compatibility; they do not
    affect the output.)
    """
    line = _clean_line(text)
    if not line:
        return ""
    # str.startswith accepts the whole prefix tuple in one call.
    if line.startswith(_NOISY_PREFIXES):
        return ""
    # Pure box-drawing / separator rows carry no information.
    if not any(ch not in " ─━╭╮╰╯│═·—-_=+•" for ch in line):
        return ""
    # Very long lines without a single 3+ alphanumeric run are noise.
    if len(line) > 240 and _MIN_SIG_CHARS.search(line) is None:
        return ""
    return f"{line}\n\n"
80
+
81
def _serialize_step(step: t.Any) -> str:
    """
    Compact, uniform serializer for 'step' objects from different agent libs.

    Produces a block of the form:
        Step N
        🧠 Thought: …
        🛠️ Tool: …
        📥 Args: …
        📤 Observation: …
        💥 Error: …
    plus fenced code when the step carries code, each field truncated to keep
    the reveal parsimonious.  Final-answer steps collapse to a single
    "Final answer: …" line.  Returns "" when there is nothing worth showing.
    """
    parts: list[str] = []

    # Step number (best-effort)
    sn = getattr(step, "step_number", None)
    if sn is not None:
        parts.append(f"Step {sn}")

    # Thought
    thought_val = getattr(step, "thought", None)
    if thought_val:
        parts.append(f"🧠 Thought: {_truncate(_strip_ansi_and_think(str(thought_val)), 600)}")

    # Tool
    tool_val = getattr(step, "tool", None)
    if tool_val:
        parts.append(f"🛠️ Tool: {_truncate(_strip_ansi_and_think(str(tool_val)), 240)}")

    # Code (if any)
    code_val = getattr(step, "code", None)
    if code_val:
        code_str = _truncate(_strip_ansi_and_think(str(code_val)), 1600)
        if code_str:
            parts.append("```python\n" + code_str + "\n```")

    # Args
    args = getattr(step, "args", None)
    if args:
        try:
            arg_s = _truncate(_strip_ansi_and_think(json.dumps(args, ensure_ascii=False)), 800)
        except Exception:
            arg_s = _truncate(_strip_ansi_and_think(str(args)), 800)
        parts.append("📥 Args: " + arg_s)

    # Error
    error = getattr(step, "error", None)
    if error:
        parts.append(f"💥 Error: {_truncate(_strip_ansi_and_think(str(error)), 600)}")

    # Observations
    obs = getattr(step, "observations", None)
    if obs is not None:
        if isinstance(obs, (list, tuple)):
            obs_str = "\n".join(map(str, obs))
        else:
            obs_str = str(obs)
        parts.append("📤 Observation:\n" + _truncate(_strip_ansi_and_think(obs_str), 1600))

    # Final-answer detection: explicit type attributes first, then the class
    # name itself.  Checking type(step).__name__ restores detection of step
    # classes (e.g. FinalAnswerStep) that carry no type_name/type attribute.
    tname = getattr(step, "type_name", "") or getattr(step, "type", "") or ""
    if not (isinstance(tname, str) and tname):
        tname = type(step).__name__
    # Compute the cleaned repr up front so the final return can rely on it.
    s = _strip_ansi_and_think(str(step))
    if isinstance(tname, str) and tname.lower().startswith("finalanswer"):
        out = getattr(step, "output", None)
        if out is not None:
            return f"Final answer: {_strip_ansi_and_think(str(out))}"
        # Fallback: parse the repr "FinalAnswerStep(output=...)"
        m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
        if m:
            return f"Final answer: {m.group(1).strip()}"

    joined = "\n".join(parts).strip()
    # A bare object repr like "FinalAnswerStep(...)" adds no value; drop it.
    if re.match(r"^FinalAnswer[^\n]+\)$", joined):
        return ""
    return joined or s
158
+
159
+ # ---- Tee for redirecting stdout/stderr into an asyncio.Queue ----------------
160
class QueueWriter(io.TextIOBase):
    """
    Minimal text writer that sends lines into an asyncio.Queue.

    Each non-empty cleaned line is enqueued as {"__stdout__": "<line>"}.
    Writes are buffered until a newline so consumers see whole lines.

    A lock guards the internal buffer: this object is installed via
    contextlib.redirect_stdout/redirect_stderr and may be written to from a
    worker thread while flush() is called from another (the previous
    implementation held a lock; it was restored here).
    """

    def __init__(self, q: "asyncio.Queue[dict]"):
        self._q = q
        self._buf: list = []
        self._lock = threading.Lock()  # buffer may be touched by 2+ threads

    def write(self, s: str) -> int:
        if not isinstance(s, str):
            s = str(s)
        with self._lock:
            # Buffer until a newline; then emit one event per complete line.
            self._buf.append(s)
            text = "".join(self._buf)
            if "\n" in text:
                lines = text.splitlines(keepends=True)
                # Keep any trailing partial line (no newline yet) buffered.
                tail = "" if text.endswith("\n") else lines.pop()
                for ln in lines:
                    clean = _clean_line(ln)
                    if clean:
                        try:
                            # Downstream streamer formats these further.
                            self._q.put_nowait({"__stdout__": clean})
                        except Exception:
                            pass
                self._buf = [tail]
        return len(s)

    def flush(self) -> None:
        with self._lock:
            if not self._buf:
                return
            text = "".join(self._buf)
            self._buf.clear()
        clean = _clean_line(text)
        if clean:
            try:
                self._q.put_nowait({"__stdout__": clean})
            except Exception:
                pass

    def isatty(self) -> bool:  # some libraries probe for a TTY
        return False
203
+
204
+ # import asyncio
205
+ # import io
206
+ # import json
207
+ # import re
208
+ # import threading
209
+ #
210
+ # from agent_server.sanitizing_think_tags import scrub_think_tags
211
+ #
212
+ #
213
+ # class QueueWriter(io.TextIOBase):
214
+ # """
215
+ # File-like object that pushes each write to an asyncio.Queue immediately.
216
+ # """
217
+ #
218
+ # def __init__(self, q: "asyncio.Queue"):
219
+ # self.q = q
220
+ # self._lock = threading.Lock()
221
+ # self._buf = [] # accumulate until newline to reduce spam
222
+ #
223
+ # def write(self, s: str):
224
+ # if not s:
225
+ # return 0
226
+ # with self._lock:
227
+ # self._buf.append(s)
228
+ # # flush on newline to keep granularity reasonable
229
+ # if "\n" in s:
230
+ # chunk = "".join(self._buf)
231
+ # self._buf.clear()
232
+ # try:
233
+ # self.q.put_nowait({"__stdout__": chunk})
234
+ # except Exception:
235
+ # pass
236
+ # return len(s)
237
+ #
238
+ # def flush(self):
239
+ # with self._lock:
240
+ # if self._buf:
241
+ # chunk = "".join(self._buf)
242
+ # self._buf.clear()
243
+ # try:
244
+ # self.q.put_nowait({"__stdout__": chunk})
245
+ # except Exception:
246
+ # pass
247
+ #
248
+ #
249
+ # def _serialize_step(step) -> str:
250
+ # """
251
+ # Best-effort pretty string for a smolagents MemoryStep / ActionStep.
252
+ # Works even if attributes are missing on some versions.
253
+ # """
254
+ # parts = []
255
+ # sn = getattr(step, "step_number", None)
256
+ # if sn is not None:
257
+ # parts.append(f"Step {sn}")
258
+ # thought_val = getattr(step, "thought", None)
259
+ # if thought_val:
260
+ # parts.append(f"Thought: {scrub_think_tags(str(thought_val))}")
261
+ # tool_val = getattr(step, "tool", None)
262
+ # if tool_val:
263
+ # parts.append(f"Tool: {scrub_think_tags(str(tool_val))}")
264
+ # code_val = getattr(step, "code", None)
265
+ # if code_val:
266
+ # code_str = scrub_think_tags(str(code_val)).strip()
267
+ # parts.append("```python\n" + code_str + "\n```")
268
+ # args = getattr(step, "args", None)
269
+ # if args:
270
+ # try:
271
+ # parts.append(
272
+ # "Args: " + scrub_think_tags(json.dumps(args, ensure_ascii=False))
273
+ # )
274
+ # except Exception:
275
+ # parts.append("Args: " + scrub_think_tags(str(args)))
276
+ # error = getattr(step, "error", None)
277
+ # if error:
278
+ # parts.append(f"Error: {scrub_think_tags(str(error))}")
279
+ # obs = getattr(step, "observations", None)
280
+ # if obs is not None:
281
+ # if isinstance(obs, (list, tuple)):
282
+ # obs_str = "\n".join(map(str, obs))
283
+ # else:
284
+ # obs_str = str(obs)
285
+ # parts.append("Observation:\n" + scrub_think_tags(obs_str).strip())
286
+ # # If this looks like a FinalAnswer step object, surface a clean final answer
287
+ # try:
288
+ # tname = type(step).__name__
289
+ # except Exception:
290
+ # tname = ""
291
+ # if tname.lower().startswith("finalanswer"):
292
+ # out = getattr(step, "output", None)
293
+ # if out is not None:
294
+ # return f"Final answer: {scrub_think_tags(str(out)).strip()}"
295
+ # # Fallback: try to parse from string repr "FinalAnswerStep(output=...)"
296
+ # s = scrub_think_tags(str(step))
297
+ # m = re.search(r"FinalAnswer[^()]*\(\s*output\s*=\s*([^,)]+)", s)
298
+ # if m:
299
+ # return f"Final answer: {m.group(1).strip()}"
300
+ # # If the only content would be an object repr like FinalAnswerStep(...), drop it;
301
+ # # a cleaner "Final answer: ..." will come from the rule above or stdout.
302
+ # joined = "\n".join(parts).strip()
303
+ # if re.match(r"^FinalAnswer[^\n]+\)$", joined):
304
+ # return ""
305
+ # return joined or scrub_think_tags(str(step))