Skip to content

donna.cost.tool_lint

donna.cost.tool_lint

Lint pipeline for tool_request_fulfillment builds (slice 22).

Realizes docs/superpowers/specs/manual-escalation.md §10.5 — the extra checks tool builds get on top of the slice-21 claude_code protocol.

Pipeline (executed in order):

  1. AST parse each .py source under the diff scope.
  2. Per-file rules:

  3. :func:donna.cost.tool_lint.anthropic_import.check_anthropic_import (§10.5 row 3)

  4. :func:donna.cost.tool_lint.import_io.check_import_time_io (§10.5 row 5)
  5. :func:donna.cost.tool_lint.secrets.scan_for_secrets (§10.5 row 2)
  6. :func:donna.cost.tool_lint.metadata.check_tool_metadata (§10.5 rows 1 + 6)
  7. Whole-diff rules:

  8. :func:donna.cost.tool_lint.allowlist.check_allowlist_update (§10.5 row 4)

  9. :func:donna.cost.tool_lint.inert_test.check_inert_at_import_test (§10.5 row 5)
  10. Optional execution gate:

  11. :func:donna.cost.tool_lint.import_smoke.run_import_smoke (validation step — runs after lint passes).

Failures stop validation; warnings (e.g. requires_rebuild=True) flow through to the dashboard panel.

logger module-attribute

logger = get_logger()

LintFailure dataclass

LintFailure(rule: str, path: str, line: int | None, message: str)

One lint violation.

rule matches the §10.5 row name (secrets, anthropic_import, import_io, allowlist, metadata, inert_test, syntax, requires_rebuild_warning).

rule instance-attribute

rule: str

path instance-attribute

path: str

line instance-attribute

line: int | None

message instance-attribute

message: str

LintResult dataclass

LintResult(failures: list[LintFailure] = list(), warnings: list[LintFailure] = list())

failures class-attribute instance-attribute

failures: list[LintFailure] = field(default_factory=list)

warnings class-attribute instance-attribute

warnings: list[LintFailure] = field(default_factory=list)

passed property

passed: bool

ToolLintConfig dataclass

ToolLintConfig(detect_secrets_enabled: bool = False, requires_rebuild_default: bool = False, default_timeout_seconds: int = 5)

detect_secrets_enabled class-attribute instance-attribute

detect_secrets_enabled: bool = False

requires_rebuild_default class-attribute instance-attribute

requires_rebuild_default: bool = False

default_timeout_seconds class-attribute instance-attribute

default_timeout_seconds: int = 5

check_allowlist_update

check_allowlist_update(diff_paths: list[str], source_text_by_path: dict[str, str], tool_name: str) -> list[LintFailure]

Verify tool is allowlisted somewhere or marked unallowlisted=True.

Source code in src/donna/cost/tool_lint/allowlist.py
def check_allowlist_update(
    diff_paths: list[str],
    source_text_by_path: dict[str, str],
    tool_name: str,
) -> list[LintFailure]:
    """Verify tool is allowlisted somewhere or marked ``unallowlisted=True``."""
    if _has_unallowlisted_marker(source_text_by_path, tool_name):
        return []

    touched_allowlists = [
        p for p in diff_paths if any(p.endswith(name) for name in ALLOWLIST_PATHS)
    ]
    if not touched_allowlists:
        return [
            LintFailure(
                rule="allowlist",
                path=", ".join(ALLOWLIST_PATHS),
                line=None,
                message=(
                    f"tool '{tool_name}' is not added to any allowlist. "
                    "Update one of "
                    f"{', '.join(ALLOWLIST_PATHS)}, or set "
                    "`unallowlisted = True` at the top of the tool module"
                ),
            )
        ]

    for path in touched_allowlists:
        text = source_text_by_path.get(path, "")
        if _allowlist_mentions_tool(text, tool_name):
            return []

    return [
        LintFailure(
            rule="allowlist",
            path=", ".join(touched_allowlists),
            line=None,
            message=(
                f"tool '{tool_name}' not found in any of the modified "
                "allowlist files near a `tools:` key"
            ),
        )
    ]

