async def evolve_one(
    self, skill_id: str, triggered_by: str,
) -> EvolutionReport:
    """Try to evolve one degraded skill into a new, gate-validated version.

    Steps, in order: load the skill, consult the budget guard (when one is
    configured), build the input package, ask the router for a candidate
    version, check the shape of the response, run the structural,
    targeted-case, fixture-regression and recent-success gates, and on
    success persist the new version, move the skill through the lifecycle
    and write an evolution-log entry.

    Args:
        skill_id: Identifier of the skill to evolve.
        triggered_by: Forwarded unchanged to the evolution log and to the
            rejection recorder.

    Returns:
        An ``EvolutionReport``. Outcomes produced directly by this method:

        * ``"skipped"`` - skill not found, or its state is not
          ``"degraded"``.
        * ``"budget_exhausted"`` - ``BudgetPausedError`` from the budget
          guard or from the router call.
        * ``"error"`` - input assembly raised ``LookupError`` /
          ``ValueError``, or the router call raised any other exception.
        * ``"rejected_validation"`` - the router output is not a dict
          carrying all required keys.
        * ``"success"`` - all four gates passed.

        When a gate fails, the report is whatever
        ``self._record_rejection`` returns.

    Raises:
        Exceptions that are not explicitly handled below (for example
        from the gates, persistence, or the first lifecycle transition)
        propagate to the caller.
    """
    skill = await self._fetch_skill(skill_id)
    if skill is None:
        return EvolutionReport(
            skill_id=skill_id,
            outcome="skipped",
            rationale="skill not found",
        )

    current_state = skill["state"]
    if current_state != "degraded":
        return EvolutionReport(
            skill_id=skill_id,
            outcome="skipped",
            rationale=f"skill state is {current_state!r}, not 'degraded'",
        )

    # Pre-flight budget check; skipped entirely when no guard is configured.
    budget_guard = self._budget_guard
    if budget_guard is not None:
        try:
            await budget_guard.check_pre_call(user_id="system")
        except BudgetPausedError:
            return EvolutionReport(
                skill_id=skill_id,
                outcome="budget_exhausted",
            )

    # Gather everything the prompt needs.
    try:
        package = await self._input_builder.build(skill_id=skill_id)
    except (LookupError, ValueError) as build_err:
        build_reason = str(build_err)
        logger.warning(
            "skill_evolution_input_failed",
            skill_id=skill_id,
            error=build_reason,
        )
        return EvolutionReport(
            skill_id=skill_id,
            outcome="error",
            rationale=build_reason,
        )

    # Ask the model for a candidate. Prompt construction stays inside the
    # try so that its failures are reported as an LLM-call error as well.
    try:
        parsed, metadata = await self._router.complete(
            prompt=self._build_prompt(package),
            task_type=TASK_TYPE,
            task_id=None,
            user_id="system",
        )
    except BudgetPausedError:
        return EvolutionReport(
            skill_id=skill_id,
            outcome="budget_exhausted",
        )
    except Exception as llm_err:
        logger.warning(
            "skill_evolution_llm_failed",
            skill_id=skill_id,
            error=str(llm_err),
        )
        return EvolutionReport(
            skill_id=skill_id,
            outcome="error",
            rationale=f"llm call failed: {llm_err}",
        )

    # Metadata fields are optional; a missing or falsy cost counts as 0.0.
    invocation_id = getattr(metadata, "invocation_id", None)
    spent_usd = float(getattr(metadata, "cost_usd", 0.0) or 0.0)
    elapsed_ms = getattr(metadata, "latency_ms", None)

    # The response must be a dict that carries every expected key.
    expected_keys = (
        "diagnosis",
        "new_skill_version",
        "changelog",
        "targeted_failure_cases",
    )
    if not isinstance(parsed, dict) or any(
        key not in parsed for key in expected_keys
    ):
        await self._log_repo.record(
            skill_id=skill_id,
            from_version_id=skill["current_version_id"],
            to_version_id=None,
            triggered_by=triggered_by,
            claude_invocation_id=invocation_id,
            diagnosis=None,
            targeted_case_ids=None,
            validation_results={"malformed_output": True},
            outcome="rejected_validation",
        )
        await self._maybe_demote_after_failure(skill_id)
        return EvolutionReport(
            skill_id=skill_id,
            outcome="rejected_validation",
            rationale="malformed llm output",
            cost_usd=spent_usd,
            latency_ms=elapsed_ms,
        )

    candidate = parsed["new_skill_version"]
    case_ids = parsed["targeted_failure_cases"] or []
    diagnosis = parsed.get("diagnosis")

    # Gate machinery. Results accumulate in gate_results so that a rejection
    # records every gate that ran up to and including the failing one.
    executor = self._executor_factory()
    gates = EvolutionGates(self._conn, self._config, executor)
    gate_results: dict[str, GateResult] = {}

    async def reject(reason: str) -> EvolutionReport:
        # Single exit for a failed gate; reads the current version id
        # only when a rejection actually happens.
        return await self._record_rejection(
            skill_id=skill_id,
            from_version_id=skill["current_version_id"],
            triggered_by=triggered_by,
            invocation_id=invocation_id,
            diagnosis=diagnosis,
            targeted=case_ids,
            gate_results=gate_results,
            rationale=reason,
            cost_usd=spent_usd,
            latency_ms=elapsed_ms,
        )

    # Gate 1: structural (synchronous).
    structural = run_structural_gate(candidate)
    gate_results["structural"] = structural
    if not structural.passed:
        return await reject(
            f"structural gate failed: {structural.failure_reason}"
        )

    # Gate 2: targeted failure cases.
    targeted_result = await gates.run_targeted_case_gate(
        new_version=candidate,
        skill_id=skill_id,
        targeted_case_ids=case_ids,
    )
    gate_results["targeted"] = targeted_result
    if not targeted_result.passed:
        return await reject("targeted case gate failed")

    # Gate 3: fixture regression.
    fixture_result = await gates.run_fixture_regression_gate(
        new_version=candidate,
        skill_id=skill_id,
    )
    gate_results["fixture_regression"] = fixture_result
    if not fixture_result.passed:
        return await reject("fixture regression gate failed")

    # Gate 4: recent successes.
    recent_result = await gates.run_recent_success_gate(
        new_version=candidate,
        skill_id=skill_id,
    )
    gate_results["recent_success"] = recent_result
    if not recent_result.passed:
        return await reject("recent success gate failed")

    # Every gate passed: store the candidate as a new version.
    new_version_id = await self._persist_new_version(
        skill_id=skill_id,
        current_version_id=skill["current_version_id"],
        new_version=candidate,
        changelog=parsed.get("changelog", ""),
    )

    # Intended destination: skills that need a human gate stop at DRAFT,
    # everything else aims for SANDBOX.
    if skill["requires_human_gate"]:
        to_state = SkillState.DRAFT
    else:
        to_state = SkillState.SANDBOX

    transition_note = f"evolution {new_version_id}"

    # The first hop is always to DRAFT with reason "gate_passed".
    await self._lifecycle.transition(
        skill_id=skill_id,
        to_state=SkillState.DRAFT,
        reason="gate_passed",
        actor="system",
        notes=transition_note,
    )
    if to_state == SkillState.SANDBOX:
        # Best-effort second hop DRAFT -> SANDBOX. If the lifecycle rejects
        # it with IllegalTransitionError, the error is suppressed and the
        # skill simply remains in DRAFT.
        with contextlib.suppress(IllegalTransitionError):
            await self._lifecycle.transition(
                skill_id=skill_id,
                to_state=SkillState.SANDBOX,
                reason="gate_passed",
                actor="system",
                notes=transition_note,
            )

    # Flatten each gate result into {"passed": ..., <details...>} for the log.
    validation_summary = {}
    for gate_name, gate_outcome in gate_results.items():
        validation_summary[gate_name] = {
            "passed": gate_outcome.passed,
            **gate_outcome.details,
        }

    await self._log_repo.record(
        skill_id=skill_id,
        from_version_id=skill["current_version_id"],
        to_version_id=new_version_id,
        triggered_by=triggered_by,
        claude_invocation_id=invocation_id,
        diagnosis=diagnosis,
        targeted_case_ids=case_ids,
        validation_results=validation_summary,
        outcome="success",
    )
    return EvolutionReport(
        skill_id=skill_id,
        outcome="success",
        new_version_id=new_version_id,
        rationale="all 4 gates passed",
        cost_usd=spent_usd,
        latency_ms=elapsed_ms,
    )