johnbridges committed on
Commit
552430d
·
1 Parent(s): 66c4f69
Files changed (3) hide show
  1. hf_backend.py +3 -7
  2. listener.py +31 -16
  3. requirements.txt +1 -1
hf_backend.py CHANGED
@@ -141,8 +141,8 @@ class HFChatBackend(ChatBackend):
141
 
142
  messages = request.get("messages", [])
143
  tools = request.get("tools")
144
- temperature = float(request.get("temperature", settings.LlmTemp or 0.7))
145
- req_max_tokens = int(request.get("max_tokens", settings.LlmOpenAICtxSize or 512))
146
 
147
  rid = f"chatcmpl-hf-{int(time.time())}"
148
  now = int(time.time())
@@ -177,11 +177,7 @@ class HFChatBackend(ChatBackend):
177
  def _run_once(prompt: str, device: str, req_dtype: torch.dtype) -> str:
178
  model, eff_dtype = _get_model(device, req_dtype)
179
 
180
- # Clamp max_new_tokens for CPU to prevent stalls
181
- if device == "cpu":
182
- max_new_tokens = min(req_max_tokens, 512)
183
- else:
184
- max_new_tokens = req_max_tokens
185
 
186
  # Build inputs with context-aware truncation
187
  inputs, orig_in_len, ctx, limit = _build_inputs_with_truncation(prompt, device, max_new_tokens, model, tokenizer)
 
141
 
142
  messages = request.get("messages", [])
143
  tools = request.get("tools")
144
+ temperature = float(request.get("temperature", settings.LlmTemp or 0.3))
145
+ req_max_tokens = int(request.get("max_tokens", settings.LlmOpenAICtxSize or 32000))
146
 
147
  rid = f"chatcmpl-hf-{int(time.time())}"
148
  now = int(time.time())
 
177
  def _run_once(prompt: str, device: str, req_dtype: torch.dtype) -> str:
178
  model, eff_dtype = _get_model(device, req_dtype)
179
 
180
+ max_new_tokens = req_max_tokens
 
 
 
 
181
 
182
  # Build inputs with context-aware truncation
183
  inputs, orig_in_len, ctx, limit = _build_inputs_with_truncation(prompt, device, max_new_tokens, model, tokenizer)
listener.py CHANGED
@@ -3,16 +3,16 @@ import logging
3
  from typing import Callable, Awaitable, Dict, Any, List
4
 
5
  import aio_pika
 
6
 
7
- Handler = Callable[[Any], Awaitable[None]] # payload is envelope["data"]
8
-
9
  logger = logging.getLogger(__name__)
10
 
11
 
12
  class RabbitListenerBase:
13
  def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
14
  self._base = base
15
- self._instance_name = instance_name # queue prefix (like your .NET instance name)
16
  self._handlers = handlers
17
  self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []
18
 
@@ -30,7 +30,7 @@ class RabbitListenerBase:
30
  q = await self._base.declare_queue_bind(
31
  exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl
32
  )
33
- # explicit manual-ack, parity with .NET (autoAck: false)
34
  await q.consume(self._make_consumer(d["FuncName"]), no_ack=False)
35
  self._consumers.append(q)
36
 
@@ -38,28 +38,43 @@ class RabbitListenerBase:
38
  handler = self._handlers.get(func_name)
39
 
40
  async def _on_msg(msg: aio_pika.IncomingMessage):
41
- # manual ack after handler completes; no nack/requeue loops
42
  try:
43
  raw_body = msg.body.decode("utf-8", errors="replace")
44
  logger.info("Received message for handler '%s': %s", func_name, raw_body)
45
-
46
- # safe JSON parse to mirror .NET ConvertToObject (no throw)
47
  try:
48
  envelope = json.loads(raw_body)
49
  except Exception:
50
  logger.exception("Invalid JSON for '%s'", func_name)
51
  envelope = {"data": None}
52
-
53
  data = envelope.get("data", None)
 
 
 
 
 
 
 
54
 
55
- if handler:
56
- await handler(data)
57
- else:
58
- logger.error("No handler bound for '%s'", func_name)
59
-
60
- await msg.ack() # ack on success path
 
61
  except Exception:
62
- # match .NET: on exception, do not ack or nack; connection loss will requeue
63
- logger.exception("Error processing message for '%s'", func_name)
 
 
 
 
 
 
 
 
 
 
64
 
65
  return _on_msg
 
3
  from typing import Callable, Awaitable, Dict, Any, List
4
 
5
  import aio_pika
6
+ from aiormq.exceptions import ChannelInvalidStateError
7
 
8
+ Handler = Callable[[Any], Awaitable[None]]
 
9
  logger = logging.getLogger(__name__)
10
 
11
 
12
  class RabbitListenerBase:
13
  def __init__(self, base, instance_name: str, handlers: Dict[str, Handler]):
14
  self._base = base
15
+ self._instance_name = instance_name
16
  self._handlers = handlers
17
  self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []
18
 
 
30
  q = await self._base.declare_queue_bind(
31
  exchange=exch, queue_name=qname, routing_keys=rks, ttl_ms=ttl
32
  )
33
+ # manual ack, parity with .NET (autoAck: false)
34
  await q.consume(self._make_consumer(d["FuncName"]), no_ack=False)
35
  self._consumers.append(q)
36
 
 
38
  handler = self._handlers.get(func_name)
39
 
40
  async def _on_msg(msg: aio_pika.IncomingMessage):
41
+ # decode
42
  try:
43
  raw_body = msg.body.decode("utf-8", errors="replace")
44
  logger.info("Received message for handler '%s': %s", func_name, raw_body)
 
 
45
  try:
46
  envelope = json.loads(raw_body)
47
  except Exception:
48
  logger.exception("Invalid JSON for '%s'", func_name)
49
  envelope = {"data": None}
 
50
  data = envelope.get("data", None)
51
+ except Exception:
52
+ # if we cannot decode, ack to drop (matches .NET non-requeue behavior)
53
+ try:
54
+ await msg.ack()
55
+ except Exception:
56
+ pass
57
+ return
58
 
59
+ # ACK FIRST (like C#)
60
+ try:
61
+ await msg.ack()
62
+ except ChannelInvalidStateError:
63
+ # channel died; message may be redelivered; avoid loops
64
+ logger.warning("Ack failed: channel invalid for '%s'. Skipping ack.", func_name)
65
+ return
66
  except Exception:
67
+ # swallow ack errors to avoid crash; mirrors resilient .NET behavior
68
+ logger.exception("Ack error for '%s'", func_name)
69
+ return
70
+
71
+ # run handler after ack; if it fails, caller handles own idempotency
72
+ if handler:
73
+ try:
74
+ await handler(data)
75
+ except Exception:
76
+ logger.exception("Handler error for '%s'", func_name)
77
+ else:
78
+ logger.error("No handler bound for '%s'", func_name)
79
 
80
  return _on_msg
requirements.txt CHANGED
@@ -13,5 +13,5 @@ huggingface_hub
13
 
14
  vllm>=0.10.0
15
  torch>=2.7.1
16
- transformers>=4.50.0
17
 
 
13
 
14
  vllm>=0.10.0
15
  torch>=2.7.1
16
+ transformers>=4.51.0
17