Skip to content

donna.tasks.events

donna.tasks.events

Lightweight async pub/sub for task lifecycle events.

Subscribers receive (task, **context) and must be async callables. Exceptions in subscribers are logged and swallowed — a failing subscriber must never break the caller's flow.

logger module-attribute

logger = get_logger()

Callback module-attribute

Callback = Callable[..., Coroutine[Any, Any, None]]

TaskEventBus

TaskEventBus()

In-process async event bus for task lifecycle events.

Source code in src/donna/tasks/events.py
def __init__(self) -> None:
    self._subscribers: dict[str, list[Callback]] = defaultdict(list)

subscribe

subscribe(event_type: str, callback: Callback) -> None
Source code in src/donna/tasks/events.py
def subscribe(self, event_type: str, callback: Callback) -> None:
    self._subscribers[event_type].append(callback)

emit async

emit(event_type: str, *, task: Any, **context: Any) -> None
Source code in src/donna/tasks/events.py
async def emit(self, event_type: str, *, task: Any, **context: Any) -> None:
    for callback in self._subscribers.get(event_type, []):
        try:
            await callback(task, **context)
        except Exception:
            logger.exception(
                "event_subscriber_failed",
                event_type=event_type,
                subscriber=getattr(callback, "__qualname__", str(callback)),
            )