Skip to content

donna.automations.dispatcher

donna.automations.dispatcher

AutomationDispatcher — executes one due automation end-to-end.

Implements spec §6.9: resolving the skill vs. claude_native execution path, enforcing the per-run budget cap and the global BudgetGuard, evaluating and dispatching alerts, and pausing an automation after consecutive failures.

logger module-attribute

logger = get_logger()

DispatchReport dataclass

DispatchReport(automation_id: str, run_id: str | None, outcome: str, alert_sent: bool, error: str | None = None)

automation_id instance-attribute

automation_id: str

run_id instance-attribute

run_id: str | None

outcome instance-attribute

outcome: str

alert_sent instance-attribute

alert_sent: bool

error class-attribute instance-attribute

error: str | None = None

AutomationDispatcher

AutomationDispatcher(*, connection: Connection, repository: AutomationRepository, model_router: Any, skill_executor_factory: Callable[[], Any], budget_guard: Any, alert_evaluator: Any, cron: Any, notifier: Any, config: SkillSystemConfig)
Source code in src/donna/automations/dispatcher.py
def __init__(
    self,
    *,
    connection: aiosqlite.Connection,
    repository: AutomationRepository,
    model_router: Any,
    skill_executor_factory: Callable[[], Any],
    budget_guard: Any,
    alert_evaluator: Any,
    cron: Any,
    notifier: Any,
    config: SkillSystemConfig,
) -> None:
    """Store the collaborators that ``dispatch`` works with.

    Every argument is keyword-only and is kept on a private attribute;
    nothing is validated or invoked here.

    Args:
        connection: Database connection, kept as ``_conn``.
        repository: Automation repository; ``dispatch`` uses it to insert
            and finish runs, advance the schedule and change status.
        model_router: Object whose ``complete(...)`` coroutine is awaited
            when a run does not take the skill path.
        skill_executor_factory: Zero-argument callable invoked once per
            skill-path run; a ``None`` result makes that run fail.
        budget_guard: Object providing ``check_pre_call``; may be ``None``,
            in which case ``dispatch`` skips the pre-call budget check.
        alert_evaluator: Object providing ``evaluate(conditions, output)``.
        cron: Kept as ``_cron``; presumably used for next-run computation
            (not visible in this method -- confirm against the helpers).
        notifier: Object providing a ``dispatch(...)`` coroutine; may be
            ``None``, in which case no notifications are sent.
        config: Skill-system configuration; ``dispatch`` reads
            ``automation_failure_pause_threshold`` from it.
    """
    # Persistence layer.
    self._repo = repository
    self._conn = connection

    # Execution back-ends: skill executor vs. direct model call.
    self._skill_executor_factory = skill_executor_factory
    self._router = model_router

    # Guards, scheduling and outbound notifications.
    self._budget_guard = budget_guard
    self._cron = cron
    self._alerts = alert_evaluator
    self._notifier = notifier

    # Static configuration.
    self._config = config

dispatch async

