| | import json |
| | from dataclasses import dataclass, asdict |
| | from datetime import datetime, timezone |
| | from typing import Any |
| |
|
| | @dataclass |
| | class CloudEvent: |
| | id: str |
| | type: str |
| | source: str |
| | time: str |
| | data: Any |
| |
|
| | @staticmethod |
| | def wrap(obj: Any, *, event_type: str, source: str, id: str) -> bytes: |
| | evt = CloudEvent( |
| | id=id, |
| | type=event_type or (obj.__class__.__name__ if obj is not None else "NullOrEmpty"), |
| | source=source, |
| | time=datetime.now(timezone.utc).isoformat(), |
| | data=obj, |
| | ) |
| | return json.dumps(asdict(evt), ensure_ascii=False).encode("utf-8") |
| |
|