async def dispatch(self, automation: AutomationRow) -> DispatchReport:
    """Run one automation end to end and report what happened.

    The steps, in order:

    1. If a budget guard is configured, call its ``check_pre_call``. A
       ``BudgetPausedError`` skips the run before any run row is written.
    2. Choose the execution path with ``_decide_path`` and insert a run row.
    3. Execute via the skill executor when the path is ``"skill"``,
       otherwise via ``self._router.complete``. A ``BudgetPausedError``
       raised here closes the run row as ``"skipped_budget"``; any other
       exception marks the run ``"failed"``.
    4. Turn a successful run into a ``"cost_exceeded"`` failure when its
       cost is above ``automation.max_cost_per_run_usd``.
    5. For a successful run with output, evaluate the alert conditions and
       dispatch an alert notification when they fire.
    6. Persist the final run state, advance the schedule, and maintain the
       failure counter, pausing the automation once the counter reaches
       ``self._config.automation_failure_pause_threshold``.

    Args:
        automation: The automation row to execute.

    Returns:
        A ``DispatchReport`` carrying the automation id, the run id
        (``None`` when the run was skipped before a row was inserted), the
        outcome, whether an alert was sent, and the error text if any.

    Raises:
        Failures during execution and during notification are caught and
        recorded rather than raised. Exceptions from the budget guard other
        than ``BudgetPausedError``, from ``_decide_path``, from
        ``_compute_next_run``, from ``_render_alert_content`` and from the
        repository calls are not handled here and propagate to the caller.
    """
    now = datetime.now(UTC)

    if self._budget_guard is not None:
        try:
            await self._budget_guard.check_pre_call(user_id=automation.user_id)
        except BudgetPausedError:
            # No run row exists yet, so there is nothing to close out.
            return await self._finish_budget_skip(automation, now, run_id=None)

    path = await self._decide_path(automation.capability_name)
    run_id = await self._repo.insert_run(
        automation_id=automation.id, started_at=now,
        execution_path=path,
    )

    # Pessimistic defaults: the run counts as failed until a path below
    # explicitly reports otherwise.
    output: dict[str, Any] | None = None
    skill_run_id: str | None = None
    invocation_log_id: str | None = None
    cost_usd: float = 0.0
    run_status = "failed"
    error: str | None = None
    alert_sent = False
    alert_content: str | None = None

    try:
        if path == "skill":
            executor = self._skill_executor_factory()
            if executor is None:
                raise RuntimeError(
                    "skill path selected but executor_factory returned None"
                )
            result = await self._execute_skill(
                executor, automation, automation_run_id=run_id,
            )
            # Only dict outputs are kept; anything else is stored as None.
            output = result.final_output if isinstance(result.final_output, dict) else None
            cost_usd = float(getattr(result, "total_cost_usd", 0.0) or 0.0)
            run_status = result.status
            skill_run_id = getattr(result, "run_id", None)
            if result.status != "succeeded":
                error = (
                    getattr(result, "error", None)
                    or getattr(result, "escalation_reason", None)
                )
        else:
            prior_run_end = await self._query_prior_run_end(automation_id=automation.id)
            parsed, metadata = await self._router.complete(
                prompt=self._build_prompt(automation, prior_run_end=prior_run_end),
                task_type=automation.capability_name,
                task_id=None,
                user_id=automation.user_id,
            )
            # Non-dict results are wrapped so the stored output is always a dict.
            output = parsed if isinstance(parsed, dict) else {"output": parsed}
            invocation_log_id = getattr(metadata, "invocation_id", None)
            cost_usd = float(getattr(metadata, "cost_usd", 0.0) or 0.0)
            run_status = "succeeded"
    except BudgetPausedError:
        return await self._finish_budget_skip(automation, now, run_id=run_id)
    except Exception as exc:
        # Some exceptions (e.g. a bare TimeoutError()) stringify to "", which
        # would leave a failed run with a blank error; fall back to the
        # exception class name so the failure is always identifiable.
        error = str(exc) or type(exc).__name__
        run_status = "failed"
        logger.warning(
            "automation_run_exception",
            automation_id=automation.id, error=error,
            exc_info=True,
        )

    if (
        run_status == "succeeded"
        and automation.max_cost_per_run_usd is not None
        and cost_usd > automation.max_cost_per_run_usd
    ):
        run_status = "failed"
        error = "cost_exceeded"

    if run_status == "succeeded" and output is not None:
        try:
            fires = self._alerts.evaluate(automation.alert_conditions, output)
        except Exception as exc:
            # A broken alert condition must not fail an otherwise good run.
            logger.warning(
                "automation_alert_check_failed",
                automation_id=automation.id, error=str(exc),
            )
            fires = False
        if fires:
            alert_content = self._render_alert_content(automation, output)
            try:
                if self._notifier is not None:
                    await self._notifier.dispatch(
                        notification_type=NOTIF_AUTOMATION_ALERT,
                        content=alert_content,
                        channel=CHANNEL_TASKS,
                        priority=3,
                    )
                    # Set only after a configured notifier's dispatch call
                    # returned without raising.
                    alert_sent = True
            except Exception:
                logger.exception(
                    "automation_alert_dispatch_failed",
                    automation_id=automation.id,
                )

    await self._repo.finish_run(
        run_id=run_id, status=run_status,
        output=output, skill_run_id=skill_run_id,
        invocation_log_id=invocation_log_id,
        alert_sent=alert_sent, alert_content=alert_content,
        error=error, cost_usd=cost_usd,
    )

    run_succeeded = run_status == "succeeded"
    next_run_at = self._compute_next_run(automation, now)
    await self._repo.advance_schedule(
        automation_id=automation.id, last_run_at=now,
        next_run_at=next_run_at,
        increment_run_count=True,
        increment_failure_count=not run_succeeded,
    )

    if run_succeeded:
        await self._repo.reset_failure_count(automation.id)
    else:
        # Re-read the row to see the failure count after the increment above.
        updated = await self._repo.get(automation.id)
        if (
            updated is not None
            and updated.failure_count >= self._config.automation_failure_pause_threshold
        ):
            await self._repo.set_status(automation.id, "paused")
            pause_msg = (
                f"Automation '{automation.name}' paused after "
                f"{updated.failure_count} consecutive failures. "
                f"Last error: {error or 'unknown'}"
            )
            try:
                if self._notifier is not None:
                    await self._notifier.dispatch(
                        notification_type=NOTIF_AUTOMATION_FAILURE,
                        content=pause_msg, channel=CHANNEL_TASKS, priority=4,
                    )
            except Exception:
                # The automation is already paused; a failed notification
                # is logged and otherwise ignored.
                logger.exception(
                    "automation_pause_notification_failed",
                    automation_id=automation.id,
                )

    outcome = self._classify_outcome(run_status, error)
    return DispatchReport(
        automation_id=automation.id, run_id=run_id,
        outcome=outcome, alert_sent=alert_sent, error=error,
    )

