Skip to content

donna.cost.escalation_gate

donna.cost.escalation_gate

Estimate-driven gate for the over-budget decision tree (slice 17/18).

When a task's pre-flight estimate_usd exceeds either the daily budget remaining or task_approval_threshold_usd, the gate writes an :class:EscalationRequest row, posts a Discord message with the configured buttons, and awaits the user's resolution.

Slice 17 shipped Pause + Cancel. Slice 18 adds api_extended: [Approve $X extension] button, idempotent grant via :class:~donna.cost.budget_extension.BudgetExtensionRepository, hard daily and monthly ceilings, and the extension_amount_usd field on :class:GateOutcome for token-limit enforcement.

This is not a replacement for :class:donna.cost.budget.BudgetGuard. BudgetGuard continues to be the post-hoc spend-vs-threshold backstop that runs even when no estimate is available; the gate is the estimate-aware path that gives the user agency.

Realizes docs/superpowers/specs/manual-escalation.md §4, §5.1, §6.1, §10.6.

logger module-attribute

logger = get_logger()

EscalationMode module-attribute

EscalationMode = Literal['pause', 'cancel', 'api_extended', 'chat', 'claude_code']

ResolvedBy module-attribute

ResolvedBy = Literal['user', 'timeout']

DeliveryCallback module-attribute

DeliveryCallback = Callable[[EscalationRequestRow], Awaitable[bool]]

GateOutcome dataclass

GateOutcome(fired: bool, mode: EscalationMode | None, resolved_by: ResolvedBy | None, escalation_request_id: int | None, correlation_id: str | None, extension_amount_usd: float | None = None)

Result returned by :meth:EscalationGate.fire_and_wait.

fired instance-attribute

fired: bool

Whether an escalation was offered (a row was created).

mode instance-attribute

mode: EscalationMode | None

Resolution mode. None when fired is False.

resolved_by instance-attribute

resolved_by: ResolvedBy | None

user for a button click, timeout for the sweeper.

escalation_request_id instance-attribute

escalation_request_id: int | None

FK so callers can stamp resulting invocation_log rows.

correlation_id instance-attribute

correlation_id: str | None

extension_amount_usd class-attribute instance-attribute

extension_amount_usd: float | None = None

Granted extension amount when mode='api_extended'.

Callers use this to derive the max_tokens hard cap so actual spend cannot exceed the approved extension (§10.6 row 1).

EscalationGate

EscalationGate(*, repository: EscalationRepository, tracker: CostTracker, config: ManualEscalationConfig, daily_pause_threshold_usd: float, resolver: DashboardSettingResolver, deliver: DeliveryCallback, extension_repo: BudgetExtensionRepository, task_types_config: TaskTypesConfig | None = None, chat_prompt_builder: ChatPromptBuilder | None = None, spec_builder: ClaudeCodeSpecBuilder | None = None, host_repo: Any = None)

Decides whether to escalate, fires the Discord view, awaits resolution.

Source code in src/donna/cost/escalation_gate.py
def __init__(
    self,
    *,
    repository: EscalationRepository,
    tracker: CostTracker,
    config: ManualEscalationConfig,
    daily_pause_threshold_usd: float,
    resolver: DashboardSettingResolver,
    deliver: DeliveryCallback,
    extension_repo: BudgetExtensionRepository,
    task_types_config: TaskTypesConfig | None = None,
    chat_prompt_builder: ChatPromptBuilder | None = None,
    spec_builder: ClaudeCodeSpecBuilder | None = None,
    host_repo: Any = None,
) -> None:
    self._repo = repository
    self._tracker = tracker
    self._config = config
    # Posture: "shadow" observes (logs would-escalate, never prompts or
    # creates rows); "enforce" runs the full decision tree. See
    # config/manual_escalation.yaml gate.mode (manual-escalation.md §4).
    self._mode = config.gate.mode
    self._daily_pause_threshold_usd = daily_pause_threshold_usd
    self._resolver = resolver
    self._deliver = deliver
    self._extension_repo = extension_repo
    # Slice 20: per-task-type manual mode resolution + chat prompt
    # rendering. Both are optional so existing test fixtures + boots
    # without a Discord bot continue to assemble a gate that only
    # offers Pause / Cancel.
    self._task_types_config = task_types_config
    self._chat_prompt_builder = chat_prompt_builder
    # Slice 21 wiring — claude_code spec rendering + host repo for
    # base_sha capture. Optional; absence disables the claude_code
    # button without crashing boot.
    self._spec_builder = spec_builder
    self._host_repo = host_repo

