| | |
| | from __future__ import annotations |
| | import json, time, uuid, logging |
| | from typing import Any, Dict, List, AsyncIterable, Optional |
| | from backends_base import ChatBackend, ImagesBackend |
| |
|
| | from rabbit_repo import RabbitRepo |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | |
| | def _now() -> int: return int(time.time()) |
| | def _chunk_text(s: str, sz: int = 140) -> List[str]: |
| | return [s[i:i+sz] for i in range(0, len(s or ""), sz)] if s else [] |
| | def _last_user_text(messages: List[Dict[str, Any]]) -> str: |
| | for m in reversed(messages or []): |
| | if (m or {}).get("role") == "user": |
| | c = m.get("content", "") |
| | if isinstance(c, str): |
| | return c |
| | if isinstance(c, list): |
| | texts = [p.get("text","") for p in c if p.get("type") == "text"] |
| | return " ".join([t for t in texts if t]) |
| | return "" |
| |
|
| |
|
| | |
| | class OpenAIServers: |
| | """ |
| | Handlers you can register in RabbitListenerBase: |
| | - 'oaChatCreate' -> handle_chat_create |
| | - 'oaImagesGenerate' -> handle_images_generate |
| | Uses RabbitRepo.publish(...) to emit CloudEvent-wrapped OpenAI JSON. |
| | """ |
| |
|
| | def __init__(self, publisher: RabbitRepo, |
| | *, chat_backend=None, images_backend=None): |
| | self._pub = publisher |
| | self._chat = chat_backend |
| | self._img = images_backend |
| |
|
| |
|
| | |
| | async def handle_chat_create(self, data: Dict[str, Any]) -> None: |
| | """ |
| | data: OpenAI chat request + 'reply_key' (string) |
| | Server publishes to exchange 'oa.chat.reply' with routing_key = reply_key. |
| | """ |
| | if not isinstance(data, dict): |
| | logger.warning("oaChatCreate: data is not a dict") |
| | return |
| |
|
| | reply_key = data.get("reply_key") |
| | if not reply_key: |
| | logger.error("oaChatCreate: missing reply_key") |
| | return |
| |
|
| | try: |
| | async for chunk in self._chat.stream(data): |
| | |
| | await self._pub.publish("oa.chat.reply", chunk, routing_key=reply_key) |
| | |
| | await self._pub.publish("oa.chat.reply", {"object": "stream.end"}, routing_key=reply_key) |
| | except Exception: |
| | logger.exception("oaChatCreate: streaming failed") |
| |
|
| | |
| | async def handle_images_generate(self, data: Dict[str, Any]) -> None: |
| | """ |
| | data: OpenAI images.generate request + 'reply_key' (string) |
| | """ |
| | if not isinstance(data, dict): |
| | logger.warning("oaImagesGenerate: data is not a dict") |
| | return |
| | reply_key = data.get("reply_key") |
| | if not reply_key: |
| | logger.error("oaImagesGenerate: missing reply_key") |
| | return |
| |
|
| | try: |
| | b64 = await self._img.generate_b64(data) |
| | resp = {"created": _now(), "data":[{"b64_json": b64}]} |
| | await self._pub.publish("oa.images.reply", resp, routing_key=reply_key) |
| | except Exception: |
| | logger.exception("oaImagesGenerate: generation failed") |
| |
|
| | |
| | |
| | try: |
| | ChatBackend |
| | except NameError: |
| | try: |
| | from typing import TYPE_CHECKING |
| | |
| | ChatBackend = ChatCompletionsBackend |
| | except Exception: |
| | pass |
| |
|
| | try: |
| | ImagesBackend |
| | except NameError: |
| | try: |
| | ImagesBackend = ImageGenerationsBackend |
| | except Exception: |
| | pass |
| |
|
| |
|