"""Source code for fermilink.cli.commands.packages."""

from __future__ import annotations

import argparse
import json
import re
import shutil
from datetime import datetime, timezone
from pathlib import Path
from urllib.parse import urlparse


# Tag delimiting the JSON metadata payload inside agent responses.
AUTO_COMPILE_METADATA_TAG = "auto_compile_metadata"
# Matches one <auto_compile_metadata>...</auto_compile_metadata> span
# (case-insensitive, may span newlines); group 1 is the raw JSON text.
AUTO_COMPILE_METADATA_TOKEN_RE = re.compile(
    rf"<{AUTO_COMPILE_METADATA_TAG}>(.*?)</{AUTO_COMPILE_METADATA_TAG}>",
    re.IGNORECASE | re.DOTALL,
)
# GitHub account/org token: 1-39 chars, alphanumeric plus hyphens, and the
# first character must be alphanumeric.
GITHUB_OWNER_TOKEN_RE = re.compile(r"^[A-Za-z0-9](?:[A-Za-z0-9-]{0,38})$")
# Commit message used when pushing auto-compiled skills to a fork.
AUTO_COMPILE_COMMIT_TEMPLATE = "Add FermiLink skills for {package_id}"
# Generic dependency/toolchain labels stripped from router keywords: they do
# not discriminate between scientific packages.
ROUTER_KEYWORD_NOISE_TERMS = {
    "backend",
    "conda",
    "conda-forge",
    "cython",
    "cython backend",
    "numpy",
    "numpy scipy",
    "pip",
    "scipy",
}
# Non-scientific software/product terms stripped from negative keywords.
ROUTER_NEGATIVE_NONSCIENTIFIC_TERMS = {
    "blockchain",
    "computer vision",
    "devops",
    "mobile app",
    "natural language processing",
    "relational database",
    "web frontend",
}


def _cli():
    """Return the top-level ``fermilink.cli`` module.

    The import happens at call time rather than module load time,
    presumably to avoid an import cycle between the CLI package and this
    command module — TODO confirm.
    """
    from fermilink import cli as cli_module

    return cli_module


def _save_curated_install_metadata(
    scipkg_root: Path,
    package_id: str,
    *,
    channel: str,
    curated_package_id: str,
    version_id: str,
    source_archive_url: str,
    verified: bool,
    source_ref_type: str | None,
    source_ref_value: str | None,
) -> None:
    """Record curated-channel provenance on an installed package's registry entry.

    Best-effort: silently returns when the registry has no well-formed
    ``packages`` mapping or no dict entry for the normalized package id.
    """
    cli = _cli()
    key = cli.normalize_package_id(package_id)
    registry = cli.load_registry(scipkg_root)
    entries = registry.get("packages")
    if not isinstance(entries, dict):
        return
    existing = entries.get(key)
    if not isinstance(existing, dict):
        return

    curated_info = {
        "channel": channel,
        "package_id": curated_package_id,
        "version_id": version_id,
        "source_archive_url": source_archive_url,
        "verified": verified,
        "source_ref": {
            "type": source_ref_type,
            "value": source_ref_value,
        },
    }
    # Copy-on-write: never mutate the registry's stored dict in place.
    entries[key] = {**existing, "curated": curated_info}
    cli.save_registry(scipkg_root, registry)


def _utc_now_z() -> str:
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _build_compile_run_id(prefix: str) -> str:
    """Build a run id of the form ``<prefix>_<compact UTC timestamp>``.

    The timestamp is the current UTC ISO string (Z-suffixed, as produced by
    ``_utc_now_z``) with every non-alphanumeric character removed; ``run``
    is used if nothing remains.
    """
    stamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    for separator in ("-", ":", "."):
        stamp = stamp.replace(separator, "")
    token = re.sub(r"[^A-Za-z0-9]+", "", stamp).strip() or "run"
    return f"{prefix}_{token}"


def _read_json_object(path: Path) -> dict[str, object]:
    """Load *path* as JSON and require the top-level value to be an object.

    Raises:
        PackageError: when the file cannot be read, contains malformed
            JSON, or its root value is not a JSON object.
    """
    cli = _cli()
    try:
        raw_text = path.read_text(encoding="utf-8")
    except OSError as exc:
        raise cli.PackageError(f"Failed to read {path}: {exc}") from exc
    try:
        payload = json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise cli.PackageError(f"Invalid JSON in {path}: {exc}") from exc
    if not isinstance(payload, dict):
        raise cli.PackageError(f"Expected JSON object in {path}.")
    return payload


def _write_json_atomic(path: Path, payload: dict[str, object]) -> None:
    cli = _cli()
    path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        temp_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
        temp_path.replace(path)
    except OSError as exc:
        raise cli.PackageError(f"Failed to write JSON file {path}: {exc}") from exc


def _run_external_command(
    command: list[str],
    *,
    cwd: Path | None = None,
    check: bool = True,
) -> object:
    """Run *command*, capturing text output, and return the completed process.

    With ``check=True`` a nonzero exit status becomes a ``PackageError``
    whose detail prefers stderr, then stdout, then the raw exit code.

    Raises:
        PackageError: when the executable is missing, or (with
            ``check=True``) the command exits nonzero.
    """
    cli = _cli()
    workdir = str(cwd) if isinstance(cwd, Path) else None
    try:
        completed = cli.subprocess.run(
            command,
            cwd=workdir,
            capture_output=True,
            text=True,
            check=False,
        )
    except FileNotFoundError as exc:
        raise cli.PackageError(f"Command not found: {command[0]}") from exc

    if not check or completed.returncode == 0:
        return completed

    detail = (
        (completed.stderr or "").strip()
        or (completed.stdout or "").strip()
        or f"exit code {completed.returncode}"
    )
    rendered = " ".join(command)
    raise cli.PackageError(f"Command failed ({rendered}): {detail}")


def _normalize_github_repo_url(url: str) -> tuple[str, str, str]:
    """Parse a GitHub repo URL into ``(owner, repo, canonical_https_url)``.

    Accepts ``git@github.com:owner/repo(.git)`` SSH form and
    ``https://github.com/owner/repo`` (or ``www.github.com``) HTTPS form.

    Raises:
        PackageError: when the URL is empty, not a GitHub URL, or lacks an
            owner/repo pair.
    """
    cli = _cli()
    cleaned = str(url or "").strip()
    if not cleaned:
        raise cli.PackageError("upstream_repo_url is required.")

    owner = ""
    repo = ""
    ssh_prefix = "git@github.com:"
    if cleaned.startswith(ssh_prefix):
        segments = [seg for seg in cleaned.split(":", 1)[1].split("/") if seg]
        if len(segments) >= 2:
            owner, repo = segments[0].strip(), segments[1].strip()
    else:
        parsed = urlparse(cleaned)
        if (parsed.netloc or "").lower() not in {"github.com", "www.github.com"}:
            raise cli.PackageError(f"Only GitHub repo URLs are supported: {cleaned}")
        segments = [seg for seg in parsed.path.split("/") if seg]
        if len(segments) >= 2:
            owner, repo = segments[0].strip(), segments[1].strip()

    if repo.endswith(".git"):
        repo = repo[:-4]
    if not (owner and repo):
        raise cli.PackageError(f"Invalid GitHub repository URL: {cleaned}")

    return owner, repo, f"https://github.com/{owner}/{repo}"


def _load_auto_compile_specs(
    *,
    package_id_arg: str | None,
    upstream_repo_url_arg: str | None,
    spec_file_arg: str | None,
) -> list[dict[str, str]]:
    """Resolve auto-compile package specs from CLI args or a JSON spec file.

    Exactly one input mode is accepted: either the positional
    package_id/upstream_repo_url pair, or ``--spec-file`` pointing at a JSON
    object whose non-empty ``packages`` list holds
    ``{"package_id": ..., "upstream_repo_url": ...}`` objects.

    Returns:
        A duplicate-free list of ``{"package_id", "upstream_repo_url"}``
        dicts with normalized package ids and canonical GitHub URLs.

    Raises:
        PackageError: on mixed or missing inputs, malformed spec entries,
            invalid GitHub URLs, or duplicate package ids.
    """
    cli = _cli()
    package_id_raw = str(package_id_arg or "").strip()
    upstream_raw = str(upstream_repo_url_arg or "").strip()
    spec_raw = str(spec_file_arg or "").strip()

    # The two input modes are mutually exclusive.
    if spec_raw and (package_id_raw or upstream_raw):
        raise cli.PackageError(
            "Use either positional package_id/upstream_repo_url or --spec-file, not both."
        )

    specs: list[dict[str, str]] = []
    if spec_raw:
        spec_path = Path(spec_raw).expanduser().resolve()
        payload = _read_json_object(spec_path)
        packages_raw = payload.get("packages")
        if not isinstance(packages_raw, list) or not packages_raw:
            raise cli.PackageError(
                f"--spec-file must include non-empty packages[]: {spec_path}"
            )
        # NOTE: error messages use 1-based indices into packages[].
        for index, item in enumerate(packages_raw, start=1):
            if not isinstance(item, dict):
                raise cli.PackageError(
                    f"Invalid packages[{index}] in {spec_path}: expected object."
                )
            raw_id = str(item.get("package_id") or "").strip()
            raw_url = str(item.get("upstream_repo_url") or "").strip()
            if not raw_id or not raw_url:
                raise cli.PackageError(
                    f"Invalid packages[{index}] in {spec_path}: package_id and upstream_repo_url are required."
                )
            _, _, canonical = _normalize_github_repo_url(raw_url)
            specs.append(
                {
                    "package_id": cli.normalize_package_id(raw_id),
                    "upstream_repo_url": canonical,
                }
            )
    else:
        # Single-package mode: both positional values are required.
        if not package_id_raw or not upstream_raw:
            raise cli.PackageError(
                "Provide <package_id> <upstream_repo_url> or --spec-file."
            )
        _, _, canonical = _normalize_github_repo_url(upstream_raw)
        specs.append(
            {
                "package_id": cli.normalize_package_id(package_id_raw),
                "upstream_repo_url": canonical,
            }
        )

    # Duplicates are an error (not silently dropped) so a spec file typo
    # cannot mask an intended package.
    deduped: list[dict[str, str]] = []
    seen: set[str] = set()
    for spec in specs:
        package_id = spec["package_id"]
        if package_id in seen:
            raise cli.PackageError(
                f"Duplicate package_id in auto-compile input: {package_id}"
            )
        seen.add(package_id)
        deduped.append(spec)
    return deduped


def _ensure_required_commands_available(*, command_names: tuple[str, ...]) -> None:
    """Raise ``PackageError`` naming any of *command_names* absent from PATH."""
    cli = _cli()
    missing = sorted(
        {name for name in command_names if not cli.shutil.which(str(name).strip())}
    )
    if missing:
        raise cli.PackageError("Missing required commands: " + ", ".join(missing))


def _parse_repo_info_payload(raw_json: str, *, context: str) -> dict[str, object]:
    """Decode *raw_json*, requiring a JSON object; *context* labels errors.

    Raises:
        PackageError: on malformed JSON or a non-object root value.
    """
    cli = _cli()
    try:
        decoded = json.loads(raw_json)
    except json.JSONDecodeError as exc:
        raise cli.PackageError(f"Invalid JSON from {context}: {exc}") from exc
    if isinstance(decoded, dict):
        return decoded
    raise cli.PackageError(f"Invalid payload from {context}: expected object.")


def _try_fetch_repo_info(name_with_owner: str) -> dict[str, object] | None:
    """Fetch repo metadata via ``gh repo view``; return None when unavailable.

    None is returned both when the ``gh`` command exits nonzero and when it
    produces no output; JSON parse failures still raise.
    """
    fields = (
        "name,nameWithOwner,url,description,homepageUrl,defaultBranchRef,visibility"
    )
    completed = _run_external_command(
        ["gh", "repo", "view", name_with_owner, "--json", fields],
        check=False,
    )
    if int(getattr(completed, "returncode", 1)) != 0:
        return None
    stdout = str(getattr(completed, "stdout", "") or "").strip()
    if not stdout:
        return None
    return _parse_repo_info_payload(stdout, context=f"gh repo view {name_with_owner}")


def _fetch_repo_info(name_with_owner: str) -> dict[str, object]:
    """Like ``_try_fetch_repo_info`` but a missing repo raises ``PackageError``."""
    payload = _try_fetch_repo_info(name_with_owner)
    cli = _cli()
    if payload is not None:
        return payload
    raise cli.PackageError(
        f"Unable to fetch repository metadata: {name_with_owner}"
    )


def _resolve_github_login() -> str:
    """Return the authenticated GitHub login reported by the ``gh`` CLI.

    Raises:
        PackageError: when ``gh api user`` yields an empty login.
    """
    result = _run_external_command(["gh", "api", "user", "--jq", ".login"])
    login = str(getattr(result, "stdout", "") or "").strip()
    cli = _cli()
    if login:
        return login
    raise cli.PackageError(
        "Unable to resolve GitHub login from `gh api user --jq .login`."
    )


def _normalize_github_owner(raw_owner: str | None, *, field_name: str) -> str | None:
    """Validate a GitHub account/org token, stripping a leading ``@``.

    Returns None for blank input; otherwise the cleaned token.

    Raises:
        PackageError: when the token fails the GitHub owner-name pattern.
    """
    cli = _cli()
    candidate = str(raw_owner or "").strip()
    if candidate.startswith("@"):
        candidate = candidate[1:].strip()
    if not candidate:
        return None
    if GITHUB_OWNER_TOKEN_RE.fullmatch(candidate):
        return candidate
    raise cli.PackageError(
        f"{field_name} must be a GitHub account/organization name "
        "(letters, numbers, hyphens)."
    )


def _repo_default_branch(
    repo_info: dict[str, object], *, fallback: str = "main"
) -> str:
    branch_ref = repo_info.get("defaultBranchRef")
    if isinstance(branch_ref, dict):
        value = str(branch_ref.get("name") or "").strip()
        if value:
            return value
    return fallback


def _ensure_public_fork(
    *,
    upstream_owner: str,
    upstream_repo: str,
    github_login: str,
    organization: str | None,
) -> dict[str, str]:
    """Ensure a public fork of the upstream repo exists and describe it.

    Creates the fork via ``gh repo fork`` (into *organization* when given,
    otherwise the user's account) if it does not exist yet, then verifies
    it is public.

    Returns:
        Dict with ``fork_name``, ``fork_url``, ``fork_clone_url``, and
        ``default_branch``.

    Raises:
        PackageError: when the fork exists but is not public, or fork
            creation/lookup fails.
    """
    cli = _cli()
    upstream = f"{upstream_owner}/{upstream_repo}"
    fork_owner = organization or github_login
    fork_name = f"{fork_owner}/{upstream_repo}"

    fork_info = _try_fetch_repo_info(fork_name)
    if fork_info is None:
        fork_cmd = ["gh", "repo", "fork", upstream, "--clone=false"]
        if organization:
            fork_cmd.extend(["--org", organization])
        _run_external_command(fork_cmd)
        fork_info = _fetch_repo_info(fork_name)

    visibility = str(fork_info.get("visibility") or "").strip().lower()
    if visibility and visibility != "public":
        raise cli.PackageError(
            f"Fork {fork_name} exists but is not public (visibility={visibility})."
        )

    reported_url = str(fork_info.get("url") or "").strip()
    return {
        "fork_name": fork_name,
        "fork_url": reported_url or f"https://github.com/{fork_name}",
        "fork_clone_url": f"https://github.com/{fork_name}.git",
        "default_branch": _repo_default_branch(fork_info, fallback="main"),
    }