fire_and_wait async

fire_and_wait(*, user_id: str, task_id: str | None, task_type: str, estimate_usd: float, priority: int = 2, originating_entity: tuple[str, str] | None = None, target_paths: dict[str, str] | None = None, base_sha: str | None = None, original_prompt: str | None = None, estimate_source: str = 'caller') -> GateOutcome

Decide whether to escalate; if so, post the view and await.

Returns a :class:GateOutcome describing what the caller should do next: * fired=False — caller proceeds normally; budget OK. * fired=True, mode='pause' — caller transitions task to paused and exits without spending. * fired=True, mode='cancel' — caller transitions task to cancelled and exits without spending. * fired=True, mode='api_extended' — extension was granted; caller proceeds with the API call. extension_amount_usd is set for token-limit enforcement. * fired=True, mode='chat' — slice 20 manual handoff. Caller should treat the task as parked; the chat-mode ingestion poller transitions it to done once the user submits an answer through the dashboard or /donna submit. * fired=True, mode='claude_code' — slice 21 manual handoff. Caller treats the task as parked; the ClaudeCodePoller validates the user's branch and updates lifecycle state on success.

original_prompt is the fully-rendered prompt the caller would otherwise have sent to the API. Slice 20 uses it (with the chat prompt builder) to produce the prompt body the user pastes into Claude. Without it, chat mode cannot be offered for this call — the gate degrades to Pause / Cancel only.

originating_entity (slice 21) is the FK pair the claude_code poller uses to render {name}-substituted target_paths globs. task_id is NULL for skill_auto_draft and skill_evolution call sites, so this kwarg is the only way the validator can identify the target.

target_paths and base_sha snapshot the manual_escalation scope at gate-fire time. Both are persisted on the row so a config edit mid-flight cannot widen scope retroactively (spec §10.7 row 2) and the worktree command stays pinned to a specific main SHA (spec §5.3 / drift mitigation).

