Skip to content

donna.skills.evolution

donna.skills.evolution

Evolver — single-skill evolution attempt orchestrator.

Spec §6.6. Assembles the input package, calls Claude, parses output, runs four validation gates, persists or rejects.

logger module-attribute

logger = get_logger()

TASK_TYPE module-attribute

TASK_TYPE = 'skill_evolution'

EvolutionReport dataclass

EvolutionReport(skill_id: str, outcome: str, new_version_id: str | None = None, rationale: str | None = None, cost_usd: float = 0.0, latency_ms: int | None = None)

skill_id instance-attribute

skill_id: str

outcome instance-attribute

outcome: str

new_version_id class-attribute instance-attribute

new_version_id: str | None = None

rationale class-attribute instance-attribute

rationale: str | None = None

cost_usd class-attribute instance-attribute

cost_usd: float = 0.0

latency_ms class-attribute instance-attribute

latency_ms: int | None = None

Evolver

Evolver(connection: Connection, model_router: Any, budget_guard: Any, lifecycle_manager: SkillLifecycleManager, config: SkillSystemConfig, executor_factory: Callable[[], Any])
Source code in src/donna/skills/evolution.py
def __init__(
    self,
    connection: aiosqlite.Connection,
    model_router: Any,
    budget_guard: Any,
    lifecycle_manager: SkillLifecycleManager,
    config: SkillSystemConfig,
    executor_factory: Callable[[], Any],
) -> None:
    self._conn = connection
    self._router = model_router
    self._budget_guard = budget_guard
    self._lifecycle = lifecycle_manager
    self._config = config
    self._executor_factory = executor_factory
    self._input_builder = EvolutionInputBuilder(connection, config)
    self._log_repo = SkillEvolutionLogRepository(connection)

evolve_one async

