| | |
import json
import logging
from typing import Callable, Awaitable, Dict, Any, List

import aio_pika
| |
|
# Signature of a message handler: an async callable that is awaited with the
# value of the "data" field of a decoded message envelope and returns nothing.
Handler = Callable[[Any], Awaitable[None]]
| |
|
| |
|
class RabbitListenerBase:
    """Bind queues to named async handlers and consume messages from them.

    Each declaration passed to :meth:`start` produces one queue, named from
    the instance name, the exchange and the routing keys. Message bodies are
    decoded as UTF-8 JSON and the envelope's ``"data"`` field is handed to the
    handler registered under the declaration's ``FuncName``.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, base, instance_name: str, handlers: "Dict[str, Handler]"):
        """Store collaborators; nothing is declared until :meth:`start` runs.

        Args:
            base: Object exposing an awaitable ``declare_queue_bind(exchange=,
                queue_name=, routing_keys=, ttl_ms=)`` that returns a queue
                with an awaitable ``consume(callback)``.
            instance_name: Prefix used for every generated queue name.
            handlers: Mapping of handler name to async callable.
        """
        self._base = base
        self._instance_name = instance_name
        self._handlers = handlers
        # Queues on which consume() has been called, in declaration order.
        self._consumers: List[aio_pika.abc.AbstractRobustQueue] = []

    def _qname(self, exchange: str, routing_keys: List[str]) -> str:
        """Return ``<instance>-<exchange>[-<sorted routing keys>]``.

        Empty routing keys are ignored; when none remain (or the list is
        empty/``None``) the name has no routing-key suffix. Sorting makes the
        name independent of the order in which keys are declared.
        """
        keys = sorted(rk for rk in routing_keys or () if rk)
        name = f"{self._instance_name}-{exchange}"
        if keys:
            name = f"{name}-{'-'.join(keys)}"
        return name

    async def start(self, declarations: List[dict]) -> None:
        """Declare, bind and start consuming one queue per declaration.

        Args:
            declarations: Dicts with required keys ``ExchangeName`` and
                ``FuncName`` and optional ``RoutingKeys`` / ``MessageTimeout``.

        Raises:
            KeyError: If a declaration lacks ``ExchangeName`` or ``FuncName``.
        """
        for decl in declarations:
            exchange = decl["ExchangeName"]
            # A missing or empty key list falls back to one empty routing key.
            routing_keys = decl.get("RoutingKeys") or [""]
            queue = await self._base.declare_queue_bind(
                exchange=exchange,
                queue_name=self._qname(exchange, routing_keys),
                routing_keys=routing_keys,
                # Falsy timeouts (0, "", missing) are passed as "no TTL".
                ttl_ms=decl.get("MessageTimeout") or None,
            )
            await queue.consume(self._make_consumer(decl["FuncName"]))
            self._consumers.append(queue)

    def _make_consumer(self, func_name: str):
        """Build the message callback that dispatches to handler ``func_name``.

        The callback never raises for ordinary errors: undecodable bodies,
        non-object envelopes and handler failures are logged and the message
        is dropped, so the surrounding ``msg.process()`` context exits
        normally. (NOTE(review): presumably that acknowledges the message;
        confirm against the aio_pika documentation.)

        Args:
            func_name: Key into the handler mapping given to the constructor.

        Returns:
            An async callable taking one incoming message.
        """
        logger = self._logger
        handler = self._handlers.get(func_name)
        if handler is None:
            # Kept best-effort (messages are consumed and discarded), but the
            # misconfiguration is now visible instead of silent.
            logger.warning(
                "No handler registered for %r; its messages will be dropped",
                func_name,
            )

        async def _on_msg(msg: "aio_pika.IncomingMessage") -> None:
            async with msg.process():
                try:
                    envelope = json.loads(msg.body.decode("utf-8"))
                except Exception:
                    logger.exception(
                        "Dropping undecodable message for handler %r", func_name
                    )
                    return

                if not isinstance(envelope, dict):
                    logger.error(
                        "Dropping message for handler %r: envelope is %s, "
                        "expected a JSON object",
                        func_name,
                        type(envelope).__name__,
                    )
                    return

                if handler is None:
                    return

                try:
                    await handler(envelope.get("data"))
                except Exception:
                    # Boundary of the consumer: log with traceback rather
                    # than swallowing the failure silently.
                    logger.exception("Handler %r failed", func_name)

        return _on_msg
| |
|