Source code in src/donna/cost/escalation_gate.py
async def fire_and_wait(
    self,
    *,
    user_id: str,
    task_id: str | None,
    task_type: str,
    estimate_usd: float,
    priority: int = 2,
    originating_entity: tuple[str, str] | None = None,
    target_paths: dict[str, str] | None = None,
    base_sha: str | None = None,
    original_prompt: str | None = None,
    estimate_source: str = "caller",
) -> GateOutcome:
    """Decide whether to escalate; if so, post the view and await.

    Returns a :class:`GateOutcome` describing what the caller should
    do next:
      * ``fired=False`` — caller proceeds normally; budget OK.
      * ``fired=True, mode='pause'`` — caller transitions task to
        ``paused`` and exits without spending.
      * ``fired=True, mode='cancel'`` — caller transitions task to
        ``cancelled`` and exits without spending.
      * ``fired=True, mode='api_extended'`` — extension was granted;
        caller proceeds with the API call. ``extension_amount_usd``
        is set for token-limit enforcement.
      * ``fired=True, mode='chat'`` — slice 20 manual handoff. Caller
        should treat the task as parked; the chat-mode ingestion
        poller transitions it to ``done`` once the user submits an
        answer through the dashboard or ``/donna submit``.
      * ``fired=True, mode='claude_code'`` — slice 21 manual handoff.
        Caller treats the task as parked; the ClaudeCodePoller
        validates the user's branch and updates lifecycle state on
        success.

    ``original_prompt`` is the fully-rendered prompt the caller would
    otherwise have sent to the API. Slice 20 uses it (with the chat
    prompt builder) to produce the prompt body the user pastes into
    Claude. Without it, chat mode cannot be offered for this call —
    the gate degrades to Pause / Cancel only.

    ``originating_entity`` (slice 21) is the FK pair the
    claude_code poller uses to render ``{name}``-substituted
    target_paths globs. ``task_id`` is NULL for skill_auto_draft
    and skill_evolution call sites, so this kwarg is the only way
    the validator can identify the target.

    ``target_paths`` and ``base_sha`` snapshot the manual_escalation
    scope at gate-fire time. Both are persisted on the row so a
    config edit mid-flight cannot widen scope retroactively
    (spec §10.7 row 2) and the worktree command stays pinned to a
    specific main SHA (spec §5.3 / drift mitigation).
    """
    if not await self._is_enabled():
        return GateOutcome(
            fired=False,
            mode=None,
            resolved_by=None,
            escalation_request_id=None,
            correlation_id=None,
        )

    # Shadow posture (manual-escalation.md §4): observe only. Compute the
    # same fire decision the enforce path would, log it for threshold
    # calibration, and ALWAYS proceed — never create a row, prompt the
    # user, or block. The budget *caps* (BudgetGuard) still apply; this
    # only suppresses the interactive per-task gate.
    if self._mode == "shadow":
        threshold = self._config.triggers.task_approval_threshold_usd
        daily_remaining = await self._daily_remaining(user_id)
        effective = min(daily_remaining, threshold)
        if estimate_usd > effective:
            logger.info(
                "escalation_shadow_would_fire",
                event_type="cost.escalation_shadow",
                task_type=task_type,
                task_id=task_id,
                user_id=user_id,
                estimate_usd=round(estimate_usd, 6),
                effective_threshold=round(effective, 6),
                daily_remaining=round(daily_remaining, 6),
                task_approval_threshold=threshold,
                estimate_source=estimate_source,
            )
        return GateOutcome(
            fired=False,
            mode=None,
            resolved_by=None,
            escalation_request_id=None,
            correlation_id=None,
        )

    # De-dup: if a previous claude_code escalation for this same
    # entity is still in-flight, refuse to open a parallel race
    # (spec §10.7 / brainstorm decision §21). We only RE-DELIVER
    # the Discord notification when the prior row is still in
    # ``open`` state; re-delivering for ``resolved`` (user clicked
    # but hasn't built yet) / ``submitted`` (poller is validating)
    # / ``failed`` (user is iterating) would just spam them about
    # work they already know is in flight.
    if originating_entity is not None:
        existing = await self._repo.find_open_for_originating_entity(
            user_id=user_id,
            entity_type=originating_entity[0],
            entity_id=originating_entity[1],
        )
        if existing is not None and (
            existing.mode == "claude_code"
            or "claude_code" in existing.offered_modes
        ):
            logger.info(
                "escalation_dedup_existing_claude_code",
                correlation_id=existing.correlation_id,
                existing_status=existing.status,
                originating_entity=originating_entity,
            )
            if existing.status == "open":
                # User still hasn't seen / clicked the existing
                # ping — a fresh delivery may help.
                try:
                    await self._deliver(existing)
                except Exception:
                    logger.exception(
                        "escalation_redeliver_failed",
                        correlation_id=existing.correlation_id,
                    )
            # Don't await resolution — return as if not fired so
            # the caller falls back to ``BudgetPausedError`` /
            # paused state. Spawning a parallel awaiter would race.
            return GateOutcome(
                fired=False,
                mode=None,
                resolved_by=None,
                escalation_request_id=None,
                correlation_id=None,
            )

    daily_remaining = await self._daily_remaining(user_id)
    threshold = self._config.triggers.task_approval_threshold_usd
    if estimate_usd <= min(daily_remaining, threshold):
        return GateOutcome(
            fired=False,
            mode=None,
            resolved_by=None,
            escalation_request_id=None,
            correlation_id=None,
        )

    # Slice 23 — per-task-type override grid. ``disabled`` short-
    # circuits to Pause / Cancel only; ``force_api`` and
    # ``force_manual`` filter the offered_modes after they are built
    # so the override always honours other gates (e.g. force_manual
    # cannot offer chat for a task type without a chat config block).
    override = await self._task_type_override(task_type)

    # Build offered_modes dynamically. Pause + Cancel are always present;
    # api_extended renders when the extension config allows it and there
    # is enough daily / monthly headroom; claude_code (slice 21) renders
    # when the per-task-type config + host_repo + spec_builder line up.
    offered_modes: list[str] = []
    if override != "disabled" and await self._should_offer_extension(
        estimate_usd, user_id
    ):
        offered_modes.append("api_extended")
    # Slice 20 — per-task-type chat mode. Only offer when:
    #   1. The master kill-switch is on (already checked above).
    #   2. Modes.chat is enabled (YAML, override-able via dashboard).
    #   3. The task type declares ``manual_escalation: {mode: chat}``.
    #   4. The caller passed an ``original_prompt`` for us to render.
    chat_eligible = override != "disabled" and await self._chat_mode_eligible(
        task_type=task_type, original_prompt=original_prompt
    )
    if chat_eligible:
        offered_modes.append("chat")
    # Slice 21 — claude_code mode. Only offer when:
    #   1. The master kill-switch is on (already checked above).
    #   2. Modes.claude_code is enabled (YAML, dashboard).
    #   3. The task type declares ``manual_escalation: {mode: claude_code}``.
    #   4. The host repo + spec builder are configured (cli_wiring).
    if override != "disabled" and await self._should_offer_claude_code(task_type):
        offered_modes.append("claude_code")
        # If the gate caller didn't pre-render target_paths, do it
        # now from the per-task-type config so the row carries the
        # exact scope at fire time (spec §10.7 row 2 — config can
        # change mid-flight).
        if target_paths is None and self._task_types_config is not None:
            target_paths = self._render_target_paths(task_type)
        # Capture base_sha if we can; the gate's caller may also
        # pre-supply one. Failing-soft: missing base_sha just means
        # the spec will reference the symbolic ref instead.
        if base_sha is None and self._host_repo is not None:
            try:
                base_sha = await self._host_repo.rev_parse(
                    f"refs/heads/{self._config.modes.claude_code.base_ref}"
                )
            except Exception:
                base_sha = None

    # Slice 23 — apply ``force_api`` / ``force_manual`` filters after
    # the modes are gathered so we never offer a button whose
    # underlying preconditions failed.
    if override == "force_api":
        offered_modes = [m for m in offered_modes if m == "api_extended"]
    elif override == "force_manual":
        offered_modes = [
            m for m in offered_modes if m in ("chat", "claude_code")
        ]
    # Whether the chat-mode prompt builder should run below depends on
    # the post-override modes, not just ``chat_eligible``.
    chat_eligible = "chat" in offered_modes
    offered_modes.extend(["pause", "cancel"])

    correlation_id = str(uuid6.uuid7())
    row = await self._repo.create(
        user_id=user_id,
        correlation_id=correlation_id,
        task_id=task_id,
        task_type=task_type,
        estimate_usd=estimate_usd,
        daily_remaining_usd=daily_remaining,
        offered_modes=offered_modes,
        priority=priority,
        originating_entity=originating_entity,
        target_paths=target_paths,
        base_sha=base_sha,
    )

    # Render the chat-mode prompt + summary BEFORE the delivery
    # callback runs so the Discord notification can attach the .md
    # alongside the summary text. Best-effort: failures here are
    # logged inside the builder but do not abort the escalation —
    # the row still exists, the buttons still render, and the user
    # can fall back to Pause / Cancel.
    if (
        chat_eligible
        and self._chat_prompt_builder is not None
        and original_prompt is not None
    ):
        try:
            await self._chat_prompt_builder.build_and_persist(
                conn=self._repo._conn,
                row=row,
                original_prompt=original_prompt,
            )
            # Re-read the row so downstream consumers (delivery
            # callback) see the freshly-persisted summary + prompt
            # path without an extra round trip.
            refreshed = await self._repo.get(row.id)
            if refreshed is not None:
                row = refreshed
        except Exception:
            logger.exception(
                "escalation_chat_prompt_build_failed",
                correlation_id=correlation_id,
                escalation_request_id=row.id,
            )

    await write_escalation_event(
        self._repo._conn,
        event=EVENT_OFFERED,
        escalation_request_id=row.id,
        correlation_id=correlation_id,
        user_id=user_id,
        task_id=task_id,
        payload={
            "task_type": task_type,
            "estimate_usd": estimate_usd,
            "daily_remaining_usd": daily_remaining,
            "modes": offered_modes,
            "priority": priority,
        },
    )

    event = asyncio.Event()
    EscalationGate._events[correlation_id] = event

    try:
        delivered = await self._deliver(row)
        if delivered:
            await self._repo.mark_delivery_attempt(
                row.id, delivery_status="sent"
            )
        else:
            await self._repo.mark_delivery_attempt(
                row.id, delivery_status="failed"
            )

        await event.wait()
        resolved = await self._repo.get(row.id)
        if resolved is None or resolved.resolution is None:
            logger.warning(
                "escalation_event_set_without_resolution",
                escalation_request_id=row.id,
                correlation_id=correlation_id,
            )
            return GateOutcome(
                fired=True,
                mode="pause",
                resolved_by="timeout",
                escalation_request_id=row.id,
                correlation_id=correlation_id,
            )
        extension_amount: float | None = None
        if resolved.resolution == "api_extended":
            extension_amount = resolved.estimate_usd
        return GateOutcome(
            fired=True,
            mode=_coerce_mode(resolved.resolution),
            resolved_by=_coerce_resolved_by(resolved.resolved_by),
            escalation_request_id=row.id,
            correlation_id=correlation_id,
            extension_amount_usd=extension_amount,
        )
    finally:
        EscalationGate._events.pop(correlation_id, None)