check_anthropic_import

check_anthropic_import(tree: AST, path: str) -> list[LintFailure]

Walk tree and reject any anthropic[.…] import.

Returns one :class:LintFailure per offending statement.

Source code in src/donna/cost/tool_lint/anthropic_import.py
def check_anthropic_import(tree: ast.AST, path: str) -> list[LintFailure]:
    """Walk ``tree`` and reject any ``anthropic[.…]`` import.

    Returns one :class:`LintFailure` per offending statement.
    """
    if _is_allowed(path):
        return []

    failures: list[LintFailure] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name == "anthropic" or alias.name.startswith("anthropic."):
                    failures.append(
                        LintFailure(
                            rule="anthropic_import",
                            path=path,
                            line=node.lineno,
                            message=(
                                f"`import {alias.name}` outside "
                                f"src/donna/llm/ — route through donna.llm "
                                "gateway"
                            ),
                        )
                    )
        elif isinstance(node, ast.ImportFrom):
            module = node.module or ""
            if module == "anthropic" or module.startswith("anthropic."):
                failures.append(
                    LintFailure(
                        rule="anthropic_import",
                        path=path,
                        line=node.lineno,
                        message=(
                            f"`from {module} import …` outside "
                            f"src/donna/llm/ — route through donna.llm "
                            "gateway"
                        ),
                    )
                )
    return failures

check_import_time_io

check_import_time_io(tree: AST, path: str) -> list[LintFailure]

Reject top-level network/disk I/O in tool source files.

Source code in src/donna/cost/tool_lint/import_io.py
def check_import_time_io(tree: ast.AST, path: str) -> list[LintFailure]:
    """Reject top-level network/disk I/O in tool source files."""
    failures: list[LintFailure] = []
    for call in _module_body_calls(tree):
        flagged = _flag_call(call)
        if flagged is None:
            continue
        failures.append(
            LintFailure(
                rule="import_io",
                path=path,
                line=call.lineno,
                message=(
                    f"module-level I/O `{flagged}` — wrap in a function "
                    "and call from a deferred entrypoint"
                ),
            )
        )
    return failures

check_inert_at_import_test

check_inert_at_import_test(diff_paths: list[str], source_text_by_path: dict[str, str], tool_name: str) -> list[LintFailure]
Source code in src/donna/cost/tool_lint/inert_test.py
def check_inert_at_import_test(
    diff_paths: list[str],
    source_text_by_path: dict[str, str],
    tool_name: str,
) -> list[LintFailure]:
    expected_path = _expected_test_path(tool_name)
    expected_arg = _expected_module_arg(tool_name)

    matched_path: str | None = None
    for path in diff_paths:
        if path.endswith(expected_path):
            matched_path = path
            break
    if matched_path is None:
        return [
            LintFailure(
                rule="inert_test",
                path=expected_path,
                line=None,
                message=(
                    f"missing required test file `{expected_path}` — "
                    "must call `is_inert_at_import('"
                    f"{expected_arg}')` (§10.5 row 5)"
                ),
            )
        ]

    text = source_text_by_path.get(matched_path, "")
    try:
        tree = ast.parse(text, filename=matched_path)
    except SyntaxError as exc:
        return [
            LintFailure(
                rule="inert_test:syntax",
                path=matched_path,
                line=exc.lineno,
                message=str(exc),
            )
        ]

    if not _has_inert_call(tree, expected_arg):
        return [
            LintFailure(
                rule="inert_test",
                path=matched_path,
                line=None,
                message=(
                    f"`{matched_path}` does not call "
                    f"`is_inert_at_import('{expected_arg}')`"
                ),
            )
        ]
    return []

check_tool_metadata

check_tool_metadata(*, source_text_by_path: dict[str, str], tool_name: str) -> MetadataResult

Verify tool source declares the required metadata fields.