def _git_status_porcelain(repo_dir: Path) -> str:
    """Return the stripped ``git status --porcelain`` output for *repo_dir*."""
    result = _run_external_command(
        ["git", "status", "--porcelain"], cwd=repo_dir, check=True
    )
    return str(getattr(result, "stdout", "") or "").strip()


def _checkout_branch(repo_dir: Path, branch_name: str) -> None:
    """Check out *branch_name* in *repo_dir*, recreating it from origin if needed."""
    attempt = _run_external_command(
        ["git", "checkout", branch_name], cwd=repo_dir, check=False
    )
    if int(getattr(attempt, "returncode", 1)) == 0:
        return
    # Plain checkout failed (e.g. no local branch yet): force-(re)create the
    # branch from its origin counterpart.
    _run_external_command(
        ["git", "checkout", "-B", branch_name, f"origin/{branch_name}"],
        cwd=repo_dir,
    )


def _prepare_fork_clone(
    *,
    workspace_root: Path,
    package_id: str,
    upstream_repo: str,
    clone_url: str,
    default_branch: str,
) -> Path:
    """Clone the fork into the workspace, or refresh a clean existing clone.

    Returns the clone directory, checked out on *default_branch* and
    fast-forwarded to ``origin``.

    Raises:
        PackageError: when the target path exists but is not a git repo, or
            has uncommitted changes.
    """
    cli = _cli()
    safe_repo = cli.normalize_package_id(upstream_repo)
    clone_dir = workspace_root / f"{package_id}-{safe_repo}"
    workspace_root.mkdir(parents=True, exist_ok=True)

    if clone_dir.exists():
        if not (clone_dir / ".git").is_dir():
            raise cli.PackageError(
                f"Clone path exists but is not a git repository: {clone_dir}"
            )
        # Refuse to touch a dirty tree: refreshing could clobber local work.
        if _git_status_porcelain(clone_dir):
            raise cli.PackageError(
                f"Clone has uncommitted changes; clean it before auto-compile: {clone_dir}"
            )
        # Re-point origin in case the fork URL changed, then sync the branch
        # with a fast-forward-only pull (never creates merge commits).
        _run_external_command(
            ["git", "remote", "set-url", "origin", clone_url], cwd=clone_dir
        )
        _run_external_command(["git", "fetch", "origin"], cwd=clone_dir)
        _checkout_branch(clone_dir, default_branch)
        _run_external_command(
            ["git", "pull", "--ff-only", "origin", default_branch], cwd=clone_dir
        )
    else:
        _run_external_command(["git", "clone", clone_url, str(clone_dir)])
        _run_external_command(["git", "fetch", "origin"], cwd=clone_dir)
        _checkout_branch(clone_dir, default_branch)

    return clone_dir


def _invoke_compile_for_auto_compile(
    *,
    package_id: str,
    project_root: Path,
    max_skills: int,
    core_skill_count: int,
    docs_only: bool,
    keep_compile_artifacts: bool,
    strict_compile_validation: bool,
) -> dict[str, object]:
    """Run the compile step for one auto-compile package, unless skipped.

    Skips (``performed=False``) when ``<project_root>/skills`` already
    exists as a directory.

    Returns:
        Dict with ``performed``, ``reason``, and ``project_root`` keys.

    Raises:
        PackageError: when the compile command exits nonzero.
    """
    skills_root = project_root / "skills"
    if skills_root.is_dir():
        return {
            "performed": False,
            "reason": "skills_already_exists",
            "project_root": str(project_root),
        }

    # Mirror the `compile` CLI flags; install/activation/router-sync are
    # disabled so auto-compile only generates skills in the project tree.
    compile_args = argparse.Namespace(
        package_id=package_id,
        project_path=str(project_root),
        title=None,
        max_skills=max_skills,
        core_skill_count=core_skill_count,
        docs_only=docs_only,
        keep_compile_artifacts=keep_compile_artifacts,
        strict_compile_validation=strict_compile_validation,
        install_off=True,
        activate=False,
        no_router_sync=True,
        json=False,
    )
    # NOTE(review): cmd_compile is presumably the compile command handler
    # defined elsewhere in this module.
    exit_code = cmd_compile(compile_args)
    if exit_code != 0:
        raise _cli().PackageError(
            f"Compile failed for {package_id} at {project_root} with exit code {exit_code}."
        )
    return {
        "performed": True,
        "reason": "compiled",
        "project_root": str(project_root),
    }


def _commit_and_push_changes(
    *,
    repo_dir: Path,
    package_id: str,
    default_branch: str,
) -> dict[str, object]:
    """Stage, commit (when dirty), and push *repo_dir* to origin/<default_branch>.

    Returns:
        Dict with ``committed``, ``commit_sha``, ``pushed_branch``, and
        ``has_changes`` keys.
    """
    _checkout_branch(repo_dir, default_branch)
    initial_status = _git_status_porcelain(repo_dir)
    committed = False
    commit_sha = ""
    if initial_status:
        _run_external_command(["git", "add", "-A"], cwd=repo_dir)
        # Re-check after staging: only commit when something was staged.
        if _git_status_porcelain(repo_dir):
            message = AUTO_COMPILE_COMMIT_TEMPLATE.format(package_id=package_id)
            _run_external_command(["git", "commit", "-m", message], cwd=repo_dir)
            committed = True
            head = _run_external_command(["git", "rev-parse", "HEAD"], cwd=repo_dir)
            commit_sha = str(getattr(head, "stdout", "") or "").strip()
    # Push unconditionally so the remote branch is current even when no new
    # commit was created here.
    _run_external_command(
        ["git", "push", "origin", f"HEAD:{default_branch}"], cwd=repo_dir
    )
    return {
        "committed": committed,
        "commit_sha": commit_sha,
        "pushed_branch": default_branch,
        "has_changes": bool(initial_status),
    }


def _read_repo_excerpt(repo_dir: Path, *, max_chars: int = 5000) -> str:
    for candidate in ("README.md", "README.rst", "Readme.md"):
        path = repo_dir / candidate
        if not path.is_file():
            continue
        try:
            text = path.read_text(encoding="utf-8")
        except OSError:
            continue
        cleaned = text.strip()
        if not cleaned:
            continue
        return cleaned[:max_chars]
    return ""


def _load_disambiguation_package_ids(
    *,
    fermilink_repo: Path,
    channel_id: str,
    package_id: str,
) -> list[str]:
    """Collect peer package ids usable as router disambiguation terms.

    Gathers ids from the curated channel file and family names from the
    router family-hints file under *fermilink_repo*, excluding
    *package_id* itself.  Returns a deduplicated lowercase list
    (capped at 200 entries).
    """
    cli = _cli()
    normalized_channel = cli.normalize_channel_id(channel_id)
    self_id = cli.normalize_package_id(package_id)
    data_root = fermilink_repo / "src" / "fermilink" / "data"
    curated_path = data_root / "curated_channels" / f"{normalized_channel}.json"
    family_path = data_root / "router" / "family_hints.json"

    candidates: list[str] = []
    if curated_path.is_file():
        packages_raw = _read_json_object(curated_path).get("packages")
        if isinstance(packages_raw, list):
            for item in packages_raw:
                if not isinstance(item, dict):
                    continue
                candidate = cli.normalize_package_id(str(item.get("package_id") or ""))
                if candidate and candidate != self_id:
                    candidates.append(candidate)

    if family_path.is_file():
        families_raw = _read_json_object(family_path).get("families")
        if isinstance(families_raw, dict):
            for family_name in families_raw:
                candidate = cli.normalize_package_id(str(family_name or ""))
                if candidate and candidate != self_id:
                    candidates.append(candidate)

    return _normalize_unique_terms(
        candidates,
        field_name="disambiguation_package_ids",
        min_items=0,
        max_items=200,
        lowercase=True,
    )


def _normalize_unique_terms(
    raw: object,
    *,
    field_name: str,
    min_items: int = 0,
    max_items: int | None = None,
    lowercase: bool = True,
) -> list[str]:
    cli = _cli()
    if isinstance(raw, str):
        items = [token.strip() for token in raw.split(",")]
    elif isinstance(raw, list):
        items = [str(item).strip() for item in raw if isinstance(item, str)]
    else:
        items = []
    terms: list[str] = []
    seen: set[str] = set()
    for item in items:
        if not item:
            continue
        value = item.lower() if lowercase else item
        key = value.lower()
        if key in seen:
            continue
        seen.add(key)
        terms.append(value)
    if max_items is not None:
        terms = terms[:max_items]
    if len(terms) < min_items:
        raise cli.PackageError(
            f"Generated metadata field `{field_name}` requires at least {min_items} item(s)."
        )
    return terms


def _build_auto_compile_metadata_prompt(
    *,
    package_id: str,
    upstream_repo_url: str,
    fork_repo_url: str,
    default_branch: str,
    upstream_description: str,
    upstream_homepage: str,
    readme_excerpt: str,
    disambiguation_package_ids: list[str],
) -> str:
    """Build the provider prompt requesting tagged JSON package metadata.

    The response is expected to contain exactly one
    ``<auto_compile_metadata>{...}</auto_compile_metadata>`` payload,
    matched later by AUTO_COMPILE_METADATA_TOKEN_RE.
    """
    excerpt_block = readme_excerpt.strip() or "(no README excerpt available)"
    # Cap the surfaced candidate ids at 30 to keep the prompt compact.
    disambiguation_block = ", ".join(disambiguation_package_ids[:30]) or "(none)"
    return (
        "Generate metadata for onboarding one scientific package into FermiLink.\n"
        "Return only one tagged JSON payload and no extra text.\n"
        f"Use this exact format: <{AUTO_COMPILE_METADATA_TAG}>{{...}}</{AUTO_COMPILE_METADATA_TAG}>.\n"
        "Required JSON fields:\n"
        "- title: short human-friendly package title.\n"
        "- description: one concise sentence (12-40 words).\n"
        "- tags: list of 3-8 short lowercase tags.\n"
        "- family_description: one concise sentence for router family hints.\n"
        "- strong_keywords: list of 4-12 high-confidence routing terms.\n"
        "- keywords: list of 4-14 secondary routing terms.\n"
        "- negative_keywords: list of 0-10 disambiguation terms likely belonging "
        "to other scientific packages.\n"
        "Constraints:\n"
        "- Terms must be plain strings, no punctuation-only tokens.\n"
        "- Keep terms domain-specific and useful for routing user intents.\n"
        "- Avoid generic AI words.\n"
        "- Include package canonical name in strong_keywords.\n"
        "- For strong_keywords/keywords, avoid generic dependency/toolchain labels "
        "(numpy/scipy/cython/conda/backend) unless absolutely central to intent.\n"
        "- For negative_keywords, prefer entries from candidate disambiguation package "
        "ids when suitable.\n"
        "- Do not include non-scientific software/product terms "
        "(web frontend, mobile app, blockchain, devops).\n"
        f"- Package id: {package_id}\n"
        f"- Upstream repo: {upstream_repo_url}\n"
        f"- Fork repo: {fork_repo_url}\n"
        f"- Fork default branch: {default_branch}\n"
        f"- Upstream description: {upstream_description or '(none)'}\n"
        f"- Upstream homepage: {upstream_homepage or '(none)'}\n"
        f"- Candidate disambiguation package ids: {disambiguation_block}\n"
        "README excerpt:\n"
        "<<<README\n"
        f"{excerpt_block}\n"
        "README>>>\n"
    )


def _generate_metadata_with_provider(
    *,
    metadata_repo_dir: Path,
    package_id: str,
    upstream_repo_url: str,
    fork_repo_url: str,
    default_branch: str,
    upstream_description: str,
    upstream_homepage: str,
    readme_excerpt: str,
    disambiguation_package_ids: list[str],
) -> dict[str, object]:
    """Generate package metadata JSON via the configured agent provider.

    Runs a single read-only exec-chat turn inside *metadata_repo_dir* with
    the auto-compile metadata prompt, then extracts the tagged JSON payload
    from the assistant response.

    Raises:
        PackageError: when the provider does not support metadata
            generation, the repo dir is invalid, the provider call fails,
            or the tagged payload cannot be parsed into an object.
    """
    cli = _cli()
    runtime_policy = cli.resolve_agent_runtime_policy()
    provider = runtime_policy.provider
    if not cli.provider_supports_auto_compile_metadata_generation(provider):
        raise cli.PackageError(
            "auto-compile metadata generation is not supported by the current "
            f"provider '{provider}'. Select a provider whose agent adapter "
            "enables metadata generation."
        )

    prompt = _build_auto_compile_metadata_prompt(
        package_id=package_id,
        upstream_repo_url=upstream_repo_url,
        fork_repo_url=fork_repo_url,
        default_branch=default_branch,
        upstream_description=upstream_description,
        upstream_homepage=upstream_homepage,
        readme_excerpt=readme_excerpt,
        disambiguation_package_ids=disambiguation_package_ids,
    )
    if not metadata_repo_dir.is_dir():
        raise cli.PackageError(
            f"Invalid metadata repo directory for auto-compile: {metadata_repo_dir}"
        )
    # Read-only sandbox: metadata generation must not modify the repo.
    response = cli._run_exec_chat_turn(
        repo_dir=metadata_repo_dir,
        prompt=prompt,
        sandbox="read-only",
        provider_bin_override=cli.resolve_provider_binary_override(
            provider,
            raw_override=cli.DEFAULT_PROVIDER_BINARY_OVERRIDE,
        ),
        provider=provider,
        sandbox_policy="enforce",
        model=runtime_policy.model,
        reasoning_effort=runtime_policy.reasoning_effort,
    )
    # A missing or non-numeric return code is treated as failure (1).
    return_code_raw = response.get("return_code")
    try:
        return_code = int(1 if return_code_raw is None else return_code_raw)
    except (TypeError, ValueError):
        return_code = 1
    if return_code != 0:
        stderr = str(response.get("stderr") or "").strip()
        detail = stderr or f"exit code {return_code}"
        raise cli.PackageError(f"{provider} metadata generation failed: {detail}")

    assistant_text = str(response.get("assistant_text") or "")
    payload = cli._extract_tagged_json_payload(
        assistant_text,
        token_re=AUTO_COMPILE_METADATA_TOKEN_RE,
    )
    if not isinstance(payload, dict):
        raise cli.PackageError(
            "Failed to parse generated metadata JSON payload from tagged response."
        )
    return payload


def _build_curated_entry_from_metadata(
    *,
    package_id: str,
    upstream_repo_url: str,
    upstream_homepage: str,
    fork_owner_repo: str,
    default_branch: str,
    metadata_payload: dict[str, object],
) -> dict[str, object]:
    """Assemble a curated-channel entry from generated metadata.

    The entry carries a single unverified ``branch-head`` version pointing
    at the fork branch's zip archive.

    Raises:
        PackageError: when title/description are missing or the tags list
            fails normalization.
    """
    cli = _cli()
    title = str(metadata_payload.get("title") or "").strip()
    description = str(metadata_payload.get("description") or "").strip()
    if not (title and description):
        raise cli.PackageError(
            "Generated metadata is missing required title/description fields."
        )
    tags = _normalize_unique_terms(
        metadata_payload.get("tags"),
        field_name="tags",
        min_items=3,
        max_items=8,
        lowercase=True,
    )
    branch = str(default_branch or "main").strip() or "main"
    archive_url = (
        f"https://github.com/{fork_owner_repo}/archive/refs/heads/{branch}.zip"
    )
    homepage = str(upstream_homepage or "").strip() or upstream_repo_url
    version_entry = {
        "version_id": "branch-head",
        "source_archive_url": archive_url,
        "source_ref": {
            "type": "branch",
            "value": branch,
        },
        "verified": False,
    }
    return {
        "package_id": cli.normalize_package_id(package_id),
        "title": title,
        "description": description,
        "upstream_repo_url": upstream_repo_url,
        "homepage_url": homepage,
        "zip_url": archive_url,
        "default_version": "branch-head",
        "versions": [version_entry],
        "tags": tags,
    }