open_tool_build_escalation async

open_tool_build_escalation(*, tool_request_id: int, tool_name: str, user_id: str, priority: int = 3, actor_id: str | None = None, proposed_signature: dict[str, Any] | None = None) -> tuple[EscalationRequestRow, RenderedSpec | None]

Open a tool_request_fulfillment escalation directly.

Bypasses :meth:fire_and_wait because tool builds have no API spend to gate — the user already chose to fulfill the request by clicking [File request] on a :class:donna.integrations.discord_views.ToolGapPingView.

Creates the escalation_request row with originating_entity=('tool_request', <id>) and offered_modes=['claude_code'], then immediately renders the spec via :meth:record_manual_handoff (status flips to resolved). Returns the row plus the rendered spec.

Parameters:

Name Type Description Default
tool_request_id int

FK back to the tool_request row.

required
tool_name str

Used as the {name} substitution into target_paths globs.

required
user_id str

Owner.

required
priority int

Inherited from the tool_request.

3
actor_id str | None

Discord ID of the clicker (logged on resolution).

None
proposed_signature dict[str, Any] | None

Optional sketch passed through the spec renderer's Jinja context.

None
Source code in src/donna/cost/escalation_gate.py
async def open_tool_build_escalation(
    self,
    *,
    tool_request_id: int,
    tool_name: str,
    user_id: str,
    priority: int = 3,
    actor_id: str | None = None,
    proposed_signature: dict[str, Any] | None = None,
) -> tuple[EscalationRequestRow, RenderedSpec | None]:
    """Open a ``tool_request_fulfillment`` escalation directly.

    Bypasses :meth:`fire_and_wait` because tool builds have no API
    spend to gate — the user already chose to fulfill the request
    by clicking ``[File request]`` on a
    :class:`donna.integrations.discord_views.ToolGapPingView`.

    Creates the ``escalation_request`` row with
    ``originating_entity=('tool_request', <id>)`` and
    ``offered_modes=['claude_code']``, then immediately renders the
    spec via :meth:`record_manual_handoff` (status flips to
    ``resolved``). Returns the row plus the rendered spec.

    Args:
        tool_request_id: FK back to the ``tool_request`` row.
        tool_name: Used as the ``{name}`` substitution into
            ``target_paths`` globs.
        user_id: Owner.
        priority: Inherited from the tool_request.
        actor_id: Discord ID of the clicker (logged on resolution).
        proposed_signature: Optional sketch passed through the
            spec renderer's Jinja context.
    """
    if not await self._is_enabled():
        logger.info(
            "tool_build_escalation_master_disabled",
            tool_request_id=tool_request_id,
        )
        return await self._abort_disabled(
            user_id=user_id,
            tool_request_id=tool_request_id,
            tool_name=tool_name,
        )
    if not await self._should_offer_claude_code("tool_request_fulfillment"):
        logger.warning(
            "tool_build_escalation_claude_code_unavailable",
            tool_request_id=tool_request_id,
        )
        return await self._abort_disabled(
            user_id=user_id,
            tool_request_id=tool_request_id,
            tool_name=tool_name,
        )

    # De-dup: refuse if an open/in-flight escalation already exists
    # for this tool_request, to mirror slice 21's skill de-dup
    # (manual-escalation.md §5.3 "De-dup").
    existing = await self._repo.find_open_for_originating_entity(
        user_id=user_id,
        entity_type="tool_request",
        entity_id=str(tool_request_id),
    )
    if existing is not None:
        logger.info(
            "tool_build_escalation_dedup_hit",
            tool_request_id=tool_request_id,
            existing_id=existing.id,
            status=existing.status,
        )
        return existing, None

    # Snapshot scope + base SHA up-front (mirrors fire_and_wait's
    # claude_code branch).
    target_paths = self._render_target_paths("tool_request_fulfillment")
    base_sha: str | None = None
    if self._host_repo is not None:
        try:
            base_sha = await self._host_repo.rev_parse(
                f"refs/heads/{self._config.modes.claude_code.base_ref}"
            )
        except Exception:
            base_sha = None

    correlation_id = str(uuid6.uuid7())
    row = await self._repo.create(
        user_id=user_id,
        correlation_id=correlation_id,
        task_id=None,
        task_type="tool_request_fulfillment",
        estimate_usd=0.0,
        daily_remaining_usd=await self._daily_remaining(user_id),
        offered_modes=["claude_code"],
        priority=priority,
        originating_entity=("tool_request", str(tool_request_id)),
        target_paths=target_paths,
        base_sha=base_sha,
    )
    await write_escalation_event(
        self._repo._conn,
        event=EVENT_OFFERED,
        escalation_request_id=row.id,
        correlation_id=correlation_id,
        user_id=user_id,
        task_id=None,
        payload={
            "task_type": "tool_request_fulfillment",
            "offered_modes": ["claude_code"],
            "tool_request_id": tool_request_id,
            "tool_name": tool_name,
            "estimate_usd": 0.0,
        },
    )

    rendered = await self.record_manual_handoff(
        correlation_id=correlation_id,
        mode="claude_code",
        capability_name=tool_name,
        actor_id=actor_id,
        task_summary=(
            f"Build new tool '{tool_name}' to unblock pending capabilities."
        ),
        acceptance_criteria=_tool_build_acceptance_criteria(
            tool_name, proposed_signature
        ),
        extra_context={
            "proposed_signature": proposed_signature,
            "requires_rebuild_default": (
                self._config.tool_gap.lint.requires_rebuild_default
            ),
            "default_timeout_seconds": (
                self._config.tool_gap.lint.default_timeout_seconds
            ),
        },
    )
    # Re-fetch so the caller sees the post-handoff status.
    refreshed = await self._repo.get(row.id)
    return refreshed or row, rendered