dispatch(automation: AutomationRow) -> DispatchReport
Source code in src/donna/automations/dispatcher.py
async def dispatch(self, automation: AutomationRow) -> DispatchReport:
    """Execute one due automation end-to-end and report what happened.

    Steps, in order:

    1. If a budget guard is configured, run its pre-call check. When it
       raises ``BudgetPausedError`` no run row is created: the schedule is
       advanced without touching the run/failure counters and a
       ``"skipped_budget"`` report with ``run_id=None`` is returned.
    2. Decide the execution path, insert a run row, then either execute
       the skill or call the model router directly.
    3. A ``BudgetPausedError`` raised during execution finishes the run as
       ``"skipped_budget"`` and advances the schedule without counting it.
       Any other ``Exception`` is logged and turns the run into a failure.
    4. A successful run whose cost exceeds ``max_cost_per_run_usd`` is
       downgraded to ``"failed"`` with error ``"cost_exceeded"``.
    5. For a successful run with output, alert conditions are evaluated
       and, when they fire, a notification is dispatched. Errors in
       evaluation or dispatch are logged and never fail the run.
    6. The run row is finished, the schedule advanced, and the failure
       counter either reset (success) or checked against the configured
       pause threshold (failure), pausing the automation when reached.

    Args:
        automation: The automation row that is due to run.

    Returns:
        A ``DispatchReport`` carrying the run id (``None`` when skipped
        before a run row existed), the classified outcome, whether an
        alert was sent, and the error text, if any.

    Raises:
        Errors from ``_decide_path`` and from the repository calls are not
        caught here and propagate to the caller.
    """
    now = datetime.now(UTC)

    # Global budget gate: skip the run entirely when spending is paused.
    try:
        if self._budget_guard is not None:
            await self._budget_guard.check_pre_call(user_id=automation.user_id)
    except BudgetPausedError:
        next_run_at = self._compute_next_run(automation, now)
        await self._repo.advance_schedule(
            automation_id=automation.id,
            last_run_at=now, next_run_at=next_run_at,
            increment_run_count=False, increment_failure_count=False,
        )
        logger.info("automation_skipped_budget", automation_id=automation.id)
        return DispatchReport(
            automation_id=automation.id, run_id=None,
            outcome="skipped_budget", alert_sent=False,
        )

    path = await self._decide_path(automation.capability_name)
    run_id = await self._repo.insert_run(
        automation_id=automation.id, started_at=now,
        execution_path=path,
    )

    # Pessimistic defaults: the run counts as failed until a branch below
    # explicitly records a different status.
    output: dict[str, Any] | None = None
    skill_run_id: str | None = None
    invocation_log_id: str | None = None
    cost_usd: float = 0.0
    run_status = "failed"
    error: str | None = None
    alert_sent = False
    alert_content: str | None = None

    try:
        if path == "skill":
            executor = self._skill_executor_factory()
            if executor is None:
                raise RuntimeError("skill path selected but executor_factory returned None")
            result = await self._execute_skill(executor, automation, automation_run_id=run_id)
            # Only dict outputs are kept; anything else is stored as None.
            output = result.final_output if isinstance(result.final_output, dict) else None
            cost_usd = float(getattr(result, "total_cost_usd", 0.0) or 0.0)
            run_status = result.status
            skill_run_id = getattr(result, "run_id", None)
            if result.status != "succeeded":
                error = (
                    getattr(result, "error", None)
                    or getattr(result, "escalation_reason", None)
                )
        else:
            prior_run_end = await self._query_prior_run_end(automation_id=automation.id)
            parsed, metadata = await self._router.complete(
                prompt=self._build_prompt(automation, prior_run_end=prior_run_end),
                task_type=automation.capability_name,
                task_id=None,
                user_id=automation.user_id,
            )
            # Non-dict results are wrapped so the stored output is always a dict.
            output = parsed if isinstance(parsed, dict) else {"output": parsed}
            invocation_log_id = getattr(metadata, "invocation_id", None)
            cost_usd = float(getattr(metadata, "cost_usd", 0.0) or 0.0)
            run_status = "succeeded"
    except BudgetPausedError:
        # Budget tripped mid-run: close the run as skipped and do not count
        # it as either a run or a failure.
        await self._repo.finish_run(
            run_id=run_id, status="skipped_budget",
            output=None, skill_run_id=None, invocation_log_id=None,
            alert_sent=False, alert_content=None, error=None,
            cost_usd=0.0,
        )
        next_run_at = self._compute_next_run(automation, now)
        await self._repo.advance_schedule(
            automation_id=automation.id, last_run_at=now,
            next_run_at=next_run_at,
            increment_run_count=False, increment_failure_count=False,
        )
        return DispatchReport(
            automation_id=automation.id, run_id=run_id,
            outcome="skipped_budget", alert_sent=False,
        )
    except Exception as exc:
        # Exceptions raised without a message (e.g. a bare TimeoutError())
        # stringify to "", which would persist an empty error; fall back to
        # the exception class name so the failure stays diagnosable.
        error = str(exc) or type(exc).__name__
        run_status = "failed"
        logger.warning(
            "automation_run_exception",
            automation_id=automation.id, error=error,
        )

    # Per-run cost cap: an over-budget success is recorded as a failure.
    if (
        run_status == "succeeded"
        and automation.max_cost_per_run_usd is not None
        and cost_usd > automation.max_cost_per_run_usd
    ):
        run_status = "failed"
        error = "cost_exceeded"

    if run_status == "succeeded" and output is not None:
        try:
            fires = self._alerts.evaluate(automation.alert_conditions, output)
        except Exception as exc:
            # A broken alert condition must not fail an otherwise good run.
            logger.warning(
                "automation_alert_check_failed",
                automation_id=automation.id, error=str(exc),
            )
            fires = False
        if fires:
            alert_content = self._render_alert_content(automation, output)
            try:
                if self._notifier is not None:
                    await self._notifier.dispatch(
                        notification_type=NOTIF_AUTOMATION_ALERT,
                        content=alert_content,
                        channel=CHANNEL_TASKS,
                        priority=3,
                    )
                    alert_sent = True
            except Exception:
                # Best effort: alert_content is still persisted below, with
                # alert_sent left False.
                logger.exception(
                    "automation_alert_dispatch_failed",
                    automation_id=automation.id,
                )

    await self._repo.finish_run(
        run_id=run_id, status=run_status,
        output=output, skill_run_id=skill_run_id,
        invocation_log_id=invocation_log_id,
        alert_sent=alert_sent, alert_content=alert_content,
        error=error, cost_usd=cost_usd,
    )

    run_succeeded = run_status == "succeeded"
    next_run_at = self._compute_next_run(automation, now)
    await self._repo.advance_schedule(
        automation_id=automation.id, last_run_at=now,
        next_run_at=next_run_at,
        increment_run_count=True,
        increment_failure_count=not run_succeeded,
    )
    if run_succeeded:
        await self._repo.reset_failure_count(automation.id)

    if not run_succeeded:
        # Re-read the row to see the failure count after the increment above.
        updated = await self._repo.get(automation.id)
        if (
            updated is not None
            and updated.failure_count >= self._config.automation_failure_pause_threshold
        ):
            await self._repo.set_status(automation.id, "paused")
            pause_msg = (
                f"Automation '{automation.name}' paused after "
                f"{updated.failure_count} consecutive failures. "
                f"Last error: {error or 'unknown'}"
            )
            try:
                if self._notifier is not None:
                    await self._notifier.dispatch(
                        notification_type=NOTIF_AUTOMATION_FAILURE,
                        content=pause_msg, channel=CHANNEL_TASKS, priority=4,
                    )
            except Exception:
                # The automation is already paused; a failed notification
                # is logged and otherwise ignored.
                logger.exception(
                    "automation_pause_notification_failed",
                    automation_id=automation.id,
                )

    outcome = self._classify_outcome(run_status, error)
    return DispatchReport(
        automation_id=automation.id, run_id=run_id,
        outcome=outcome, alert_sent=alert_sent, error=error,
    )