def _build_family_entry_from_metadata(
    *,
    package_id: str,
    metadata_payload: dict[str, object],
    disambiguation_package_ids: list[str] | None = None,
) -> dict[str, object]:
    """Build a router family-hints entry from generated metadata.

    Normalizes and filters the keyword lists: toolchain noise is dropped
    from ``keywords`` (only when >= 4 terms survive), non-scientific terms
    are dropped from ``negative_keywords``, peer package ids are appended
    as extra negatives, and the package id itself is forced into
    ``strong_keywords``.

    Raises:
        PackageError: when a required keyword list is too short after
            normalization.
    """
    description = str(metadata_payload.get("family_description") or "").strip()
    if not description:
        description = f"Routing hints for {package_id} workflows."
    strong_keywords = _normalize_unique_terms(
        metadata_payload.get("strong_keywords"),
        field_name="strong_keywords",
        min_items=4,
        max_items=12,
        lowercase=True,
    )
    keywords = _normalize_unique_terms(
        metadata_payload.get("keywords"),
        field_name="keywords",
        min_items=4,
        max_items=14,
        lowercase=True,
    )
    # Drop generic dependency/toolchain labels, but keep the unfiltered
    # list when filtering would leave fewer than the 4-term minimum.
    filtered_keywords = [
        term for term in keywords if term not in ROUTER_KEYWORD_NOISE_TERMS
    ]
    if len(filtered_keywords) >= 4:
        keywords = filtered_keywords
    negative_keywords = _normalize_unique_terms(
        metadata_payload.get("negative_keywords"),
        field_name="negative_keywords",
        min_items=0,
        max_items=10,
        lowercase=True,
    )
    # Never suggest non-scientific product/tech terms as negatives.
    negative_keywords = [
        term
        for term in negative_keywords
        if term not in ROUTER_NEGATIVE_NONSCIENTIFIC_TERMS
    ]
    # Append peer package ids as additional negative keywords, skipping the
    # package itself and any term already present.
    if isinstance(disambiguation_package_ids, list) and disambiguation_package_ids:
        peer_terms = _normalize_unique_terms(
            disambiguation_package_ids,
            field_name="disambiguation_package_ids",
            min_items=0,
            max_items=200,
            lowercase=True,
        )
        for peer in peer_terms:
            if peer == package_id or peer in negative_keywords:
                continue
            negative_keywords.append(peer)
    # Re-normalize to re-apply the 10-item cap after appending peers.
    negative_keywords = _normalize_unique_terms(
        negative_keywords,
        field_name="negative_keywords",
        min_items=0,
        max_items=10,
        lowercase=True,
    )
    # Guarantee the package's own id leads strong_keywords; re-normalize so
    # the 12-item cap still holds after the insert.
    if package_id not in strong_keywords:
        strong_keywords.insert(0, package_id)
        strong_keywords = _normalize_unique_terms(
            strong_keywords,
            field_name="strong_keywords",
            min_items=4,
            max_items=12,
            lowercase=True,
        )
    return {
        "description": description,
        "strong_keywords": strong_keywords,
        "keywords": keywords,
        "negative_keywords": negative_keywords,
    }


def _validate_curated_entry_shape(
    *,
    package_id: str,
    curated_entry: dict[str, object],
) -> None:
    """Validate the structural shape of a curated-channel package entry.

    Checks required top-level fields, package-id agreement, non-empty text
    fields, a well-formed ``versions[]`` list, and (when present) the
    ``tags`` list.

    Raises:
        PackageError: on the first shape violation found.
    """
    cli = _cli()
    required = {
        "package_id",
        "title",
        "description",
        "upstream_repo_url",
        "default_version",
        "versions",
    }
    missing = [key for key in sorted(required) if key not in curated_entry]
    if missing:
        raise cli.PackageError(
            f"Curated entry is missing required fields: {', '.join(missing)}"
        )
    normalized_id = cli.normalize_package_id(str(curated_entry.get("package_id") or ""))
    if normalized_id != package_id:
        raise cli.PackageError(
            "Curated entry package_id mismatch against requested package id."
        )
    for text_field in ("title", "description", "upstream_repo_url", "default_version"):
        if not str(curated_entry.get(text_field) or "").strip():
            raise cli.PackageError(
                f"Curated entry field `{text_field}` must be non-empty."
            )
    versions = curated_entry.get("versions")
    if not isinstance(versions, list) or not versions:
        raise cli.PackageError("Curated entry requires non-empty versions[].")
    # NOTE: error messages use 1-based indices into versions[].
    for index, version in enumerate(versions, start=1):
        if not isinstance(version, dict):
            raise cli.PackageError(
                f"Curated entry versions[{index}] must be an object."
            )
        version_id = str(version.get("version_id") or "").strip()
        archive_url = str(version.get("source_archive_url") or "").strip()
        if not (version_id and archive_url):
            raise cli.PackageError(
                f"Curated entry versions[{index}] missing version_id/source_archive_url."
            )
        if not isinstance(version.get("verified"), bool):
            raise cli.PackageError(
                f"Curated entry versions[{index}].verified must be boolean."
            )
        source_ref = version.get("source_ref")
        if not isinstance(source_ref, dict):
            raise cli.PackageError(
                f"Curated entry versions[{index}].source_ref must be object."
            )
        ref_type = str(source_ref.get("type") or "").strip()
        ref_value = str(source_ref.get("value") or "").strip()
        if not (ref_type and ref_value):
            raise cli.PackageError(
                f"Curated entry versions[{index}].source_ref requires type/value."
            )
    if "tags" in curated_entry:
        _normalize_unique_terms(
            curated_entry.get("tags"),
            field_name="tags",
            min_items=1,
            max_items=20,
            lowercase=True,
        )


def _validate_family_entry_shape(family_entry: dict[str, object]) -> None:
    """Validate a router family-hints entry: description plus keyword lists.

    Raises:
        PackageError: when the description is blank or a keyword list fails
            normalization.
    """
    cli = _cli()
    if not str(family_entry.get("description") or "").strip():
        raise cli.PackageError("Family hints entry requires non-empty description.")
    for field_name in ("strong_keywords", "keywords", "negative_keywords"):
        _normalize_unique_terms(
            family_entry.get(field_name, []),
            field_name=field_name,
            min_items=0,
            max_items=50,
            lowercase=True,
        )


def _validate_data_payloads_with_script(
    *,
    fermilink_repo: Path,
    channel_id: str,
    curated_payload: dict[str, object],
    family_payload: dict[str, object],
) -> None:
    """Run the repo's data-validation script against merged payloads.

    Stages the curated-channel and family-hints payloads into a throwaway
    repo-shaped directory tree, then invokes ``scripts/validate_data.py``
    from *fermilink_repo* with ``--repo-root`` pointing at that tree.

    Raises
    ------
    PackageError
        If the validation script is missing or exits non-zero (the
        script's stderr/stdout is included in the message).
    """
    cli = _cli()
    validate_script = fermilink_repo / "scripts" / "validate_data.py"
    if not validate_script.is_file():
        raise cli.PackageError(f"Missing data validation script: {validate_script}")

    with cli.tempfile.TemporaryDirectory(
        prefix="fermilink-auto-compile-validate-"
    ) as temp_dir:
        staging_root = Path(temp_dir)
        data_dir = staging_root / "src" / "fermilink" / "data"
        _write_json_atomic(
            data_dir / "curated_channels" / f"{channel_id}.json",
            curated_payload,
        )
        _write_json_atomic(
            data_dir / "router" / "family_hints.json",
            family_payload,
        )
        completed = cli.subprocess.run(
            [
                cli.sys.executable,
                str(validate_script),
                "--repo-root",
                str(staging_root),
            ],
            capture_output=True,
            text=True,
            check=False,
        )
    # Result is inspected after the temp tree is cleaned up; only the
    # captured text output is needed from here on.
    if completed.returncode != 0:
        detail = (
            (completed.stderr or "").strip()
            or (completed.stdout or "").strip()
            or f"exit code {completed.returncode}"
        )
        raise cli.PackageError(f"Data validation failed after merge preview: {detail}")


def _precheck_metadata_merge_conflicts(
    *,
    fermilink_repo: Path,
    channel_id: str,
    package_id: str,
    update_existing: bool,
) -> None:
    """Fail fast when a merge would collide with existing metadata entries.

    No-op when *update_existing* is true (collisions are then allowed and
    handled as replacements later). Otherwise raises ``PackageError`` when
    either data file is missing or malformed, or when *package_id* already
    appears in the curated channel packages or the router family hints.
    """
    if update_existing:
        return

    cli = _cli()
    normalized_channel = cli.normalize_channel_id(channel_id)
    data_root = fermilink_repo / "src" / "fermilink" / "data"
    curated_path = data_root / "curated_channels" / f"{normalized_channel}.json"
    family_path = data_root / "router" / "family_hints.json"
    if not curated_path.is_file():
        raise cli.PackageError(f"Missing curated channel file: {curated_path}")
    if not family_path.is_file():
        raise cli.PackageError(f"Missing family hints file: {family_path}")

    curated_payload = _read_json_object(curated_path)
    family_payload = _read_json_object(family_path)

    packages_raw = curated_payload.get("packages")
    if not isinstance(packages_raw, list):
        raise cli.PackageError(f"Invalid curated channel file: {curated_path}")
    # Collect ids folded to lowercase; package_id is assumed already
    # normalized by the caller -- TODO confirm against call sites.
    known_package_ids = {
        str(entry.get("package_id") or "").strip().lower()
        for entry in packages_raw
        if isinstance(entry, dict)
    }
    if package_id in known_package_ids:
        raise cli.PackageError(
            f"Package '{package_id}' already exists in {curated_path}. "
            "Use --update-existing to replace it."
        )

    families_raw = family_payload.get("families")
    if not isinstance(families_raw, dict):
        raise cli.PackageError(f"Invalid family hints file: {family_path}")
    if package_id in families_raw:
        raise cli.PackageError(
            f"Family '{package_id}' already exists in {family_path}. "
            "Use --update-existing to replace it."
        )


def _merge_metadata_entries(
    *,
    fermilink_repo: Path,
    channel_id: str,
    package_id: str,
    curated_entry: dict[str, object],
    family_entry: dict[str, object],
    update_existing: bool,
    dry_run: bool,
) -> dict[str, object]:
    """Merge a curated package entry and a family-hints entry into the repo data files.

    Loads ``curated_channels/<channel>.json`` and ``router/family_hints.json``
    from *fermilink_repo*, inserts or (with *update_existing*) replaces the
    entries for *package_id*, stamps both payloads with a fresh ``updated_at``
    timestamp, validates the merged result with the repo's validation script,
    and finally writes both files back atomically unless *dry_run* is set.

    Returns a summary dict with the resolved paths, replacement flags,
    ``dry_run`` echo, and the timestamp used.

    Raises
    ------
    PackageError
        If either data file is missing or malformed, if the entry already
        exists and *update_existing* is false, or if post-merge validation
        fails.

    NOTE(review): comparisons below lowercase the stored ids but use
    *package_id* verbatim -- callers are presumably expected to pass an
    already-normalized (lowercase) id; confirm against call sites.
    """
    cli = _cli()
    normalized_channel = cli.normalize_channel_id(channel_id)
    curated_path = (
        fermilink_repo
        / "src"
        / "fermilink"
        / "data"
        / "curated_channels"
        / f"{normalized_channel}.json"
    )
    family_path = (
        fermilink_repo / "src" / "fermilink" / "data" / "router" / "family_hints.json"
    )
    if not curated_path.is_file():
        raise cli.PackageError(f"Missing curated channel file: {curated_path}")
    if not family_path.is_file():
        raise cli.PackageError(f"Missing family hints file: {family_path}")

    curated_payload = _read_json_object(curated_path)
    family_payload = _read_json_object(family_path)

    packages_raw = curated_payload.get("packages")
    if not isinstance(packages_raw, list):
        raise cli.PackageError(f"Invalid curated channel file: {curated_path}")
    # Silently drop any non-dict entries; only well-formed packages survive
    # the merge and rewrite.
    packages = [item for item in packages_raw if isinstance(item, dict)]
    existing_package_ids = {
        str(item.get("package_id") or "").strip().lower() for item in packages
    }

    replaced_curated = False
    if package_id in existing_package_ids:
        if not update_existing:
            raise cli.PackageError(
                f"Package '{package_id}' already exists in {curated_path}. "
                "Use --update-existing to replace it."
            )
        # Replace in place (first match only) to keep a stable entry count.
        for index, item in enumerate(packages):
            package_key = str(item.get("package_id") or "").strip().lower()
            if package_key == package_id:
                packages[index] = curated_entry
                replaced_curated = True
                break
    if not replaced_curated:
        packages.append(curated_entry)
    # Keep the channel file deterministic: sorted by lowercased package id.
    packages.sort(key=lambda item: str(item.get("package_id") or "").strip().lower())
    curated_payload["packages"] = packages

    families_raw = family_payload.get("families")
    if not isinstance(families_raw, dict):
        raise cli.PackageError(f"Invalid family hints file: {family_path}")
    # Work on a copy so the original payload mapping is not mutated on error.
    families = dict(families_raw)
    replaced_family = package_id in families
    if replaced_family and not update_existing:
        raise cli.PackageError(
            f"Family '{package_id}' already exists in {family_path}. "
            "Use --update-existing to replace it."
        )
    families[package_id] = family_entry
    family_payload["families"] = families

    # Both files share one timestamp so a single merge reads as one event.
    timestamp = _utc_now_z()
    curated_payload["updated_at"] = timestamp
    family_payload["updated_at"] = timestamp

    # Validate the merged payloads in a scratch tree BEFORE touching the
    # real data files; a validation failure leaves the repo untouched.
    _validate_data_payloads_with_script(
        fermilink_repo=fermilink_repo,
        channel_id=normalized_channel,
        curated_payload=curated_payload,
        family_payload=family_payload,
    )

    if not dry_run:
        _write_json_atomic(curated_path, curated_payload)
        _write_json_atomic(family_path, family_payload)

    return {
        "channel_id": normalized_channel,
        "curated_path": str(curated_path),
        "family_path": str(family_path),
        "replaced_curated": replaced_curated,
        "replaced_family": replaced_family,
        "dry_run": dry_run,
        "updated_at": timestamp,
    }