Looks for src/donna/skills/tools/<tool_name>.py (or donna/skills/tools/<tool_name>.py if the diff is rooted differently — picked by suffix match).

Source code in src/donna/cost/tool_lint/metadata.py
def check_tool_metadata(
    *,
    source_text_by_path: dict[str, str],
    tool_name: str,
) -> MetadataResult:
    """Verify tool source declares the required metadata fields.

    Looks for ``src/donna/skills/tools/<tool_name>.py`` (or
    ``donna/skills/tools/<tool_name>.py`` if the diff is rooted
    differently — picked by suffix match).
    """
    suffix = f"donna/skills/tools/{tool_name}.py"
    target_path: str | None = None
    target_text: str | None = None
    for path, text in source_text_by_path.items():
        if path.endswith(suffix):
            target_path = path
            target_text = text
            break

    if target_text is None or target_path is None:
        return MetadataResult(
            failures=[
                LintFailure(
                    rule="metadata:missing_module",
                    path=suffix,
                    line=None,
                    message=(
                        f"tool source `{suffix}` not present in branch "
                        f"diff — required for tool '{tool_name}'"
                    ),
                )
            ]
        )

    try:
        tree = ast.parse(target_text, filename=target_path)
    except SyntaxError as exc:
        return MetadataResult(
            failures=[
                LintFailure(
                    rule="metadata:syntax",
                    path=target_path,
                    line=exc.lineno,
                    message=str(exc),
                )
            ]
        )

    failures: list[LintFailure] = []
    warnings: list[LintFailure] = []
    assigns = _module_level_assigns(tree)

    rb = assigns.get("requires_rebuild")
    if rb is None:
        failures.append(
            LintFailure(
                rule="metadata:requires_rebuild",
                path=target_path,
                line=None,
                message=(
                    "missing module-level `requires_rebuild = <bool>` — "
                    "required by §10.5 row 1"
                ),
            )
        )
    else:
        rb_value = _const(rb.value)
        if not isinstance(rb_value, bool):
            failures.append(
                LintFailure(
                    rule="metadata:requires_rebuild_type",
                    path=target_path,
                    line=rb.lineno,
                    message=(
                        f"`requires_rebuild` must be a bool literal "
                        f"(got {type(rb_value).__name__ if rb_value is not None else 'None'})"
                    ),
                )
            )
        elif rb_value is True:
            warnings.append(
                LintFailure(
                    rule="requires_rebuild_warning",
                    path=target_path,
                    line=rb.lineno,
                    message=(
                        "`requires_rebuild = True` — registry will refuse "
                        "to mark this tool active until orchestrator "
                        "restart with new build SHA"
                    ),
                )
            )

    to = assigns.get("default_timeout_seconds")
    if to is None:
        failures.append(
            LintFailure(
                rule="metadata:default_timeout",
                path=target_path,
                line=None,
                message=(
                    "missing module-level `default_timeout_seconds = <int>` "
                    "— required by §10.5 row 6"
                ),
            )
        )
    else:
        to_value = _const(to.value)
        if not isinstance(to_value, (int, float)) or isinstance(to_value, bool):
            failures.append(
                LintFailure(
                    rule="metadata:default_timeout_type",
                    path=target_path,
                    line=to.lineno,
                    message=(
                        "`default_timeout_seconds` must be a numeric literal"
                    ),
                )
            )
        elif to_value <= 0:
            failures.append(
                LintFailure(
                    rule="metadata:default_timeout_value",
                    path=target_path,
                    line=to.lineno,
                    message="`default_timeout_seconds` must be positive",
                )
            )

    return MetadataResult(failures=failures, warnings=warnings)

scan_for_secrets

scan_for_secrets(text: str, path: str, *, detect_secrets_enabled: bool = False) -> list[LintFailure]

Scan text for hardcoded credentials.

Parameters:

Name Type Description Default
text str

File source.

required
path str

Logical path (for the failure record).

required
detect_secrets_enabled bool