evolve_one(skill_id: str, triggered_by: str) -> EvolutionReport
Source code in src/donna/skills/evolution.py
async def evolve_one(
    self, skill_id: str, triggered_by: str,
) -> EvolutionReport:
    skill = await self._fetch_skill(skill_id)
    if skill is None:
        return EvolutionReport(
            skill_id=skill_id, outcome="skipped",
            rationale="skill not found",
        )
    if skill["state"] != "degraded":
        return EvolutionReport(
            skill_id=skill_id, outcome="skipped",
            rationale=f"skill state is {skill['state']!r}, not 'degraded'",
        )

    # Budget check.
    try:
        if self._budget_guard is not None:
            await self._budget_guard.check_pre_call(user_id="system")
    except BudgetPausedError:
        return EvolutionReport(
            skill_id=skill_id, outcome="budget_exhausted",
        )

    # Assemble input package.
    try:
        package = await self._input_builder.build(skill_id=skill_id)
    except (LookupError, ValueError) as exc:
        logger.warning(
            "skill_evolution_input_failed",
            skill_id=skill_id, error=str(exc),
        )
        return EvolutionReport(
            skill_id=skill_id, outcome="error",
            rationale=str(exc),
        )

    # Call Claude.
    try:
        parsed, metadata = await self._router.complete(
            prompt=self._build_prompt(package),
            task_type=TASK_TYPE,
            task_id=None,
            user_id="system",
        )
    except BudgetPausedError:
        return EvolutionReport(
            skill_id=skill_id, outcome="budget_exhausted",
        )
    except Exception as exc:
        logger.warning(
            "skill_evolution_llm_failed",
            skill_id=skill_id, error=str(exc),
        )
        return EvolutionReport(
            skill_id=skill_id, outcome="error",
            rationale=f"llm call failed: {exc}",
        )

    invocation_id = getattr(metadata, "invocation_id", None)
    cost_usd = float(getattr(metadata, "cost_usd", 0.0) or 0.0)
    latency_ms = getattr(metadata, "latency_ms", None)

    # Validate output shape.
    required_keys = ("diagnosis", "new_skill_version", "changelog", "targeted_failure_cases")
    if not (isinstance(parsed, dict) and all(k in parsed for k in required_keys)):
        await self._log_repo.record(
            skill_id=skill_id, from_version_id=skill["current_version_id"],
            to_version_id=None, triggered_by=triggered_by,
            claude_invocation_id=invocation_id,
            diagnosis=None, targeted_case_ids=None,
            validation_results={"malformed_output": True},
            outcome="rejected_validation",
        )
        await self._maybe_demote_after_failure(skill_id)
        return EvolutionReport(
            skill_id=skill_id, outcome="rejected_validation",
            rationale="malformed llm output",
            cost_usd=cost_usd, latency_ms=latency_ms,
        )

    new_version = parsed["new_skill_version"]
    targeted = parsed["targeted_failure_cases"] or []
    diagnosis = parsed.get("diagnosis")

    # Run the four gates.
    executor = self._executor_factory()
    gates = EvolutionGates(self._conn, self._config, executor)

    gate_results: dict[str, GateResult] = {}
    structural = run_structural_gate(new_version)
    gate_results["structural"] = structural
    if not structural.passed:
        return await self._record_rejection(
            skill_id=skill_id, from_version_id=skill["current_version_id"],
            triggered_by=triggered_by, invocation_id=invocation_id,
            diagnosis=diagnosis, targeted=targeted,
            gate_results=gate_results,
            rationale=f"structural gate failed: {structural.failure_reason}",
            cost_usd=cost_usd, latency_ms=latency_ms,
        )

    targeted_result = await gates.run_targeted_case_gate(
        new_version=new_version, skill_id=skill_id,
        targeted_case_ids=targeted,
    )
    gate_results["targeted"] = targeted_result
    if not targeted_result.passed:
        return await self._record_rejection(
            skill_id=skill_id, from_version_id=skill["current_version_id"],
            triggered_by=triggered_by, invocation_id=invocation_id,
            diagnosis=diagnosis, targeted=targeted,
            gate_results=gate_results,
            rationale="targeted case gate failed",
            cost_usd=cost_usd, latency_ms=latency_ms,
        )

    fixture_result = await gates.run_fixture_regression_gate(
        new_version=new_version, skill_id=skill_id,
    )
    gate_results["fixture_regression"] = fixture_result
    if not fixture_result.passed:
        return await self._record_rejection(
            skill_id=skill_id, from_version_id=skill["current_version_id"],
            triggered_by=triggered_by, invocation_id=invocation_id,
            diagnosis=diagnosis, targeted=targeted,
            gate_results=gate_results,
            rationale="fixture regression gate failed",
            cost_usd=cost_usd, latency_ms=latency_ms,
        )

    recent_result = await gates.run_recent_success_gate(
        new_version=new_version, skill_id=skill_id,
    )
    gate_results["recent_success"] = recent_result
    if not recent_result.passed:
        return await self._record_rejection(
            skill_id=skill_id, from_version_id=skill["current_version_id"],
            triggered_by=triggered_by, invocation_id=invocation_id,
            diagnosis=diagnosis, targeted=targeted,
            gate_results=gate_results,
            rationale="recent success gate failed",
            cost_usd=cost_usd, latency_ms=latency_ms,
        )

    # All gates passed: persist new version + transition.
    new_version_id = await self._persist_new_version(
        skill_id=skill_id,
        current_version_id=skill["current_version_id"],
        new_version=new_version,
        changelog=parsed.get("changelog", ""),
    )

    # Destination state: sandbox unless requires_human_gate → draft.
    to_state = (
        SkillState.DRAFT if skill["requires_human_gate"]
        else SkillState.SANDBOX
    )

    # Two-hop: degraded → draft (evolution creates a draft),
    # then (if not requires_human_gate) draft → sandbox human_approval.
    # But spec says degraded → draft with reason=gate_passed.
    await self._lifecycle.transition(
        skill_id=skill_id, to_state=SkillState.DRAFT,
        reason="gate_passed", actor="system",
        notes=f"evolution {new_version_id}",
    )
    if to_state == SkillState.SANDBOX:
        # For non-gated skills, also flip draft → sandbox.
        # draft → sandbox requires human_approval in the table.
        # For automated evolution path, we accept the skill staying in draft.
        with contextlib.suppress(IllegalTransitionError):
            await self._lifecycle.transition(
                skill_id=skill_id, to_state=SkillState.SANDBOX,
                reason="gate_passed", actor="system",
                notes=f"evolution {new_version_id}",
            )

    await self._log_repo.record(
        skill_id=skill_id,
        from_version_id=skill["current_version_id"],
        to_version_id=new_version_id,
        triggered_by=triggered_by,
        claude_invocation_id=invocation_id,
        diagnosis=diagnosis,
        targeted_case_ids=targeted,
        validation_results={
            name: {"passed": g.passed, **g.details}
            for name, g in gate_results.items()
        },
        outcome="success",
    )

    return EvolutionReport(
        skill_id=skill_id, outcome="success",
        new_version_id=new_version_id,
        rationale="all 4 gates passed",
        cost_usd=cost_usd, latency_ms=latency_ms,
    )