def _process_auto_compile_package(
    *,
    package_id: str,
    upstream_repo_url: str,
    github_login: str,
    organization: str | None,
    fermilink_repo: Path,
    workspace_root: Path,
    channel: str,
    max_skills: int,
    core_skill_count: int,
    docs_only: bool,
    keep_compile_artifacts: bool,
    strict_compile_validation: bool,
    update_existing: bool,
    dry_run: bool,
    cleanup_clone: bool,
) -> dict[str, object]:
    """Run the full auto-compile pipeline for one package.

    Steps, in order: normalize the upstream GitHub URL; pre-check for
    metadata merge conflicts (fast failure before any network work);
    ensure a public fork exists; clone the fork under *workspace_root*;
    compile skills in the clone; commit and push the result; gather
    upstream repo info and a README excerpt; generate curated/family
    metadata via the provider; validate both entries; and merge them into
    the fermilink repo data files (respecting *update_existing* and
    *dry_run*).

    Returns a result dict summarizing every stage plus ``"status": "ok"``.
    Exceptions from any stage propagate to the caller; the clone directory
    is removed on the way out when *cleanup_clone* is set.
    """
    upstream_owner, upstream_repo, canonical_upstream = _normalize_github_repo_url(
        upstream_repo_url
    )
    # Conflict pre-check runs before forking/cloning so a doomed merge
    # fails without any remote side effects.
    _precheck_metadata_merge_conflicts(
        fermilink_repo=fermilink_repo,
        channel_id=channel,
        package_id=package_id,
        update_existing=update_existing,
    )
    fork = _ensure_public_fork(
        upstream_owner=upstream_owner,
        upstream_repo=upstream_repo,
        github_login=github_login,
        organization=organization,
    )
    clone_dir: Path | None = None
    try:
        clone_dir = _prepare_fork_clone(
            workspace_root=workspace_root,
            package_id=package_id,
            upstream_repo=upstream_repo,
            clone_url=fork["fork_clone_url"],
            default_branch=fork["default_branch"],
        )
        compile_result = _invoke_compile_for_auto_compile(
            package_id=package_id,
            project_root=clone_dir,
            max_skills=max_skills,
            core_skill_count=core_skill_count,
            docs_only=docs_only,
            keep_compile_artifacts=keep_compile_artifacts,
            strict_compile_validation=strict_compile_validation,
        )
        push_result = _commit_and_push_changes(
            repo_dir=clone_dir,
            package_id=package_id,
            default_branch=fork["default_branch"],
        )

        # Context for metadata generation: upstream description/homepage,
        # a local README excerpt, and sibling package ids for disambiguation.
        upstream_info = _fetch_repo_info(f"{upstream_owner}/{upstream_repo}")
        upstream_description = str(upstream_info.get("description") or "").strip()
        upstream_homepage = str(upstream_info.get("homepageUrl") or "").strip()
        readme_excerpt = _read_repo_excerpt(clone_dir)
        disambiguation_package_ids = _load_disambiguation_package_ids(
            fermilink_repo=fermilink_repo,
            channel_id=channel,
            package_id=package_id,
        )

        generated_metadata = _generate_metadata_with_provider(
            metadata_repo_dir=clone_dir,
            package_id=package_id,
            upstream_repo_url=canonical_upstream,
            fork_repo_url=fork["fork_url"],
            default_branch=fork["default_branch"],
            upstream_description=upstream_description,
            upstream_homepage=upstream_homepage,
            readme_excerpt=readme_excerpt,
            disambiguation_package_ids=disambiguation_package_ids,
        )

        curated_entry = _build_curated_entry_from_metadata(
            package_id=package_id,
            upstream_repo_url=canonical_upstream,
            upstream_homepage=upstream_homepage,
            fork_owner_repo=fork["fork_name"],
            default_branch=fork["default_branch"],
            metadata_payload=generated_metadata,
        )
        family_entry = _build_family_entry_from_metadata(
            package_id=package_id,
            metadata_payload=generated_metadata,
            disambiguation_package_ids=disambiguation_package_ids,
        )
        # Shape-validate both entries before merging into the data files.
        _validate_curated_entry_shape(
            package_id=package_id,
            curated_entry=curated_entry,
        )
        _validate_family_entry_shape(family_entry)

        merge_result = _merge_metadata_entries(
            fermilink_repo=fermilink_repo,
            channel_id=channel,
            package_id=package_id,
            curated_entry=curated_entry,
            family_entry=family_entry,
            update_existing=update_existing,
            dry_run=dry_run,
        )
        return {
            "package_id": package_id,
            "upstream_repo_url": canonical_upstream,
            "fork": fork,
            "clone_dir": str(clone_dir),
            "compile": compile_result,
            "push": push_result,
            "metadata": {
                "title": curated_entry.get("title"),
                "description": curated_entry.get("description"),
                "tags": curated_entry.get("tags"),
                "family_description": family_entry.get("description"),
                "strong_keywords": family_entry.get("strong_keywords"),
                "keywords": family_entry.get("keywords"),
                "negative_keywords": family_entry.get("negative_keywords"),
            },
            "merge": merge_result,
            "status": "ok",
        }
    finally:
        # Best-effort cleanup of the working clone (runs on success and error).
        if cleanup_clone and isinstance(clone_dir, Path) and clone_dir.exists():
            shutil.rmtree(clone_dir, ignore_errors=True)


def cmd_auto_compile(args: argparse.Namespace) -> int:
    """
    Execute the `auto-compile` CLI subcommand.

    Loads the package specs, validates CLI options and the runtime
    provider, then runs the full auto-compile pipeline per package,
    collecting successes and failures (``--fail-fast`` stops at the first
    failure).

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).
    """
    cli = _cli()
    _ensure_required_commands_available(command_names=("gh", "git"))
    runtime_policy = cli.resolve_agent_runtime_policy()
    if not cli.provider_supports_auto_compile_metadata_generation(
        runtime_policy.provider
    ):
        raise cli.PackageError(
            "auto-compile requires a provider whose agent adapter supports "
            f"metadata generation. Current provider '{runtime_policy.provider}' "
            "does not support it."
        )
    specs = _load_auto_compile_specs(
        package_id_arg=getattr(args, "package_id", None),
        upstream_repo_url_arg=getattr(args, "upstream_repo_url", None),
        spec_file_arg=getattr(args, "spec_file", None),
    )
    if not specs:
        raise cli.PackageError("No packages provided for auto-compile.")
    fermilink_repo = Path(str(args.fermilink_repo)).expanduser().resolve()
    if not fermilink_repo.is_dir():
        raise cli.PackageError(f"Invalid --fermilink-repo path: {fermilink_repo}")
    workspace_root = Path(str(args.workspace_root)).expanduser().resolve()
    workspace_root.mkdir(parents=True, exist_ok=True)
    max_skills = int(getattr(args, "max_skills", 30))
    if max_skills < 2:
        raise cli.PackageError("--max-skills must be >= 2.")
    core_skill_count = int(getattr(args, "core_skill_count", 6))
    if core_skill_count < 1:
        raise cli.PackageError("--core-skill-count must be >= 1.")
    channel = cli.normalize_channel_id(getattr(args, "channel", "skilled-scipkg"))
    github_login = _resolve_github_login()
    organization = _normalize_github_owner(
        getattr(args, "organization", None),
        field_name="--organization",
    )
    fork_owner = organization or github_login
    processed: list[dict[str, object]] = []
    failed: list[dict[str, object]] = []
    for spec in specs:
        package_id = str(spec["package_id"])
        upstream_repo_url = str(spec["upstream_repo_url"])
        try:
            result = _process_auto_compile_package(
                package_id=package_id,
                upstream_repo_url=upstream_repo_url,
                github_login=github_login,
                organization=organization,
                fermilink_repo=fermilink_repo,
                workspace_root=workspace_root,
                channel=channel,
                max_skills=max_skills,
                core_skill_count=core_skill_count,
                docs_only=bool(getattr(args, "docs_only", False)),
                keep_compile_artifacts=bool(
                    getattr(args, "keep_compile_artifacts", False)
                ),
                strict_compile_validation=bool(
                    getattr(args, "strict_compile_validation", False)
                ),
                update_existing=bool(getattr(args, "update_existing", False)),
                dry_run=bool(getattr(args, "dry_run", False)),
                cleanup_clone=bool(getattr(args, "cleanup_clone", False)),
            )
            processed.append(result)
        except (
            cli.PackageError,
            ValueError,
            OSError,
            RuntimeError,
        ) as exc:
            failed.append(
                {
                    "package_id": package_id,
                    "upstream_repo_url": upstream_repo_url,
                    "error": str(exc),
                }
            )
            if bool(getattr(args, "fail_fast", False)):
                break
    payload = {
        "github_login": github_login,
        "organization": organization,
        "fork_owner": fork_owner,
        "channel": channel,
        "fermilink_repo": str(fermilink_repo),
        "workspace_root": str(workspace_root),
        "dry_run": bool(getattr(args, "dry_run", False)),
        "processed_count": len(processed),
        "failed_count": len(failed),
        "processed": processed,
        "failed": failed,
        "requested_count": len(specs),
    }
    lines = [
        (
            f"Auto-compile processed {len(processed)} package(s) with "
            f"{len(failed)} failure(s)."
        ),
        f"GitHub account: {github_login}.",
        (
            f"Fork owner organization: {organization}."
            if isinstance(organization, str) and organization
            else f"Fork owner account: {fork_owner}."
        ),
        f"Curated channel: {channel}.",
    ]
    if failed:
        for item in failed:
            lines.append(
                f"Failed: {item['package_id']} ({item['upstream_repo_url']}): {item['error']}"
            )
    else:
        lines.append("All packages completed successfully.")
    cli._emit_output(args, payload, lines)
    # Exit 2 signals partial/complete failure to scripting callers.
    return 0 if not failed else 2
def cmd_compile(args: argparse.Namespace) -> int:
    """
    Execute the `compile` CLI subcommand.

    Runs the three-pass provider compile workflow against a project
    directory: plan (pass 1), generation + enrichment (pass 2), and audit
    (pass 3), followed by validation, reporting, compile-memory update,
    and (unless ``--install-off``) installation into the scientific
    packages registry.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).
    """
    cli = _cli()
    install_off = bool(getattr(args, "install_off", False))
    scipkg_root: Path | None = None
    if not install_off:
        scipkg_root = cli.resolve_scipkg_root()
    package_id = cli.normalize_package_id(args.package_id)
    project_root = cli._resolve_project_path(args.project_path)
    if not project_root.exists() or not project_root.is_dir():
        raise cli.PackageError(f"Compile path is not a directory: {project_root}")
    git_repo_initialized = cli._ensure_compile_repo_ready(project_root)
    max_skills = int(getattr(args, "max_skills", 30))
    if max_skills < 2:
        raise cli.PackageError("--max-skills must be >= 2.")
    core_skill_count = int(getattr(args, "core_skill_count", 6))
    if core_skill_count < 1:
        raise cli.PackageError("--core-skill-count must be >= 1.")
    docs_only_override = bool(getattr(args, "docs_only", False))
    keep_compile_artifacts = bool(getattr(args, "keep_compile_artifacts", False))
    strict_compile_validation = bool(getattr(args, "strict_compile_validation", False))
    # Compile refuses to overwrite an already-registered package id.
    if not install_off and scipkg_root is not None:
        registry = cli.load_registry(scipkg_root)
        packages = registry.get("packages", {})
        if isinstance(packages, dict) and package_id in packages:
            raise cli.PackageError(
                f"Warning: package id '{package_id}' already exists. "
                "Choose a new package id for compile."
            )
    tool_source = cli._resolve_compile_tool_source()
    if not tool_source.is_dir():
        raise cli.PackageError(f"Missing compile tool source: {tool_source}")
    runtime_policy = cli.resolve_agent_runtime_policy()
    provider = runtime_policy.provider
    provider_bin_override = cli.resolve_provider_binary_override(
        provider,
        raw_override=cli.DEFAULT_PROVIDER_BINARY_OVERRIDE,
    )
    provider_bin = cli.resolve_provider_binary(
        provider,
        provider_bin_override=provider_bin_override,
    )
    tool_dest = project_root / "sci-skills-generator"
    if tool_dest.exists():
        raise cli.PackageError(
            f"Compile path already contains {tool_dest.name}/. "
            "Remove it first or choose a different path."
        )
    run_mode = "compile"
    run_id = _build_compile_run_id("compile")
    run_goal = (
        "Compile skills with deterministic generation, targeted enrichment, and audit."
    )
    compile_memory_path = cli._reset_compile_memory_short_term(
        project_root,
        package_id=package_id,
        mode=run_mode,
        run_id=run_id,
        run_goal=run_goal,
    )
    previous_source_inventory = cli._load_previous_source_inventory(project_root)
    shutil.copytree(tool_source, tool_dest)
    compile_runs: list[dict[str, object]] = []
    profile_payload: dict[str, object] = {}
    skill_plan_payload: dict[str, object] = {}
    skill_plan_path = ""
    pass_scope_diffs: dict[str, dict[str, list[str]]] = {}
    generation_result: dict[str, object] = {}
    evidence_payload: dict[str, object] = {}
    validation_payload: dict[str, object] = {}
    memory_update_payload: dict[str, object] = {}
    compile_report_path = ""
    try:
        # ---- Pass 1: planning -------------------------------------------
        pass_1_prompt = (
            f"{cli.COMPILE_PROMPT_1}\n\n"
            f"Compile memory file: {compile_memory_path}\n"
            f"Skill plan output file: {cli.COMPILE_SKILL_PLAN_REL_PATH}\n"
            "Read compile memory first and keep this run plan consistent with prior gaps."
        )
        pass_1 = cli._run_compile_provider_pass(
            project_root,
            prompt=pass_1_prompt,
            pass_index=1,
            total_passes=3,
            provider=provider,
            provider_bin=provider_bin,
        )
        pass_1_assistant_text = str(pass_1.pop("assistant_text", "") or "")
        compile_runs.append(pass_1)
        profile_payload = cli._load_compile_profile(
            project_root,
            default_package_name=package_id,
            assistant_text=pass_1_assistant_text,
        )
        skill_plan_payload = cli._load_compile_skill_plan(
            project_root,
            package_id=package_id,
            mode=run_mode,
            assistant_text=pass_1_assistant_text,
        )
        skill_plan_path = cli._write_compile_skill_plan(
            project_root,
            skill_plan=skill_plan_payload,
        )
        generation_result = cli._run_compile_generator(
            project_root,
            tool_dir=tool_dest,
            profile=profile_payload,
            max_skills=max_skills,
            docs_only_override=docs_only_override,
        )
        evidence_payload = cli._build_compile_evidence_bundle(
            project_root,
            core_skill_count=core_skill_count,
        )
        available_skill_ids = cli._list_skill_ids(project_root)
        core_skill_ids_raw = evidence_payload.get("core_skills")
        core_skill_ids = (
            list(core_skill_ids_raw) if isinstance(core_skill_ids_raw, list) else []
        )
        # Re-normalize and re-write the plan now that generation produced
        # the actual skill ids.
        skill_plan_payload = cli._normalize_compile_skill_plan(
            skill_plan_payload,
            package_id=package_id,
            mode=run_mode,
            available_skill_ids=available_skill_ids,
            core_skill_ids=core_skill_ids,
        )
        skill_plan_path = cli._write_compile_skill_plan(
            project_root,
            skill_plan=skill_plan_payload,
        )
        # ---- Pass 2: enrichment (skills/ scope only) --------------------
        pass_2_prompt = (
            f"{cli.COMPILE_PROMPT_2}\n\n"
            f"Compile memory file: {compile_memory_path}\n"
            f"Skill plan JSON file: {skill_plan_path or cli.COMPILE_SKILL_PLAN_REL_PATH}\n"
            "Allowed edit scope in pass 2:\n"
            "- skills/...\n\n"
            "Skill plan JSON payload:\n"
            f"{json.dumps(skill_plan_payload, indent=2)}\n"
        )
        pass_2_before_snapshot = cli._snapshot_skills_tree(project_root)
        pass_2 = cli._run_compile_provider_pass(
            project_root,
            prompt=pass_2_prompt,
            pass_index=2,
            total_passes=3,
            provider=provider,
            provider_bin=provider_bin,
        )
        pass_2.pop("assistant_text", None)
        compile_runs.append(pass_2)
        pass_2_after_snapshot = cli._snapshot_skills_tree(project_root)
        pass_2_diff = cli._diff_skills_tree_snapshot(
            pass_2_before_snapshot,
            pass_2_after_snapshot,
        )
        pass_scope_diffs["pass_2"] = pass_2_diff
        cli._assert_skills_change_scope(
            change_diff=pass_2_diff,
            allowed_prefixes=["skills"],
            stage_label="compile pass 2",
        )
        # ---- Pass 3: audit (skills/ scope only) -------------------------
        pass_3_prompt = (
            f"{cli.COMPILE_PROMPT_3}\n\n"
            f"Compile memory file: {compile_memory_path}\n"
            f"Skill plan JSON file: {skill_plan_path or cli.COMPILE_SKILL_PLAN_REL_PATH}\n"
            "Allowed edit scope in pass 3:\n"
            "- skills/...\n"
            "- skills/.compile_report.json (optional notes)\n"
        )
        pass_3_before_snapshot = cli._snapshot_skills_tree(project_root)
        pass_3 = cli._run_compile_provider_pass(
            project_root,
            prompt=pass_3_prompt,
            pass_index=3,
            total_passes=3,
            provider=provider,
            provider_bin=provider_bin,
        )
        pass_3.pop("assistant_text", None)
        compile_runs.append(pass_3)
        pass_3_after_snapshot = cli._snapshot_skills_tree(project_root)
        pass_3_diff = cli._diff_skills_tree_snapshot(
            pass_3_before_snapshot,
            pass_3_after_snapshot,
        )
        pass_scope_diffs["pass_3"] = pass_3_diff
        cli._assert_skills_change_scope(
            change_diff=pass_3_diff,
            allowed_prefixes=["skills"],
            stage_label="compile pass 3",
        )
        # ---- Validation, report, memory ---------------------------------
        validation_payload = cli._validate_compiled_skills(
            project_root,
            profile=profile_payload,
            core_skill_count=core_skill_count,
            skill_plan=skill_plan_payload,
            source_inventory=(
                evidence_payload.get("source_inventory")
                if isinstance(evidence_payload, dict)
                else None
            ),
            previous_source_inventory=previous_source_inventory,
        )
        compile_report_path = cli._write_compile_report(
            project_root,
            payload={
                "mode": run_mode,
                "run_id": run_id,
                "compiled_package_id": package_id,
                "project_root": str(project_root),
                "compile_memory_path": compile_memory_path,
                "profile": profile_payload,
                "skill_plan": skill_plan_payload,
                "skill_plan_path": skill_plan_path,
                "generation": generation_result,
                "evidence": evidence_payload,
                "pass_scope_diffs": pass_scope_diffs,
                "passes": compile_runs,
                "validation": validation_payload,
            },
        )
        memory_update_payload = cli._record_compile_memory_run(
            project_root,
            package_id=package_id,
            mode=run_mode,
            run_id=run_id,
            run_goal=run_goal,
            skill_plan=skill_plan_payload,
            pass_scope_diffs=pass_scope_diffs,
            evidence=evidence_payload,
            validation=validation_payload,
            compile_report_path=compile_report_path,
        )
        if strict_compile_validation and not bool(validation_payload.get("ok", False)):
            errors = validation_payload.get("errors")
            if isinstance(errors, list) and errors:
                summary = "; ".join(str(item) for item in errors[:5])
            else:
                summary = "unknown validation error"
            raise cli.PackageError(f"Compile validation failed: {summary}")
    finally:
        # Always remove the copied tool directory unless the caller asked
        # to keep it; escalate if cleanup silently failed.
        if not keep_compile_artifacts:
            shutil.rmtree(tool_dest, ignore_errors=True)
        if not keep_compile_artifacts and tool_dest.exists():
            raise cli.PackageError(
                f"Failed to clean up temporary tool directory: {tool_dest}"
            )
    installed: dict[str, object] | None = None
    router_sync: dict[str, object] | None = None
    active: str | None = None
    if not install_off and scipkg_root is not None:
        installed = cli.install_from_local_path(
            scipkg_root,
            package_id,
            local_path=project_root,
            title=args.title,
            activate=args.activate,
            force=False,
        )
        if not args.no_router_sync:
            router_sync = cli.sync_router_rules(scipkg_root)
        active_raw = cli.load_registry(scipkg_root).get("active_package")
        if isinstance(active_raw, str):
            active = active_raw
    payload = {
        "compiled_package_id": package_id,
        "project_root": str(project_root),
        "git_repo_initialized": git_repo_initialized,
        "run_id": run_id,
        "compile_runs": compile_runs,
        "compile_memory": compile_memory_path,
        "compile_memory_update": memory_update_payload,
        "compile_profile": profile_payload,
        "skill_plan": skill_plan_payload,
        "skill_plan_path": skill_plan_path,
        "generation": generation_result,
        "evidence": evidence_payload,
        "pass_scope_diffs": pass_scope_diffs,
        "previous_source_inventory": previous_source_inventory,
        "validation": validation_payload,
        "validation_enforced": strict_compile_validation,
        "compile_report": compile_report_path,
        "installed": installed,
        "active_package": active,
        "router_sync": router_sync,
        "install_off": install_off,
        "scipkg_root": str(scipkg_root) if isinstance(scipkg_root, Path) else None,
    }
    source_links_total = validation_payload.get("source_links_total", 0)
    validation_ok = bool(validation_payload.get("ok", False))
    validation_errors = validation_payload.get("errors", [])
    error_count = len(validation_errors) if isinstance(validation_errors, list) else 0
    lines = [
        f"Compiled skills for '{package_id}' from {project_root}.",
        f"Validated skills with {source_links_total} source-code links.",
        (
            "Validation status: ok."
            if validation_ok
            else (
                f"Validation status: {error_count} finding(s) (non-blocking). "
                "Use --strict-compile-validation to enforce failures."
            )
        ),
        (
            "Install step skipped (--install-off); updated local skills only."
            if install_off
            else (
                f"Installed to scientific packages. Active package: {active}."
                if isinstance(active, str) and active
                else "Installed to scientific packages."
            )
        ),
    ]
    if git_repo_initialized:
        lines.insert(1, f"Initialized git repository at {project_root} (missing .git).")
    cli._emit_output(args, payload, lines)
    return 0