If True and the detect-secrets package is importable, also run the library scanner.

False
Source code in src/donna/cost/tool_lint/secrets.py
def scan_for_secrets(
    text: str,
    path: str,
    *,
    detect_secrets_enabled: bool = False,
) -> list[LintFailure]:
    """Scan ``text`` for hardcoded credentials.

    Args:
        text: File source.
        path: Logical path (for the failure record).
        detect_secrets_enabled: If True and the ``detect-secrets``
            package is importable, also run the library scanner.
    """
    failures: list[LintFailure] = []
    failures.extend(_scan_provider_tokens(text, path))
    failures.extend(_scan_vault_naming(text, path))
    if detect_secrets_enabled:
        failures.extend(_scan_with_detect_secrets(text, path))
    return failures

lint_tool_branch async

lint_tool_branch(*, branch: str, diff_paths: list[str], tool_name: str, source_text_by_path: dict[str, str], config: ToolLintConfig | None = None) -> LintResult

Run every §10.5 lint rule against a tool-build branch.

Parameters:

Name Type Description Default
branch str

Branch name (used only for logging).

required
diff_paths list[str]

Paths the user touched, scope-validated by :class:donna.cost.diff_validator.DiffValidator.

required
tool_name str

The tool being built (matches {name} substitution from task_types.yaml target_paths).

required
source_text_by_path dict[str, str]

Pre-fetched committed source for every path in diff_paths (caller uses host_repo.show_file).

required
config ToolLintConfig | None

Tunables (detect-secrets opt-in flag, defaults).

None

Returns:

Type Description
LintResult

class:LintResult with per-rule failures + warnings.

Source code in src/donna/cost/tool_lint/__init__.py
async def lint_tool_branch(
    *,
    branch: str,
    diff_paths: list[str],
    tool_name: str,
    source_text_by_path: dict[str, str],
    config: ToolLintConfig | None = None,
) -> LintResult:
    """Run every §10.5 lint rule against a tool-build branch.

    Args:
        branch: Branch name (used only for logging).
        diff_paths: Paths the user touched, scope-validated by
            :class:`donna.cost.diff_validator.DiffValidator`.
        tool_name: The tool being built (matches ``{name}``
            substitution from task_types.yaml ``target_paths``).
        source_text_by_path: Pre-fetched committed source for every
            path in ``diff_paths`` (caller uses ``host_repo.show_file``).
        config: Tunables (``detect-secrets`` opt-in flag, defaults).

    Returns:
        :class:`LintResult` with per-rule failures + warnings.
    """
    cfg = config or ToolLintConfig()
    failures: list[LintFailure] = []
    warnings: list[LintFailure] = []

    for path, text in source_text_by_path.items():
        if not path.endswith(".py"):
            continue
        try:
            tree = ast.parse(text, filename=path)
        except SyntaxError as exc:
            failures.append(
                LintFailure(
                    rule="syntax",
                    path=path,
                    line=exc.lineno,
                    message=f"could not parse {path}: {exc.msg}",
                )
            )
            continue
        failures.extend(check_anthropic_import(tree, path))
        failures.extend(check_import_time_io(tree, path))
        secret_failures = scan_for_secrets(
            text, path, detect_secrets_enabled=cfg.detect_secrets_enabled
        )
        failures.extend(secret_failures)

    # Tool metadata + dependent diff-wide rules
    metadata_results = check_tool_metadata(
        source_text_by_path=source_text_by_path,
        tool_name=tool_name,
    )
    failures.extend(metadata_results.failures)
    warnings.extend(metadata_results.warnings)

    failures.extend(check_allowlist_update(diff_paths, source_text_by_path, tool_name))
    failures.extend(
        check_inert_at_import_test(diff_paths, source_text_by_path, tool_name)
    )

    logger.info(
        "tool_lint_completed",
        branch=branch,
        tool_name=tool_name,
        failure_count=len(failures),
        warning_count=len(warnings),
    )
    return LintResult(failures=failures, warnings=warnings)