signal_resolution classmethod

signal_resolution(correlation_id: str) -> None

Wake any awaiter for correlation_id.

Called by the view's button handlers and by the timeout sweep in :mod:donna.notifications.escalation_delivery_loop.

Source code in src/donna/cost/escalation_gate.py
@classmethod
def signal_resolution(cls, correlation_id: str) -> None:
    """Wake any awaiter for ``correlation_id``.

    Called by the view's button handlers and by the timeout sweep
    in :mod:`donna.notifications.escalation_delivery_loop`.
    """
    event = cls._events.get(correlation_id)
    if event is not None:
        event.set()

record_user_resolution async

record_user_resolution(*, correlation_id: str, mode: EscalationMode, owner_user_id: str, task_id: str | None) -> bool

Persist a user-driven resolution and write the audit entry.

Returns True if this call mutated the row, False if it was already resolved (race with another button click or the timeout sweep).

Source code in src/donna/cost/escalation_gate.py
async def record_user_resolution(
    self,
    *,
    correlation_id: str,
    mode: EscalationMode,
    owner_user_id: str,
    task_id: str | None,
) -> bool:
    """Persist a user-driven resolution and write the audit entry.

    Returns True if this call mutated the row, False if it was
    already resolved (race with another button click or the
    timeout sweep).
    """
    row = await self._repo.get_by_correlation(correlation_id)
    if row is None:
        return False
    ok = await self._repo.resolve(
        row.id, resolution=mode, resolved_by="user"
    )
    if not ok:
        return False
    await write_escalation_event(
        self._repo._conn,
        event=EVENT_RESOLVED,
        escalation_request_id=row.id,
        correlation_id=correlation_id,
        user_id=owner_user_id,
        task_id=task_id,
        payload={"mode": mode, "resolved_by": "user"},
    )
    EscalationGate.signal_resolution(correlation_id)
    return True