async def _finish_budget_skip(
    self, automation: AutomationRow, now: datetime, run_id: Any,
) -> DispatchReport:
    """Record a budget-paused dispatch and build its report.

    Closes the run row as ``"skipped_budget"`` when one was already
    inserted, then advances the schedule without incrementing the run or
    failure counters.

    Args:
        automation: The automation whose dispatch was skipped.
        now: The dispatch timestamp, stored as ``last_run_at`` and passed
            to ``_compute_next_run``.
        run_id: Id of the inserted run row, or ``None`` when the skip
            happened before a row was written.

    Returns:
        A ``DispatchReport`` with outcome ``"skipped_budget"`` and
        ``alert_sent=False``.
    """
    if run_id is not None:
        await self._repo.finish_run(
            run_id=run_id, status="skipped_budget",
            output=None, skill_run_id=None, invocation_log_id=None,
            alert_sent=False, alert_content=None, error=None,
            cost_usd=0.0,
        )
    next_run_at = self._compute_next_run(automation, now)
    await self._repo.advance_schedule(
        automation_id=automation.id,
        last_run_at=now, next_run_at=next_run_at,
        increment_run_count=False, increment_failure_count=False,
    )
    logger.info("automation_skipped_budget", automation_id=automation.id)
    return DispatchReport(
        automation_id=automation.id, run_id=run_id,
        outcome="skipped_budget", alert_sent=False,
    )