[docs] def cmd_recompile(args: argparse.Namespace) -> int: """ Execute the `recompile` CLI subcommand. Parameters ---------- args : argparse.Namespace Parsed CLI arguments namespace for the subcommand. Returns ------- int Process exit code (`0` on success, non-zero on failure). """ cli = _cli() package_id = cli.normalize_package_id(args.package_id) managed_project_root: Path | None = None raw_project_path = getattr(args, "project_path", None) if isinstance(raw_project_path, str) and raw_project_path.strip(): project_root = cli._resolve_project_path(raw_project_path) else: managed_project_root = cli.resolve_scipkg_root() project_root = (managed_project_root / "packages" / package_id).resolve() if not project_root.exists() or not project_root.is_dir(): raise cli.PackageError(f"Recompile path is not a directory: {project_root}") git_repo_initialized = cli._ensure_compile_repo_ready(project_root) raw_memory_path = str(getattr(args, "memory", "") or "").strip() raw_memory_scope_value = getattr(args, "memory_scope", None) raw_memory_scope = str(raw_memory_scope_value or "all").strip() raw_doc_path = str(getattr(args, "doc", "") or "").strip() raw_data_dir = str(getattr(args, "data_dir", "") or "").strip() comment_text = " ".join(str(getattr(args, "comment", "") or "").split()).strip() memory_mode_enabled = bool(raw_memory_path) memory_scope = cli._normalize_recompile_memory_scope(raw_memory_scope) memory_scope_label = cli._render_recompile_memory_scope(memory_scope) install_off = bool(getattr(args, "install_off", False)) or memory_mode_enabled scipkg_root: Path | None = None if not install_off: if managed_project_root is not None: scipkg_root = managed_project_root else: scipkg_root = cli.resolve_scipkg_root() resolved_memory_path: Path | None = None resolved_doc_path: Path | None = None resolved_data_dir: Path | None = None if not memory_mode_enabled and raw_memory_scope_value is not None: raise cli.PackageError("--memory-scope requires --memory.") if 
memory_mode_enabled and (raw_doc_path or raw_data_dir or comment_text): raise cli.PackageError( "--memory cannot be combined with --doc/--data-dir/--comment." ) if raw_memory_path: resolved_memory_path = cli._resolve_project_path(raw_memory_path) if not resolved_memory_path.exists(): raise cli.PackageError(f"--memory does not exist: {resolved_memory_path}") if not resolved_memory_path.is_file() and not resolved_memory_path.is_dir(): raise cli.PackageError( f"--memory must be a file or directory: {resolved_memory_path}" ) if comment_text and not raw_doc_path: raise cli.PackageError("--comment requires --doc.") if raw_data_dir and not raw_doc_path: raise cli.PackageError("--data-dir requires --doc.") if raw_doc_path: resolved_doc_path = cli._resolve_project_path(raw_doc_path) if not resolved_doc_path.exists(): raise cli.PackageError(f"--doc does not exist: {resolved_doc_path}") if not resolved_doc_path.is_file(): raise cli.PackageError(f"--doc must be a file: {resolved_doc_path}") try: resolved_doc_path.read_text(encoding="utf-8", errors="replace") except OSError as exc: raise cli.PackageError( f"--doc is not readable: {resolved_doc_path}: {exc}" ) from exc if raw_data_dir: resolved_data_dir = cli._resolve_project_path(raw_data_dir) if not resolved_data_dir.exists(): raise cli.PackageError(f"--data-dir does not exist: {resolved_data_dir}") if not resolved_data_dir.is_dir(): raise cli.PackageError( f"--data-dir must be a directory: {resolved_data_dir}" ) try: next(resolved_data_dir.iterdir(), None) except OSError as exc: raise cli.PackageError( f"--data-dir is not readable: {resolved_data_dir}: {exc}" ) from exc paper_mode_enabled = not memory_mode_enabled and bool( isinstance(resolved_doc_path, Path) or isinstance(resolved_data_dir, Path) or bool(comment_text) ) paper_data_context: dict[str, object] | None = None paper_staged_assets: dict[str, object] | None = None if paper_mode_enabled: paper_run_dir = ( project_root / cli.COMPILE_EVIDENCE_DIR_REL_PATH / 
"paper_context" ) paper_data_context = cli._resolve_invocation_data_context( repo_dir=project_root, run_dir=paper_run_dir, workflow_name="recompile", args=args, ) if bool(paper_data_context.get("enabled")): paper_data_context = cli._prepare_workflow_data_artifacts( repo_dir=project_root, run_dir=paper_run_dir, data_context=paper_data_context, ) paper_staged_assets = cli._stage_recompile_paper_assets( project_root, data_context=paper_data_context, ) core_skill_count = int(getattr(args, "core_skill_count", 6)) if core_skill_count < 1: raise cli.PackageError("--core-skill-count must be >= 1.") docs_only_override = bool(getattr(args, "docs_only", False)) keep_compile_artifacts = bool(getattr(args, "keep_compile_artifacts", False)) strict_compile_validation = bool(getattr(args, "strict_compile_validation", False)) # Recompile is an in-place refresh workflow and should always replace the # installed package payload for the same package id. force_install = True skills_root = project_root / "skills" if not skills_root.is_dir(): raise cli.PackageError( f"Recompile requires an existing skills/ folder: {skills_root}" ) tool_source = cli._resolve_compile_tool_source() if not tool_source.is_dir(): raise cli.PackageError(f"Missing compile tool source: {tool_source}") runtime_policy = cli.resolve_agent_runtime_policy() provider = runtime_policy.provider provider_bin_override = cli.resolve_provider_binary_override( provider, raw_override=cli.DEFAULT_PROVIDER_BINARY_OVERRIDE, ) provider_bin = cli.resolve_provider_binary( provider, provider_bin_override=provider_bin_override, ) tool_dest = project_root / "sci-skills-generator" if tool_dest.exists(): raise cli.PackageError( f"Recompile path already contains {tool_dest.name}/. " "Remove it first or choose a different path." 
) run_mode = ( "recompile_memory_plan" if memory_mode_enabled else ("recompile_paper" if paper_mode_enabled else "recompile") ) run_id = _build_compile_run_id("recompile") run_goal = ( ( "Generate append-only package-specific skills update plan from unified memory suggestions." if memory_scope == "package_specific" else ( "Generate append-only machine-specific skills update plan from unified memory suggestions." if memory_scope == "machine_specific" else "Generate append-only skills update plan from unified memory suggestions." ) ) if memory_mode_enabled else ( "Recompile paper tutorial and refresh package skills." if paper_mode_enabled else "Refresh existing skills with targeted coverage updates and audit." ) ) compile_memory_path = cli._reset_compile_memory_short_term( project_root, package_id=package_id, mode=run_mode, run_id=run_id, run_goal=run_goal, ) previous_source_inventory = cli._load_previous_source_inventory(project_root) shutil.copytree(tool_source, tool_dest) compile_runs: list[dict[str, object]] = [] profile_payload: dict[str, object] = {} skill_plan_payload: dict[str, object] = {} skill_plan_path = "" memory_suggestions_payload: dict[str, object] | None = None memory_plan_payload: dict[str, object] | None = None memory_apply_payload: dict[str, object] | None = None memory_plan_path = "" paper_context_payload: dict[str, object] | None = None paper_plan_payload: dict[str, object] | None = None paper_plan_rel = "" paper_skill_id = "" pass_scope_diffs: dict[str, dict[str, list[str]]] = {} evidence_payload: dict[str, object] = {} validation_payload: dict[str, object] = {} paper_validation_payload: dict[str, object] | None = None memory_update_payload: dict[str, object] = {} compile_report_path = "" manuscript_text = "" manuscript_source = "" if memory_mode_enabled: if not isinstance(resolved_memory_path, Path): raise cli.PackageError( "Internal recompile memory-mode error: --memory path missing after validation." 
) try: memory_suggestions_payload = cli._collect_recompile_memory_suggestions( project_root, package_id=package_id, memory_path=resolved_memory_path, memory_scope=memory_scope, ) available_skill_ids = cli._list_skill_ids(project_root) suggestions = ( memory_suggestions_payload.get("suggestions") if isinstance(memory_suggestions_payload, dict) else [] ) suggestion_items = suggestions if isinstance(suggestions, list) else [] suggestion_payload_json = json.dumps( suggestion_items[:80], indent=2, ) memory_scope_rule = ( "Scope rule: include only package-specific machine-independent/shareable updates; " "do not create or modify `skills/user-specific-settings/SKILL.md`.\n" if memory_scope == "package_specific" else ( "Scope rule: include only machine-specific updates; route accepted items to " "`skills/user-specific-settings/SKILL.md`.\n" if memory_scope == "machine_specific" else "Scope rule: include both package-specific and machine-specific updates.\n" ) ) pass_1_prompt = ( f"{cli.RECOMPILE_MEMORY_PROMPT_1_PLAN}\n\n" f"Package id: {package_id}\n" f"Memory scope: {memory_scope_label}\n" f"Compile memory file: {compile_memory_path}\n" f"Memory-plan output file: {cli.RECOMPILE_MEMORY_PLAN_REL_PATH}\n" f"Memory input path: {resolved_memory_path}\n" f"Memory source files: {json.dumps(memory_suggestions_payload.get('memory_sources', []), indent=2)}\n" f"Existing skill ids: {json.dumps(available_skill_ids, indent=2)}\n" f"{memory_scope_rule}" f"Filtered suggested updates payload ({len(suggestion_items)} entries; truncated to 80 below):\n" f"{suggestion_payload_json}\n" ) pass_1_before_snapshot = cli._snapshot_skills_tree(project_root) pass_1 = cli._run_compile_provider_pass( project_root, prompt=pass_1_prompt, pass_index=1, total_passes=1, provider=provider, provider_bin=provider_bin, ) pass_1_assistant_text = str(pass_1.pop("assistant_text", "") or "") compile_runs.append(pass_1) raw_memory_plan = cli._extract_recompile_memory_plan_from_assistant_text( pass_1_assistant_text 
) if raw_memory_plan is None: raise cli.PackageError( "Memory-mode recompile pass response must include " f"<{cli.RECOMPILE_MEMORY_PLAN_TAG}>...</{cli.RECOMPILE_MEMORY_PLAN_TAG}>." ) memory_plan_payload = cli._normalize_recompile_memory_plan( raw_memory_plan, package_id=package_id, suggestions=suggestion_items, available_skill_ids=available_skill_ids, memory_scope=memory_scope, ) pass_1_after_snapshot = cli._snapshot_skills_tree(project_root) pass_1_diff = cli._diff_skills_tree_snapshot( pass_1_before_snapshot, pass_1_after_snapshot, ) pass_scope_diffs["pass_1"] = pass_1_diff cli._assert_skills_change_scope( change_diff=pass_1_diff, allowed_prefixes=[], stage_label="recompile memory pass 1", ) memory_plan_path = cli._write_recompile_memory_plan( project_root, memory_plan=memory_plan_payload, ) plan_apply_before_snapshot = cli._snapshot_skills_tree(project_root) memory_apply_payload = cli._apply_recompile_memory_plan( project_root, package_id=package_id, memory_plan=memory_plan_payload, memory_scope=memory_scope, ) plan_apply_after_snapshot = cli._snapshot_skills_tree(project_root) plan_apply_diff = cli._diff_skills_tree_snapshot( plan_apply_before_snapshot, plan_apply_after_snapshot, ) pass_scope_diffs["plan_apply"] = plan_apply_diff cli._assert_skills_change_scope( change_diff=plan_apply_diff, allowed_prefixes=["skills"], stage_label="recompile memory apply", ) memory_plan_path = cli._write_recompile_memory_plan( project_root, memory_plan=memory_plan_payload, ) validation_payload = { "ok": True, "errors": [], "warnings": ( list(memory_apply_payload.get("warnings") or []) if isinstance(memory_apply_payload, dict) else [] ), "source_links_total": 0, "mode": "memory_plan_only", } compile_report_path = cli._write_compile_report( project_root, payload={ "mode": run_mode, "run_id": run_id, "recompiled_package_id": package_id, "project_root": str(project_root), "compile_memory_path": compile_memory_path, "memory_input": str(resolved_memory_path), "memory_scope": 
memory_scope_label, "memory_suggestions": memory_suggestions_payload, "memory_plan": memory_plan_payload, "memory_plan_path": memory_plan_path, "memory_apply": memory_apply_payload, "passes": compile_runs, "pass_scope_diffs": pass_scope_diffs, "validation": validation_payload, }, ) memory_update_payload = cli._record_compile_memory_run( project_root, package_id=package_id, mode=run_mode, run_id=run_id, run_goal=run_goal, skill_plan=None, pass_scope_diffs=pass_scope_diffs, evidence=memory_suggestions_payload, validation=validation_payload, compile_report_path=compile_report_path, ) finally: if not keep_compile_artifacts: shutil.rmtree(tool_dest, ignore_errors=True) if not keep_compile_artifacts and tool_dest.exists(): raise cli.PackageError( f"Failed to clean up temporary tool directory: {tool_dest}" ) suggestions_total = ( len(memory_suggestions_payload.get("suggestions", [])) if isinstance(memory_suggestions_payload, dict) and isinstance(memory_suggestions_payload.get("suggestions"), list) else 0 ) operations_total = ( len(memory_plan_payload.get("operations", [])) if isinstance(memory_plan_payload, dict) and isinstance(memory_plan_payload.get("operations"), list) else 0 ) applied_total = ( int(memory_apply_payload.get("applied_count") or 0) if isinstance(memory_apply_payload, dict) else 0 ) modified_files_total = ( len(memory_apply_payload.get("modified_files", [])) if isinstance(memory_apply_payload, dict) and isinstance(memory_apply_payload.get("modified_files"), list) else 0 ) payload = { "recompiled_package_id": package_id, "project_root": str(project_root), "run_id": run_id, "run_mode": run_mode, "memory_mode": True, "memory_input_path": str(resolved_memory_path), "memory_scope": memory_scope_label, "compile_memory": compile_memory_path, "compile_memory_update": memory_update_payload, "memory_suggestions": memory_suggestions_payload, "memory_plan": memory_plan_payload, "memory_plan_path": memory_plan_path or None, "memory_apply": memory_apply_payload, 
"pass_scope_diffs": pass_scope_diffs, "compile_runs": compile_runs, "compile_report": compile_report_path, "validation": validation_payload, "validation_enforced": strict_compile_validation, "install_off": True, "installed": None, "active_package": None, "router_sync": None, "scipkg_root": None, } lines = [ f"Generated recompile memory update plan for '{package_id}' from {project_root}.", f"Memory scope: {memory_scope_label}.", f"Collected {suggestions_total} matching suggested skills updates from memory files after scope filtering.", f"Planned {operations_total} append-only skill updates in {memory_plan_path or cli.RECOMPILE_MEMORY_PLAN_REL_PATH}.", f"Applied {applied_total} append-only updates across {modified_files_total} skill file(s).", "Install step skipped for memory-plan mode; package registry/install were not modified.", ] cli._emit_output(args, payload, lines) return 0 if paper_mode_enabled and isinstance(resolved_doc_path, Path): try: manuscript_text = resolved_doc_path.read_text( encoding="utf-8", errors="replace" ) except OSError as exc: raise cli.PackageError( f"Failed to read --doc file: {resolved_doc_path}: {exc}" ) from exc if not manuscript_text.strip(): raise cli.PackageError(f"--doc is empty: {resolved_doc_path}") try: manuscript_source = str(resolved_doc_path.relative_to(project_root)) except ValueError: manuscript_source = str(resolved_doc_path) try: pass_1_prompt = ( f"{cli.RECOMPILE_PROMPT_1}\n\n" f"Compile memory file: {compile_memory_path}\n" f"Skill plan output file: {cli.COMPILE_SKILL_PLAN_REL_PATH}\n" "Read compile memory first and keep this run plan consistent with prior gaps." ) if paper_mode_enabled: scope_text = ( comment_text if comment_text else "No --comment provided; include all key paper results." 
) pass_1_prompt = ( f"{cli.RECOMPILE_PAPER_PROMPT_1_PLAN}\n\n" f"Package id: {package_id}\n" f"Paper source file: {manuscript_source}\n" f"Scope directive: {scope_text}\n" f"Write paper plan JSON to: {cli.RECOMPILE_PAPER_PLAN_REL_PATH}\n" f"Write compile profile JSON to: {cli.COMPILE_PROFILE_REL_PATH}\n" f"Paper context directory: {cli.RECOMPILE_PAPER_CONTEXT_DIR_REL_PATH}\n\n" "Original manuscript content:\n" f"{manuscript_text.strip()}\n" ) pass_1_prompt += ( f"\nCompile memory file: {compile_memory_path}\n" "Read compile memory first and align paper tutorial edits with prior gaps.\n" ) if isinstance(paper_data_context, dict) and bool( paper_data_context.get("enabled") ): artifacts = ( paper_data_context.get("artifacts") if isinstance(paper_data_context.get("artifacts"), dict) else {} ) summary_rel = str(artifacts.get("summary") or "").strip() manifest_rel = str( artifacts.get("manifest_compact") or artifacts.get("manifest") or "" ).strip() if summary_rel: pass_1_prompt += f"\nData summary file available: {summary_rel}\n" if manifest_rel: pass_1_prompt += ( f"Compact data manifest file available: {manifest_rel}\n" ) pass_1 = cli._run_compile_provider_pass( project_root, prompt=pass_1_prompt, pass_index=1, total_passes=3, provider=provider, provider_bin=provider_bin, ) pass_1_assistant_text = str(pass_1.pop("assistant_text", "") or "") compile_runs.append(pass_1) if paper_mode_enabled: raw_plan_payload = cli._extract_recompile_paper_plan_from_assistant_text( pass_1_assistant_text ) if raw_plan_payload is None: raise cli.PackageError( f"Paper-mode recompile pass 1 response must include " f"<{cli.RECOMPILE_PAPER_PLAN_TAG}>...</{cli.RECOMPILE_PAPER_PLAN_TAG}>." 
) paper_plan_payload = cli._normalize_recompile_paper_plan( raw_plan_payload, paper_source=manuscript_source or str(resolved_doc_path), package_id=package_id, scope_comment=comment_text or None, ) paper_plan_rel = cli._write_recompile_paper_plan( project_root, paper_plan=paper_plan_payload, ) if not isinstance(resolved_doc_path, Path): raise cli.PackageError( "Internal paper-mode error: --doc path missing after validation." ) paper_skill_id = cli._derive_recompile_paper_skill_id( project_root, package_id=package_id, doc_path=resolved_doc_path, comment=comment_text or None, paper_plan=paper_plan_payload, ) cli._ensure_recompile_paper_skill_scaffold( project_root, skill_id=paper_skill_id, paper_plan=paper_plan_payload, ) cli._initialize_recompile_paper_sidecar_files( project_root, paper_plan=paper_plan_payload, paper_skill_id=paper_skill_id, ) profile_payload = cli._load_compile_profile( project_root, default_package_name=package_id, assistant_text=pass_1_assistant_text, ) if docs_only_override: profile_payload["docs_only"] = True if not paper_mode_enabled: skill_plan_payload = cli._load_compile_skill_plan( project_root, package_id=package_id, mode=run_mode, assistant_text=pass_1_assistant_text, ) skill_plan_path = cli._write_compile_skill_plan( project_root, skill_plan=skill_plan_payload, ) if isinstance(resolved_doc_path, Path): paper_context_payload = cli._build_recompile_paper_context( project_root, doc_path=resolved_doc_path, comment=comment_text or None, data_context=paper_data_context, staged_assets=paper_staged_assets, paper_plan=paper_plan_payload, paper_skill_id=paper_skill_id, ) evidence_payload = cli._build_recompile_evidence_bundle( project_root, profile=profile_payload, core_skill_count=core_skill_count, ) if isinstance(paper_context_payload, dict): evidence_payload["paper_context"] = paper_context_payload if isinstance(paper_staged_assets, dict): evidence_payload["paper_staged_assets"] = paper_staged_assets if not paper_mode_enabled: 
available_skill_ids = cli._list_skill_ids(project_root) core_skill_ids_raw = evidence_payload.get("core_skills") core_skill_ids = ( list(core_skill_ids_raw) if isinstance(core_skill_ids_raw, list) else [] ) skill_plan_payload = cli._normalize_compile_skill_plan( skill_plan_payload, package_id=package_id, mode=run_mode, available_skill_ids=available_skill_ids, core_skill_ids=core_skill_ids, ) skill_plan_path = cli._write_compile_skill_plan( project_root, skill_plan=skill_plan_payload, ) pass_2_prompt = ( f"{cli.RECOMPILE_PROMPT_2}\n\n" f"Compile memory file: {compile_memory_path}\n" f"Skill plan JSON file: {skill_plan_path or cli.COMPILE_SKILL_PLAN_REL_PATH}\n" "Allowed edit scope in pass 2:\n" "- skills/...\n" ) if not paper_mode_enabled and isinstance(skill_plan_payload, dict): pass_2_prompt += ( "\nSkill plan JSON payload:\n" f"{json.dumps(skill_plan_payload, indent=2)}\n" ) pass_2_before_snapshot: dict[str, str] | None = cli._snapshot_skills_tree( project_root ) if paper_mode_enabled: if not isinstance(paper_plan_payload, dict): raise cli.PackageError( "Internal paper-mode error: paper plan payload missing after pass 1." 
) pass_2_before_snapshot = cli._snapshot_recompile_paper_skills(project_root) pass_2_prompt = ( f"{cli.RECOMPILE_PAPER_PROMPT_2_TUTORIAL}\n\n" f"Package id: {package_id}\n" f"Paper plan JSON file: {paper_plan_rel}\n" f"Target tutorial skill id: {paper_skill_id}\n" f"Target tutorial skill root: skills/{paper_skill_id}/\n" f"Figure-data map file: {cli.RECOMPILE_PAPER_FIGURE_DATA_MAP_REL_PATH}\n" f"Paper skill manifest file: {cli.RECOMPILE_PAPER_SKILL_MANIFEST_REL_PATH}\n" f"Staged assets manifest: {cli.RECOMPILE_PAPER_STAGED_ASSETS_MANIFEST_REL_PATH}\n" f"Staged assets root: {cli.RECOMPILE_PAPER_STAGED_ASSETS_DIR_REL_PATH}\n\n" f"Compile memory file: {compile_memory_path}\n" "Allowed edit scope in pass 2:\n" f"- skills/{paper_skill_id}/...\n" f"- {cli.RECOMPILE_PAPER_FIGURE_DATA_MAP_REL_PATH}\n" f"- {cli.RECOMPILE_PAPER_SKILL_MANIFEST_REL_PATH}\n\n" "Paper plan JSON payload:\n" f"{json.dumps(paper_plan_payload, indent=2)}\n" ) if isinstance(paper_data_context, dict) and bool( paper_data_context.get("enabled") ): artifacts = ( paper_data_context.get("artifacts") if isinstance(paper_data_context.get("artifacts"), dict) else {} ) summary_rel = str(artifacts.get("summary") or "").strip() manifest_rel = str( artifacts.get("manifest_compact") or artifacts.get("manifest") or "" ).strip() manifest_full_rel = str(artifacts.get("manifest_full") or "").strip() if summary_rel: pass_2_prompt += f"\nData summary file: {summary_rel}\n" if manifest_rel: pass_2_prompt += f"Compact manifest file: {manifest_rel}\n" if manifest_full_rel: pass_2_prompt += f"Full manifest file: {manifest_full_rel}\n" pass_2 = cli._run_compile_provider_pass( project_root, prompt=pass_2_prompt, pass_index=2, total_passes=3, provider=provider, provider_bin=provider_bin, ) pass_2.pop("assistant_text", None) compile_runs.append(pass_2) if not paper_mode_enabled and isinstance(pass_2_before_snapshot, dict): pass_2_after_snapshot = cli._snapshot_skills_tree(project_root) pass_2_diff = 
cli._diff_skills_tree_snapshot( pass_2_before_snapshot, pass_2_after_snapshot, ) pass_scope_diffs["pass_2"] = pass_2_diff cli._assert_skills_change_scope( change_diff=pass_2_diff, allowed_prefixes=["skills"], stage_label="recompile pass 2", ) if paper_mode_enabled and isinstance(pass_2_before_snapshot, dict): pass_2_after_snapshot = cli._snapshot_recompile_paper_skills(project_root) pass_2_diff = cli._diff_recompile_paper_skills_snapshot( pass_2_before_snapshot, pass_2_after_snapshot, ) pass_scope_diffs["pass_2"] = pass_2_diff cli._assert_recompile_paper_change_scope( change_diff=pass_2_diff, allowed_prefixes=[ f"skills/{paper_skill_id}", cli.RECOMPILE_PAPER_FIGURE_DATA_MAP_REL_PATH, cli.RECOMPILE_PAPER_SKILL_MANIFEST_REL_PATH, ], stage_label="paper-mode pass 2", ) pass_3_prompt = ( f"{cli.RECOMPILE_PROMPT_3}\n\n" f"Compile memory file: {compile_memory_path}\n" f"Skill plan JSON file: {skill_plan_path or cli.COMPILE_SKILL_PLAN_REL_PATH}\n" "Allowed edit scope in pass 3:\n" "- skills/...\n" "- skills/.compile_report.json (optional notes)\n" ) pass_3_before_snapshot: dict[str, str] | None = cli._snapshot_skills_tree( project_root ) if paper_mode_enabled: pass_3_before_snapshot = cli._snapshot_recompile_paper_skills(project_root) figure_data_map_text = "" figure_data_map_path = ( project_root / cli.RECOMPILE_PAPER_FIGURE_DATA_MAP_REL_PATH ) if figure_data_map_path.is_file(): figure_data_map_text = figure_data_map_path.read_text( encoding="utf-8", errors="replace" ) pass_3_prompt = ( f"{cli.RECOMPILE_PAPER_PROMPT_3_AUDIT}\n\n" f"Package id: {package_id}\n" f"Paper plan JSON file: {paper_plan_rel}\n" f"Paper context file: {cli.RECOMPILE_PAPER_CONTEXT_REL_PATH}\n" f"Figure-data map file: {cli.RECOMPILE_PAPER_FIGURE_DATA_MAP_REL_PATH}\n" f"Paper skill manifest file: {cli.RECOMPILE_PAPER_SKILL_MANIFEST_REL_PATH}\n" f"Target tutorial skill id: {paper_skill_id}\n" f"Optional manuscript file for double check: {manuscript_source}\n" f"Compile memory file: 
{compile_memory_path}\n" ) if isinstance(resolved_data_dir, Path): pass_3_prompt += ( f"Optional supplementary data dir: {resolved_data_dir}\n" ) pass_3_prompt += ( "\nAllowed edit scope in pass 3:\n" f"- skills/{paper_skill_id}/...\n" "- skills/*-index/SKILL.md and related index references for routing update\n" f"- {cli.RECOMPILE_PAPER_FIGURE_DATA_MAP_REL_PATH}\n" f"- {cli.RECOMPILE_PAPER_SKILL_MANIFEST_REL_PATH}\n\n" "Paper plan JSON payload:\n" f"{json.dumps(paper_plan_payload, indent=2)}\n" ) if figure_data_map_text.strip(): pass_3_prompt += ( "\nCurrent figure-data map JSON payload:\n" f"{figure_data_map_text.strip()}\n" ) pass_3 = cli._run_compile_provider_pass( project_root, prompt=pass_3_prompt, pass_index=3, total_passes=3, provider=provider, provider_bin=provider_bin, ) pass_3.pop("assistant_text", None) compile_runs.append(pass_3) if not paper_mode_enabled and isinstance(pass_3_before_snapshot, dict): pass_3_after_snapshot = cli._snapshot_skills_tree(project_root) pass_3_diff = cli._diff_skills_tree_snapshot( pass_3_before_snapshot, pass_3_after_snapshot, ) pass_scope_diffs["pass_3"] = pass_3_diff cli._assert_skills_change_scope( change_diff=pass_3_diff, allowed_prefixes=["skills"], stage_label="recompile pass 3", ) if paper_mode_enabled and isinstance(pass_3_before_snapshot, dict): pass_3_after_snapshot = cli._snapshot_recompile_paper_skills(project_root) pass_3_diff = cli._diff_recompile_paper_skills_snapshot( pass_3_before_snapshot, pass_3_after_snapshot, ) pass_scope_diffs["pass_3"] = pass_3_diff index_prefixes = [] for entry in skills_root.iterdir(): if not entry.is_dir() or not entry.name.endswith("-index"): continue if not (entry / "SKILL.md").is_file(): continue index_prefixes.append(f"skills/{entry.name}") cli._assert_recompile_paper_change_scope( change_diff=pass_3_diff, allowed_prefixes=[ f"skills/{paper_skill_id}", *index_prefixes, cli.RECOMPILE_PAPER_FIGURE_DATA_MAP_REL_PATH, cli.RECOMPILE_PAPER_SKILL_MANIFEST_REL_PATH, ], 
stage_label="paper-mode pass 3", ) validation_payload = cli._validate_compiled_skills( project_root, profile=profile_payload, core_skill_count=core_skill_count, skill_plan=( skill_plan_payload if isinstance(skill_plan_payload, dict) else None ), source_inventory=( evidence_payload.get("source_inventory") if isinstance(evidence_payload, dict) else None ), previous_source_inventory=previous_source_inventory, ) if paper_mode_enabled: paper_validation_payload = cli._validate_recompile_paper_outputs( project_root, paper_plan=paper_plan_payload, paper_skill_id=paper_skill_id, doc_path=resolved_doc_path, data_dir=resolved_data_dir, staged_assets=paper_staged_assets, ) validation_payload["paper"] = paper_validation_payload paper_errors = paper_validation_payload.get("errors") if isinstance(paper_errors, list) and paper_errors: existing_errors = validation_payload.get("errors") merged_errors = ( list(existing_errors) if isinstance(existing_errors, list) else [] ) merged_errors.extend( f"[paper] {str(item)}" for item in paper_errors if str(item).strip() ) validation_payload["errors"] = merged_errors paper_warnings = paper_validation_payload.get("warnings") if isinstance(paper_warnings, list) and paper_warnings: existing_warnings = validation_payload.get("warnings") merged_warnings = ( list(existing_warnings) if isinstance(existing_warnings, list) else [] ) merged_warnings.extend( f"[paper] {str(item)}" for item in paper_warnings if str(item).strip() ) validation_payload["warnings"] = merged_warnings validation_payload["ok"] = bool( validation_payload.get("ok", False) ) and bool(paper_validation_payload.get("ok", False)) compile_report_path = cli._write_compile_report( project_root, payload={ "mode": run_mode, "run_id": run_id, "recompiled_package_id": package_id, "project_root": str(project_root), "compile_memory_path": compile_memory_path, "profile": profile_payload, "skill_plan": ( skill_plan_payload if isinstance(skill_plan_payload, dict) else None ), "skill_plan_path": 
skill_plan_path or None, "paper_context": paper_context_payload, "paper_plan": paper_plan_payload, "paper_plan_path": paper_plan_rel, "paper_skill_id": paper_skill_id, "pass_scope_diffs": pass_scope_diffs, "paper_pass_scope_diffs": pass_scope_diffs, "paper_staged_assets": paper_staged_assets, "paper_validation": paper_validation_payload, "evidence": evidence_payload, "previous_source_inventory": previous_source_inventory, "passes": compile_runs, "validation": validation_payload, }, ) memory_update_payload = cli._record_compile_memory_run( project_root, package_id=package_id, mode=run_mode, run_id=run_id, run_goal=run_goal, skill_plan=( skill_plan_payload if isinstance(skill_plan_payload, dict) else None ), pass_scope_diffs=pass_scope_diffs, evidence=evidence_payload, validation=validation_payload, compile_report_path=compile_report_path, ) if strict_compile_validation and not bool(validation_payload.get("ok", False)): errors = validation_payload.get("errors") if isinstance(errors, list) and errors: summary = "; ".join(str(item) for item in errors[:5]) else: summary = "unknown validation error" raise cli.PackageError(f"Recompile validation failed: {summary}") finally: if not keep_compile_artifacts: shutil.rmtree(tool_dest, ignore_errors=True) if not keep_compile_artifacts and tool_dest.exists(): raise cli.PackageError( f"Failed to clean up temporary tool directory: {tool_dest}" ) installed: dict[str, object] | None = None router_sync: dict[str, object] | None = None active: str | None = None if not install_off and scipkg_root is not None: installed = cli.install_from_local_path( scipkg_root, package_id, local_path=project_root, title=args.title, activate=args.activate, force=force_install, ) if not args.no_router_sync: router_sync = cli.sync_router_rules(scipkg_root) active_raw = cli.load_registry(scipkg_root).get("active_package") if isinstance(active_raw, str): active = active_raw payload = { "recompiled_package_id": package_id, "project_root": str(project_root), 
"git_repo_initialized": git_repo_initialized, "run_id": run_id, "run_mode": run_mode, "doc_path": ( str(resolved_doc_path) if isinstance(resolved_doc_path, Path) else None ), "data_dir": ( str(resolved_data_dir) if isinstance(resolved_data_dir, Path) else None ), "comment": comment_text or None, "compile_memory": compile_memory_path, "compile_memory_update": memory_update_payload, "skill_plan": ( skill_plan_payload if isinstance(skill_plan_payload, dict) else None ), "skill_plan_path": skill_plan_path or None, "paper_mode": paper_mode_enabled, "paper_data_context": ( paper_data_context if isinstance(paper_data_context, dict) else None ), "paper_context": ( paper_context_payload if isinstance(paper_context_payload, dict) else None ), "paper_plan": ( paper_plan_payload if isinstance(paper_plan_payload, dict) else None ), "paper_plan_path": paper_plan_rel or None, "paper_skill_id": paper_skill_id or None, "pass_scope_diffs": pass_scope_diffs, "paper_pass_scope_diffs": pass_scope_diffs, "paper_staged_assets": ( paper_staged_assets if isinstance(paper_staged_assets, dict) else None ), "paper_validation": ( paper_validation_payload if isinstance(paper_validation_payload, dict) else None ), "compile_runs": compile_runs, "compile_profile": profile_payload, "evidence": evidence_payload, "previous_source_inventory": previous_source_inventory, "validation": validation_payload, "validation_enforced": strict_compile_validation, "compile_report": compile_report_path, "installed": installed, "active_package": active, "router_sync": router_sync, "force_install": force_install, "install_off": install_off, "scipkg_root": str(scipkg_root) if isinstance(scipkg_root, Path) else None, } source_links_total = validation_payload.get("source_links_total", 0) validation_ok = bool(validation_payload.get("ok", False)) validation_errors = validation_payload.get("errors", []) error_count = len(validation_errors) if isinstance(validation_errors, list) else 0 lines = [ f"Recompiled skills for 
'{package_id}' from {project_root}.", f"Validated skills with {source_links_total} source-code links.", ( "Validation status: ok." if validation_ok else ( f"Validation status: {error_count} finding(s) (non-blocking). " "Use --strict-compile-validation to enforce failures." ) ), ( "Install step skipped (--install-off); updated local skills only." if install_off else ( f"Installed to scientific packages. Active package: {active}." if isinstance(active, str) and active else "Installed to scientific packages." ) ), ] if git_repo_initialized: lines.insert(1, f"Initialized git repository at {project_root} (missing .git).") cli._emit_output(args, payload, lines) return 0
def cmd_install(args: argparse.Namespace) -> int:
    """
    Execute the `install` CLI subcommand.

    Supports three install sources: a local directory (``--local-path``),
    an explicit archive (``--zip-url``), or the curated channel (default).
    Multiple package ids are accepted only for curated installs.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).

    Raises
    ------
    PackageError
        On missing/invalid package ids, incompatible flag combinations,
        or when ``--require-verified`` rejects an unverified curated version.
    """
    cli = _cli()
    scipkg_root = cli.resolve_scipkg_root()
    # argparse may hand us a single string or a list of ids; normalize to a
    # list of non-empty, stripped strings either way.
    raw_package_id = getattr(args, "package_id", None)
    if isinstance(raw_package_id, list):
        requested_ids = [
            item for item in raw_package_id if isinstance(item, str) and item.strip()
        ]
    elif isinstance(raw_package_id, str) and raw_package_id.strip():
        requested_ids = [raw_package_id.strip()]
    else:
        requested_ids = []
    if not requested_ids:
        raise cli.PackageError("Package id is required for fermilink install.")
    # Coerce --version to a stripped string, then treat empty as "not given".
    requested_version_raw = getattr(args, "version_id", None)
    requested_version = (
        str(requested_version_raw).strip()
        if isinstance(requested_version_raw, str)
        else ""
    )
    if requested_version == "":
        requested_version = None
    require_verified = bool(getattr(args, "require_verified", False))
    # --version and --require-verified only make sense for curated installs,
    # i.e. when neither an explicit local path nor a zip URL is given.
    if requested_version and (args.local_path or args.zip_url):
        raise cli.PackageError("--version only applies to curated channel installs.")
    if require_verified and (args.local_path or args.zip_url):
        raise cli.PackageError(
            "--require-verified only applies to curated channel installs."
        )
    package_ids = [cli.normalize_package_id(item) for item in requested_ids]
    normalized_channel = cli.normalize_channel_id(args.channel)
    if len(package_ids) > 1:
        # Multi-package install path: curated channel only, with no
        # per-package options allowed (activate/local-path/zip-url/title/version).
        if args.activate:
            raise cli.PackageError(
                "Cannot combine multiple package ids with --activate/--active. "
                "Install them first, then run `fermilink activate <package_id>`."
            )
        if args.local_path:
            raise cli.PackageError(
                "Cannot combine multiple package ids with --local-path."
            )
        if args.zip_url:
            raise cli.PackageError(
                "Cannot combine multiple package ids with --zip-url."
            )
        if args.title:
            raise cli.PackageError("Cannot combine multiple package ids with --title.")
        if requested_version:
            raise cli.PackageError(
                "Cannot combine multiple package ids with --version."
            )
        installed: list[dict[str, object]] = []
        sources: dict[str, str] = {}
        selected_versions: dict[str, str] = {}
        # Labels of "<package_id>@<version_id>" for versions installed without
        # a verified flag; surfaced as a warning in the summary output.
        unverified: list[str] = []
        for package_id in package_ids:
            curated = cli.resolve_curated_package(
                package_id, channel=normalized_channel
            )
            # No explicit version allowed here, so this picks the default
            # version for the curated package.
            selected_version = cli.select_package_version(curated)
            if require_verified and not selected_version.verified:
                raise cli.PackageError(
                    f"Selected curated version '{selected_version.version_id}' for package "
                    f"'{package_id}' in channel '{normalized_channel}' is not verified. "
                    "Use a verified version or remove --require-verified."
                )
            if not selected_version.verified:
                unverified.append(f"{package_id}@{selected_version.version_id}")
            # activate=False: activation is disallowed for multi-installs above.
            meta = cli.install_from_zip(
                scipkg_root,
                package_id,
                zip_url=selected_version.source_archive_url,
                title=curated.title,
                activate=False,
                force=args.force,
                max_zip_bytes=args.max_zip_bytes,
            )
            # The installer may report a different id than requested; prefer it.
            installed_id = str(meta.get("id") or package_id)
            # Record curated provenance (channel/version/source) in the registry.
            _save_curated_install_metadata(
                scipkg_root,
                installed_id,
                channel=normalized_channel,
                curated_package_id=curated.package_id,
                version_id=selected_version.version_id,
                source_archive_url=selected_version.source_archive_url,
                verified=selected_version.verified,
                source_ref_type=selected_version.source_ref_type,
                source_ref_value=selected_version.source_ref_value,
            )
            installed.append(meta)
            sources[installed_id] = str(selected_version.source_archive_url)
            selected_versions[installed_id] = selected_version.version_id
        router = None
        if not args.no_router_sync:
            router = cli.sync_router_rules(scipkg_root)
        active = cli.load_registry(scipkg_root).get("active_package")
        payload = {
            "installed": installed,
            "sources": sources,
            "selected_versions": selected_versions,
            "require_verified": require_verified,
            "scipkg_root": str(scipkg_root),
            "router_sync": router,
            "active_package": active,
        }
        if unverified:
            payload["unverified_versions"] = unverified
        summary = ", ".join(
            str(item.get("id") or "") for item in installed if isinstance(item, dict)
        )
        # Fall back to the requested ids if no installer metadata produced ids.
        summary = summary or ", ".join(package_ids)
        lines = [
            f"Installed {len(installed)} packages: {summary}.",
            (
                f"Active package: {active}."
                if isinstance(active, str) and active
                else "Active package unchanged."
            ),
        ]
        if unverified:
            lines.append(
                "Warning: installed unverified curated versions: "
                + ", ".join(unverified)
                + "."
            )
        cli._emit_output(args, payload, lines)
        return 0
    # Single-package install path.
    package_id = package_ids[0]
    title = args.title
    source: str
    # Set only when a single curated install selected an unverified version.
    selected_unverified_label: str | None = None
    if args.local_path:
        meta = cli.install_from_local_path(
            scipkg_root,
            package_id,
            local_path=Path(args.local_path),
            title=title,
            activate=args.activate,
            force=args.force,
        )
        source = f"local-path:{Path(args.local_path).expanduser().resolve()}"
    else:
        # Archive install: either an explicit --zip-url, or (when absent) a
        # URL resolved from the curated channel below.
        zip_url = args.zip_url
        selected_version_id: str | None = None
        selected_version_verified: bool | None = None
        selected_source_ref: dict[str, str | None] | None = None
        if not zip_url:
            curated = cli.resolve_curated_package(
                package_id, channel=normalized_channel
            )
            # Honors --version here (unlike the multi-package path, where it
            # is disallowed).
            selected_version = cli.select_package_version(
                curated, version_id=requested_version
            )
            if require_verified and not selected_version.verified:
                raise cli.PackageError(
                    f"Selected curated version '{selected_version.version_id}' for package "
                    f"'{package_id}' in channel '{normalized_channel}' is not verified. "
                    "Use a verified version or remove --require-verified."
                )
            zip_url = selected_version.source_archive_url
            if title is None:
                title = curated.title
            selected_version_id = selected_version.version_id
            selected_version_verified = selected_version.verified
            selected_source_ref = {
                "type": selected_version.source_ref_type,
                "value": selected_version.source_ref_value,
            }
            if not selected_version.verified:
                selected_unverified_label = (
                    f"{package_id}@{selected_version.version_id}"
                )
        meta = cli.install_from_zip(
            scipkg_root,
            package_id,
            zip_url=zip_url,
            title=title,
            activate=args.activate,
            force=args.force,
            max_zip_bytes=args.max_zip_bytes,
        )
        installed_id = str(meta.get("id") or package_id)
        # Only record curated metadata when the URL came from the curated
        # channel (i.e. the user did not pass an explicit --zip-url).
        if not args.zip_url:
            _save_curated_install_metadata(
                scipkg_root,
                installed_id,
                channel=normalized_channel,
                curated_package_id=package_id,
                # NOTE(review): "branch-head" appears to be a sentinel for
                # "no explicit version selected" — confirm against the
                # curated-channel resolver.
                version_id=selected_version_id or "branch-head",
                source_archive_url=str(zip_url),
                verified=bool(selected_version_verified),
                source_ref_type=(
                    selected_source_ref.get("type") if selected_source_ref else None
                ),
                source_ref_value=(
                    selected_source_ref.get("value") if selected_source_ref else None
                ),
            )
        source = str(zip_url)
    router = None
    if not args.no_router_sync:
        router = cli.sync_router_rules(scipkg_root)
    payload = {
        "installed": meta,
        "source": source,
        "requested_version": requested_version,
        "require_verified": require_verified,
        "scipkg_root": str(scipkg_root),
        "router_sync": router,
    }
    active = cli.load_registry(scipkg_root).get("active_package")
    lines = [
        f"Installed package '{meta.get('id', package_id)}' from {source}.",
        (
            f"Active package: {active}."
            if isinstance(active, str) and active
            else "Active package unchanged."
        ),
    ]
    if selected_unverified_label:
        lines.append(
            f"Warning: installed unverified curated version: {selected_unverified_label}."
        )
    cli._emit_output(args, payload, lines)
    return 0
def cmd_list(args: argparse.Namespace) -> int:
    """
    Execute the `list` CLI subcommand.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).
    """
    cli = _cli()
    root = cli.resolve_scipkg_root()
    registry = cli.load_registry(root)
    installed = cli.list_packages(root)
    active = registry.get("active_package")

    # Guard against a malformed registry: only a dict yields package ids.
    sorted_ids = sorted(installed) if isinstance(installed, dict) else []
    listing = ", ".join(sorted_ids) if sorted_ids else "(none)"

    payload = {
        "scipkg_root": str(root),
        "active_package": active,
        "packages": installed,
    }
    cli._emit_output(
        args,
        payload,
        [
            f"Installed packages: {len(sorted_ids)}. Active: {active or 'none'}.",
            f"Packages: {listing}.",
        ],
    )
    return 0
def _curated_version_payload(package) -> list[dict[str, object]]:
    """Serialize a curated package's versions into JSON-friendly dicts."""
    return [
        {
            "version_id": version.version_id,
            "source_archive_url": version.source_archive_url,
            "verified": version.verified,
            "source_ref": {
                "type": version.source_ref_type,
                "value": version.source_ref_value,
            },
        }
        for version in package.versions
    ]


def _curated_match_payload(package, match_type: str) -> dict[str, object]:
    """Serialize one curated package match for the `avail` result payload."""
    return {
        "package_id": package.package_id,
        "title": package.title,
        "zip_url": package.zip_url,
        "match_type": match_type,
        "description": package.description or "",
        "upstream_repo_url": package.upstream_repo_url or "",
        "homepage_url": package.homepage_url or "",
        "tags": list(package.tags),
        "default_version": package.default_version,
        "versions": _curated_version_payload(package),
    }


def cmd_avail(args: argparse.Namespace) -> int:
    """
    Execute the `avail` CLI subcommand.

    Searches the curated channel for an exact package-id match first; only
    when no exact match exists does it fall back to substring matching on
    package id and title.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).

    Raises
    ------
    PackageError
        If no query string was supplied.
    """
    cli = _cli()
    query = str(getattr(args, "query", "") or "").strip()
    if not query:
        raise cli.PackageError("Query is required for fermilink avail.")
    normalized_channel = cli.normalize_channel_id(getattr(args, "channel", None))
    curated_packages = cli.list_curated_packages(channel=normalized_channel)
    lowered_query = query.lower()

    # Exact id match wins outright and suppresses partial matching.
    matched: list[dict[str, object]] = []
    exact_match = curated_packages.get(lowered_query)
    if exact_match is not None:
        matched.append(_curated_match_payload(exact_match, "exact"))
    else:
        for package in curated_packages.values():
            if (
                lowered_query in package.package_id.lower()
                or lowered_query in package.title.lower()
            ):
                matched.append(_curated_match_payload(package, "partial"))

    matched.sort(key=lambda item: str(item.get("package_id") or ""))
    payload = {
        "channel": normalized_channel,
        "query": query,
        "found": bool(matched),
        "results": matched,
        "total_curated_packages": len(curated_packages),
    }
    if matched:
        lines = [
            f"Found {len(matched)} package(s) in channel '{normalized_channel}' for '{query}'.",
        ]
        for item in matched:
            versions = item.get("versions")
            # Flag unverified versions inline so they stand out in text output.
            version_list = (
                ", ".join(
                    f"{str(version.get('version_id'))}{'' if bool(version.get('verified')) else ' (unverified)'}"
                    for version in versions
                    if isinstance(version, dict)
                )
                if isinstance(versions, list)
                else ""
            )
            default_version = str(item.get("default_version") or "branch-head")
            base_line = (
                f"{item['package_id']}: {item['title']} ({item['zip_url']}) "
                f"[default={default_version}]"
            )
            lines.append(base_line)
            description = str(item.get("description") or "").strip()
            if description:
                lines.append(f" - {description}")
            if version_list:
                lines.append(f" - versions: {version_list}")
    else:
        lines = [
            f"No curated package matched '{query}' in channel '{normalized_channel}'.",
            (
                "Try `fermilink list` to see installed packages, or "
                "`fermilink install <package_id>` for an exact curated id."
            ),
        ]
    cli._emit_output(args, payload, lines)
    return 0
def cmd_activate(args: argparse.Namespace) -> int:
    """
    Execute the `activate` CLI subcommand.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).
    """
    cli = _cli()
    root = cli.resolve_scipkg_root()
    target = cli.normalize_package_id(args.package_id)
    meta = cli.activate_package(root, target)
    cli._emit_output(
        args,
        {
            "active_package": target,
            "meta": meta,
            "scipkg_root": str(root),
        },
        [f"Active package set to '{target}'."],
    )
    return 0
def _collect_csv_and_repeat( values: list[str] | None, csv_value: str | None ) -> list[str]: collected: list[str] = [] if values: collected.extend(values) if csv_value: collected.extend(csv_value.split(",")) return collected def _normalize_overlay_name_values(values: list[str]) -> list[str]: """Normalize repeated/csv overlay entry values while preserving order.""" normalized: list[str] = [] seen: set[str] = set() for value in values: for candidate in str(value).split(","): name = candidate.strip() if not name or name in seen: continue seen.add(name) normalized.append(name) return normalized def _normalize_overlay_meta_entries(raw: object) -> list[str] | None: """Normalize stored package overlay metadata into a deduplicated list.""" cli = _cli() if raw is None: return None if isinstance(raw, str): candidates = raw.split(",") elif isinstance(raw, list): candidates = raw else: raise cli.PackageError( "Package metadata field overlay_entries must be a list or comma-separated string." ) normalized: list[str] = [] seen: set[str] = set() for candidate in candidates: if not isinstance(candidate, str): raise cli.PackageError("overlay_entries can only contain strings.") name = candidate.strip() if not name or name in seen: continue seen.add(name) normalized.append(name) return normalized def _resolve_overlay_entries_for_remove( *, scipkg_root: Path, package_id: str, ) -> tuple[list[str], bool]: """Resolve overlay entries used as the baseline for `overlay --remove`.""" cli = _cli() packages = cli.list_packages(scipkg_root) if not isinstance(packages, dict): raise cli.PackageNotFoundError(f"Package not found: {package_id}") package_meta = packages.get(package_id) if not isinstance(package_meta, dict): raise cli.PackageNotFoundError(f"Package not found: {package_id}") configured_entries = _normalize_overlay_meta_entries( package_meta.get("overlay_entries") ) if configured_entries is not None: return configured_entries, True raw_installed_path = package_meta.get("installed_path") 
if not isinstance(raw_installed_path, str) or not raw_installed_path.strip(): raise cli.PackageError("Package metadata is missing installed_path.") package_root = Path(raw_installed_path).expanduser() if not package_root.is_absolute(): package_root = (Path.cwd() / package_root).resolve() from fermilink.packages.package_registry import iter_package_entries entries, _missing = iter_package_entries(package_root, include_names=None) return [entry.name for entry in entries], False
def cmd_overlay(args: argparse.Namespace) -> int:
    """
    Execute the `overlay` CLI subcommand.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).
    """
    cli = _cli()
    scipkg_root = cli.resolve_scipkg_root()
    package_id = cli.normalize_package_id(args.package_id)

    add_values = _collect_csv_and_repeat(args.entry, args.entries_csv)
    remove_values = _collect_csv_and_repeat(args.remove, None)

    # --clear, --remove, and --entry/--entries are mutually exclusive modes.
    if args.clear and (add_values or remove_values):
        raise cli.PackageError(
            "Cannot combine --clear with --entry/--entries/--remove."
        )
    if remove_values and add_values:
        raise cli.PackageError("Cannot combine --remove with --entry/--entries.")

    entries: list[str] | None
    if args.clear:
        entries = None
    elif remove_values:
        remove_entries = _normalize_overlay_name_values(remove_values)
        if not remove_entries:
            raise cli.PackageError("Provide at least one non-empty value for --remove.")
        baseline, had_explicit_overlay = _resolve_overlay_entries_for_remove(
            scipkg_root=scipkg_root,
            package_id=package_id,
        )
        to_drop = set(remove_entries)
        remaining = [name for name in baseline if name not in to_drop]
        # When there was no stored overlay and nothing was actually removed,
        # keep the package in "all entries" mode instead of pinning a list.
        if not had_explicit_overlay and remaining == baseline:
            entries = None
        else:
            entries = remaining
    elif add_values:
        # NOTE(review): the set path passes raw values through without
        # trimming/dedup — presumably set_package_overlay_entries normalizes;
        # confirm against its implementation.
        entries = add_values
    else:
        raise cli.PackageError(
            "Provide --entry/--entries to set exposed items, "
            "--remove to subtract entries, or use --clear."
        )

    meta = cli.set_package_overlay_entries(scipkg_root, package_id, entries)
    overlay_entries = meta.get("overlay_entries")
    if isinstance(overlay_entries, list):
        if overlay_entries:
            entry_text = ", ".join(str(item) for item in overlay_entries)
        else:
            entry_text = "(no exportable entries)"
    else:
        entry_text = "(all exportable entries)"
    payload = {
        "package_id": package_id,
        "overlay_entries": overlay_entries,
        "meta": meta,
        "scipkg_root": str(scipkg_root),
    }
    cli._emit_output(
        args, payload, [f"Overlay entries for '{package_id}': {entry_text}."]
    )
    return 0
def cmd_dependencies(args: argparse.Namespace) -> int:
    """
    Execute the `dependencies` CLI subcommand.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).
    """
    cli = _cli()
    scipkg_root = cli.resolve_scipkg_root()
    package_id = cli.normalize_package_id(args.package_id)
    requested = _collect_csv_and_repeat(args.package, args.packages_csv)

    if args.clear and requested:
        raise cli.PackageError("Cannot combine --clear with --package/--packages.")
    if args.clear:
        new_ids: list[str] | None = None
    elif requested:
        new_ids = requested
    else:
        raise cli.PackageError(
            "Provide --package/--packages to set dependencies, or use --clear."
        )

    meta = cli.set_package_dependency_ids(scipkg_root, package_id, new_ids)
    # Report what was actually stored, not what was requested.
    stored_ids = meta.get("dependency_package_ids")
    deps_text = (
        ", ".join(str(item) for item in stored_ids)
        if isinstance(stored_ids, list) and stored_ids
        else "(none)"
    )
    payload = {
        "package_id": package_id,
        "dependency_package_ids": stored_ids,
        "meta": meta,
        "scipkg_root": str(scipkg_root),
    }
    cli._emit_output(args, payload, [f"Dependencies for '{package_id}': {deps_text}."])
    return 0
def cmd_delete(args: argparse.Namespace) -> int:
    """
    Execute the `delete` CLI subcommand.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed CLI arguments namespace for the subcommand.

    Returns
    -------
    int
        Process exit code (`0` on success, non-zero on failure).
    """
    cli = _cli()
    scipkg_root = cli.resolve_scipkg_root()
    package_id = cli.normalize_package_id(args.package_id)
    result = cli.delete_package(
        scipkg_root,
        package_id,
        remove_files=not args.keep_files,
    )
    # Router rules reference installed packages, so re-sync unless opted out.
    router = cli.sync_router_rules(scipkg_root) if not args.no_router_sync else None

    payload = {
        "deleted": result,
        "router_sync": router,
        "scipkg_root": str(scipkg_root),
    }
    removed = "yes" if result.get("removed_files") else "no"
    active = result.get("active_package")
    active_line = (
        f"Active package: {active}."
        if isinstance(active, str) and active
        else "No active package set."
    )
    cli._emit_output(
        args,
        payload,
        [
            f"Deleted package '{package_id}' from registry. Removed files: {removed}.",
            active_line,
        ],
    )
    return 0