grant_budget_extension async

grant_budget_extension(*, correlation_id: str, granted_by: str) -> DailyBudgetExtensionRow | None

Grant a budget extension for the given escalation.

Called by the Discord button callback BEFORE record_user_resolution so the extension row exists before the resolution event fires. The operation is idempotent: a Discord retry will find the existing row and return it unchanged without double-granting.

Returns:

Type Description
DailyBudgetExtensionRow | None

The new or existing DailyBudgetExtensionRow, or None

DailyBudgetExtensionRow | None

if the escalation row cannot be found or DB insertion fails.

Source code in src/donna/cost/escalation_gate.py
async def grant_budget_extension(
    self,
    *,
    correlation_id: str,
    granted_by: str,
) -> DailyBudgetExtensionRow | None:
    """Grant a budget extension for the given escalation.

    Called by the Discord button callback BEFORE ``record_user_resolution``
    so the extension row exists before the resolution event fires. The
    operation is idempotent: a Discord retry will find the existing row
    and return it unchanged without double-granting.

    Returns:
        The new or existing ``DailyBudgetExtensionRow``, or ``None``
        if the escalation row cannot be found or DB insertion fails.
    """
    row = await self._repo.get_by_correlation(correlation_id)
    if row is None:
        logger.warning(
            "grant_budget_extension_no_row",
            correlation_id=correlation_id,
        )
        return None

    # Guard: enforce monthly ceiling before granting.
    today = date.today()
    if not await self._monthly_headroom_ok(row.user_id, today, row.estimate_usd):
        logger.warning(
            "grant_budget_extension_monthly_ceiling",
            correlation_id=correlation_id,
            user_id=row.user_id,
        )
        return None

    extension = await self._extension_repo.grant(
        user_id=row.user_id,
        for_date=today,
        amount_usd=row.estimate_usd,
        granted_by=granted_by,
        escalation_request_id=row.id,
        now=datetime.now(tz=UTC),
    )
    if extension is None:
        return None

    # Audit log (idempotent: duplicate audit rows are acceptable on retry).
    try:
        await write_escalation_event(
            self._repo._conn,
            event=EVENT_EXTENSION_GRANTED,
            escalation_request_id=row.id,
            correlation_id=correlation_id,
            user_id=row.user_id,
            task_id=row.task_id,
            payload={
                "extension_id": extension.id,
                "amount_usd": extension.amount_usd,
                "granted_by": granted_by,
            },
        )
    except Exception:
        logger.exception(
            "grant_budget_extension_audit_failed",
            correlation_id=correlation_id,
        )
    return extension

record_manual_handoff async

record_manual_handoff(*, correlation_id: str, mode: str, capability_name: str | None = None, actor_id: str | None = None, task_summary: str | None = None, acceptance_criteria: list[str] | None = None, extra_context: dict[str, Any] | None = None) -> RenderedSpec | None

Resolve an open escalation as a manual handoff (slice 21).

Mirrors the slice 18 grant_budget_extension precedent: an idempotent helper that mutates the row BEFORE the resolution event fires so the dashboard already has data when the user follows the Discord link.

For mode='claude_code': renders the spec template, writes it to disk, mirrors into escalation_request.prompt_body, marks status='resolved' with resolution='claude_code'.

Returns the :class:RenderedSpec (so the caller can attach the file in Discord) or None on failure.

Source code in src/donna/cost/escalation_gate.py
async def record_manual_handoff(
    self,
    *,
    correlation_id: str,
    mode: str,
    capability_name: str | None = None,
    actor_id: str | None = None,
    task_summary: str | None = None,
    acceptance_criteria: list[str] | None = None,
    extra_context: dict[str, Any] | None = None,
) -> RenderedSpec | None:
    """Resolve an open escalation as a manual handoff (slice 21).

    Mirrors the slice 18 ``grant_budget_extension`` precedent: an
    idempotent helper that mutates the row BEFORE the resolution
    event fires so the dashboard already has data when the user
    follows the Discord link.

    For ``mode='claude_code'``: renders the spec template, writes
    it to disk, mirrors into ``escalation_request.prompt_body``,
    marks ``status='resolved'`` with ``resolution='claude_code'``.

    Returns the :class:`RenderedSpec` (so the caller can attach
    the file in Discord) or ``None`` on failure.
    """
    if mode != "claude_code":
        logger.warning(
            "record_manual_handoff_unsupported_mode",
            mode=mode,
        )
        return None
    if self._spec_builder is None:
        logger.warning(
            "record_manual_handoff_no_spec_builder",
            correlation_id=correlation_id,
        )
        return None

    row = await self._repo.get_by_correlation(correlation_id)
    if row is None:
        return None
    if self._task_types_config is None:
        return None
    entry = self._task_types_config.task_types.get(row.task_type)
    if entry is None or entry.manual_escalation is None:
        return None

    if capability_name is None:
        capability_name = await self._resolve_capability_name(row)
        if capability_name is None:
            logger.warning(
                "record_manual_handoff_no_capability",
                correlation_id=correlation_id,
                originating_entity_type=row.originating_entity_type,
                originating_entity_id=row.originating_entity_id,
            )
            return None

    try:
        rendered = self._spec_builder.render(
            correlation_id=correlation_id,
            task_type=row.task_type,
            capability_name=capability_name,
            manual=entry.manual_escalation,
            base_sha=row.base_sha or "HEAD",
            task_summary=task_summary or _default_summary(row),
            acceptance_criteria=acceptance_criteria or _default_acceptance(),
            extra_context=extra_context,
        )
    except Exception:
        logger.exception(
            "record_manual_handoff_render_failed",
            correlation_id=correlation_id,
        )
        return None

    await self._repo.set_manual_handoff(
        row.id,
        mode="claude_code",
        prompt_path=str(rendered.path),
        prompt_body=rendered.body,
    )
    ok = await self._repo.resolve(
        row.id, resolution="claude_code", resolved_by=actor_id or "user"
    )
    if not ok:
        # Already resolved by another click / sweep — return the
        # rendered spec anyway so the caller can still surface it.
        logger.info(
            "record_manual_handoff_already_resolved",
            correlation_id=correlation_id,
        )

    await write_escalation_event(
        self._repo._conn,
        event=EVENT_RESOLVED,
        escalation_request_id=row.id,
        correlation_id=correlation_id,
        user_id=row.user_id,
        task_id=row.task_id,
        payload={
            "mode": "claude_code",
            "resolved_by": "user",
            "spec_path": str(rendered.path),
            "branch_name": rendered.branch_name,
        },
    )
    EscalationGate.signal_resolution(correlation_id)
    return rendered

extension_filter_reason async

extension_filter_reason(*, user_id: str, estimate_usd: float) -> Literal['disabled', 'over_headroom', 'over_ceiling'] | None

Public renderer-facing accessor. See :meth:_extension_filter_reason.

Source code in src/donna/cost/escalation_gate.py
async def extension_filter_reason(
    self, *, user_id: str, estimate_usd: float
) -> Literal["disabled", "over_headroom", "over_ceiling"] | None:
    """Public renderer-facing accessor. See :meth:`_extension_filter_reason`."""
    return await self._extension_filter_reason(estimate_usd, user_id)