# Source code for fermilink.packages.package_core

from __future__ import annotations

import json
import os
import shutil
from collections.abc import Callable
from pathlib import Path
from typing import Any


# Filename of the on-disk package registry document.
REGISTRY_FILENAME = "registry.json"
# Filename of the per-workspace manifest that records managed overlays.
WORKSPACE_MANIFEST_FILENAME = ".package_manifest.json"
# Package metadata key that stores the entry names to overlay.
PACKAGE_OVERLAY_ENTRIES_KEY = "overlay_entries"
# Package metadata key that stores dependency package ids.
PACKAGE_DEPENDENCY_IDS_KEY = "dependency_package_ids"
# Workspace subdirectory where dependency packages are materialized.
PACKAGE_DEPENDENCIES_DIRNAME = "external_packages"

# Entry names never exported from a package into a workspace overlay
# (VCS metadata, tool caches, virtualenvs, vendored node modules).
SKIP_ENTRY_NAMES = {
    ".git",
    "__pycache__",
    ".mypy_cache",
    ".pytest_cache",
    ".ruff_cache",
    ".venv",
    "node_modules",
}
# Reserved names (compared case-insensitively via casefold) that templates
# own and packages may not overlay.
TEMPLATE_RESERVED_ENTRY_NAMES = {"agents.md"}


def normalize_package_id(value: str) -> str:
    """
    Normalize and validate a package identifier.

    Alphanumerics, hyphens, and underscores are kept (lowercased); every
    other character becomes a hyphen. Hyphen runs are collapsed and edge
    hyphens/underscores trimmed.

    Parameters
    ----------
    value : str
        Raw value to normalize.

    Returns
    -------
    str
        Normalized package id.

    Raises
    ------
    ValueError
        If nothing remains after normalization.
    """
    pieces: list[str] = []
    for char in (value or "").strip():
        if char.isalnum() or char in ("-", "_"):
            pieces.append(char.lower())
        else:
            pieces.append("-")
    cleaned = "".join(pieces)
    # Collapse runs of hyphens introduced by consecutive invalid characters.
    while "--" in cleaned:
        cleaned = cleaned.replace("--", "-")
    cleaned = cleaned.strip("-_")
    if not cleaned:
        raise ValueError("Package id is empty after normalization.")
    return cleaned
def build_default_registry(*, updated_at: str) -> dict[str, Any]:
    """
    Build an empty package registry payload with standard fields.

    Parameters
    ----------
    updated_at : str
        Timestamp string recorded in the payload.

    Returns
    -------
    dict[str, Any]
        Registry payload with no packages and no active package.
    """
    registry: dict[str, Any] = {"version": 1}
    registry["active_package"] = None
    registry["packages"] = {}
    registry["updated_at"] = updated_at
    return registry
def atomic_write_json(path: Path, payload: dict[str, Any]) -> None:
    """
    Write JSON payload to disk atomically.

    The payload is serialized to a sibling ``*.tmp`` file first and then
    moved over ``path`` with ``replace``, so readers never observe a
    partially written document.

    Parameters
    ----------
    path : Path
        Destination file path.
    payload : dict[str, Any]
        JSON-serializable payload to persist.

    Returns
    -------
    None
        No return value.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = path.with_suffix(path.suffix + ".tmp")
    text = json.dumps(payload, indent=2, sort_keys=True) + "\n"
    temp_path.write_text(text, encoding="utf-8")
    temp_path.replace(path)
def normalize_registry_payload(
    payload: Any,
    *,
    updated_at: str,
    normalize_package_id: Callable[[str], str],
    dependency_key: str = PACKAGE_DEPENDENCY_IDS_KEY,
    dependency_normalizer: Callable[[Any, str], list[str] | None] | None = None,
    coerce_non_dict_meta_to_empty: bool = True,
    fallback_active_to_first_package: bool = True,
) -> dict[str, Any]:
    """
    Normalize and validate registry payload structure and package metadata.

    Non-dict payloads yield a fresh default registry. Package ids that fail
    normalization are dropped; the active package is kept only when it
    resolves to a known package, otherwise it optionally falls back to the
    alphabetically first package.

    Parameters
    ----------
    payload : Any
        JSON-like payload to normalize.
    updated_at : str
        Timestamp used when the payload carries no usable ``updated_at``.
    normalize_package_id : Callable[[str], str]
        Callback used to validate and normalize package ids.
    dependency_key : str
        Metadata key holding dependency package ids.
    dependency_normalizer : Callable[[Any, str], list[str] | None] | None
        Optional callback used to normalize dependency metadata fields.
    coerce_non_dict_meta_to_empty : bool
        Whether non-dict package metadata is coerced to an empty dict
        instead of being skipped.
    fallback_active_to_first_package : bool
        Whether ``active_package`` falls back to the first package when
        missing or invalid.

    Returns
    -------
    dict[str, Any]
        Normalized registry payload ready for persistence.
    """
    result = build_default_registry(updated_at=updated_at)
    if not isinstance(payload, dict):
        return result

    packages: dict[str, dict[str, Any]] = {}
    raw_packages = payload.get("packages")
    if isinstance(raw_packages, dict):
        for raw_id, raw_meta in raw_packages.items():
            try:
                pkg_id = normalize_package_id(str(raw_id))
            except Exception:
                # Unusable id: drop this package entirely.
                continue
            if not isinstance(raw_meta, dict):
                if not coerce_non_dict_meta_to_empty:
                    continue
                raw_meta = {}
            meta = dict(raw_meta)
            meta["id"] = pkg_id
            if dependency_normalizer is not None:
                try:
                    deps = dependency_normalizer(meta.get(dependency_key), pkg_id)
                except Exception:
                    deps = None
                if deps:
                    meta[dependency_key] = deps
                else:
                    # Empty/invalid dependency data is removed, not stored.
                    meta.pop(dependency_key, None)
            packages[pkg_id] = meta
    result["packages"] = packages

    active: str | None = None
    raw_active = payload.get("active_package")
    if isinstance(raw_active, str):
        try:
            candidate = normalize_package_id(raw_active)
        except Exception:
            candidate = None
        if candidate and candidate in packages:
            active = candidate
    if active is None and fallback_active_to_first_package and packages:
        active = min(packages)
    result["active_package"] = active

    raw_updated = payload.get("updated_at")
    if isinstance(raw_updated, str) and raw_updated:
        result["updated_at"] = raw_updated
    return result
def load_registry_file(
    path: Path,
    *,
    default_registry: Callable[[], dict[str, Any]],
    normalize_registry: Callable[[Any], dict[str, Any]],
) -> dict[str, Any]:
    """
    Load and normalize a package registry file from disk.

    A missing, unreadable, or malformed file yields the default registry
    rather than raising.

    Parameters
    ----------
    path : Path
        Registry file path.
    default_registry : Callable[[], dict[str, Any]]
        Factory callback that returns a default registry payload.
    normalize_registry : Callable[[Any], dict[str, Any]]
        Callback that validates and normalizes registry payloads.

    Returns
    -------
    dict[str, Any]
        Normalized registry payload loaded from disk or defaults.
    """
    if not path.exists():
        return default_registry()
    try:
        raw_text = path.read_text(encoding="utf-8")
        payload = json.loads(raw_text)
    except (OSError, json.JSONDecodeError):
        # Corrupt or unreadable registry: start from a clean default.
        return default_registry()
    return normalize_registry(payload)
def save_registry_file(
    path: Path,
    payload: Any,
    *,
    updated_at: str,
    normalize_registry: Callable[[Any], dict[str, Any]],
) -> dict[str, Any]:
    """
    Normalize and save a package registry payload to disk.

    Parameters
    ----------
    path : Path
        Registry file path.
    payload : Any
        JSON-like payload to normalize and persist.
    updated_at : str
        Timestamp stamped onto the normalized payload before writing.
    normalize_registry : Callable[[Any], dict[str, Any]]
        Callback that validates and normalizes registry payloads.

    Returns
    -------
    dict[str, Any]
        Normalized payload that was persisted to disk.
    """
    normalized = normalize_registry(payload)
    normalized["updated_at"] = updated_at
    # Atomic write: serialize to a sibling temp file, then move into place
    # so readers never see a half-written registry.
    path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = path.with_suffix(path.suffix + ".tmp")
    with temp_path.open("w", encoding="utf-8") as handle:
        json.dump(normalized, handle, indent=2, sort_keys=True)
        handle.write("\n")
    temp_path.replace(path)
    return normalized
def is_exportable_entry_name(name: str) -> bool:
    """
    Return whether a package entry name is exportable to a workspace overlay.

    Empty names, hidden (dot-prefixed) names, skip-listed tool directories,
    and case-insensitively reserved template names are all excluded.

    Parameters
    ----------
    name : str
        Candidate file/directory entry name.

    Returns
    -------
    bool
        Whether the entry is eligible for workspace overlay.
    """
    if not name or name.startswith("."):
        return False
    if name in SKIP_ENTRY_NAMES:
        return False
    # Reserved names are matched case-insensitively.
    return name.casefold() not in TEMPLATE_RESERVED_ENTRY_NAMES
[docs] def extract_manifest_entry_names(manifest: dict[str, Any] | None) -> set[str]: """ Extract managed overlay entry names from a workspace manifest. Parameters ---------- manifest : dict[str, Any] | None Workspace manifest payload tracking managed overlays/dependencies. Returns ------- set[str] Set of managed overlay entry names. """ names: set[str] = set() if not isinstance(manifest, dict): return names linked = manifest.get("linked_entries") if not isinstance(linked, list): return names for item in linked: if isinstance(item, dict): name = item.get("name") if isinstance(name, str) and name: names.add(name) elif isinstance(item, str) and item: names.add(item) return names
def extract_manifest_dependency_ids(manifest: dict[str, Any] | None) -> set[str]:
    """
    Extract managed dependency package ids from a workspace manifest.

    Accepts both bare string entries and dict entries (preferring
    ``package_id`` over ``name``) in ``linked_dependency_packages``; ids
    that fail normalization are skipped.

    Parameters
    ----------
    manifest : dict[str, Any] | None
        Workspace manifest payload tracking managed overlays/dependencies.

    Returns
    -------
    set[str]
        Set of managed dependency package ids.
    """
    ids: set[str] = set()
    if not isinstance(manifest, dict):
        return ids
    linked = manifest.get("linked_dependency_packages")
    if not isinstance(linked, list):
        return ids
    for entry in linked:
        candidate: str | None = None
        if isinstance(entry, str) and entry:
            candidate = entry
        elif isinstance(entry, dict):
            raw = entry.get("package_id") or entry.get("name")
            if isinstance(raw, str) and raw:
                candidate = raw
        if candidate is None:
            continue
        try:
            ids.add(normalize_package_id(candidate))
        except ValueError:
            continue
    return ids
def remove_existing_entry(path: Path) -> None:
    """
    Remove an existing file, directory, or symlink path.

    Missing paths are a no-op; directory removal ignores errors so cleanup
    stays best-effort.

    Parameters
    ----------
    path : Path
        Filesystem path to remove.

    Returns
    -------
    None
        No return value.
    """
    # Symlinks (including links to directories) are unlinked, never followed.
    is_link_or_file = path.is_symlink() or path.is_file()
    if is_link_or_file:
        path.unlink(missing_ok=True)
    elif path.is_dir():
        shutil.rmtree(path, ignore_errors=True)
def link_or_copy_entry(src: Path, dst: Path) -> str:
    """
    Materialize one package entry as a symlink or copy in the workspace.

    Parameters
    ----------
    src : Path
        Source path for the entry being linked or copied.
    dst : Path
        Destination path where the entry is materialized.

    Returns
    -------
    str
        Operation mode used: ``"symlink"`` when a symlink was created,
        ``"copy"`` when symlinking failed and the entry was copied instead,
        or ``"existing"`` when ``dst`` already exists and nothing was done.
    """
    # Never clobber a pre-existing destination (including broken symlinks);
    # callers treat "existing" as a collision and decide how to resolve it.
    if dst.is_symlink() or dst.exists():
        return "existing"
    try:
        # target_is_directory matters on Windows, where file and directory
        # symlinks are distinct object types.
        os.symlink(src.resolve(), dst, target_is_directory=src.is_dir())
        return "symlink"
    except OSError:
        # Symlinks may be unavailable (e.g. missing privileges or an
        # unsupported filesystem); fall back to a full copy.
        if src.is_dir():
            shutil.copytree(src, dst, dirs_exist_ok=True)
        else:
            shutil.copy2(src, dst)
        return "copy"
def remove_managed_entries(
    repo_dir: Path,
    manifest: dict[str, Any] | None,
    *,
    only_names: set[str] | None = None,
    remove_non_symlink_entries: bool = False,
    remove_existing: Callable[[Path], None] = remove_existing_entry,
) -> None:
    """
    Remove managed overlay entries recorded in workspace manifest state.

    Symlink entries are only unlinked when the workspace path is actually a
    symlink; non-symlink entries are removed only when explicitly enabled.

    Parameters
    ----------
    repo_dir : Path
        Workspace repository path holding overlaid entries.
    manifest : dict[str, Any] | None
        Workspace manifest payload tracking managed overlays/dependencies.
    only_names : set[str] | None
        Optional subset of managed entry names to remove.
    remove_non_symlink_entries : bool
        Whether to also remove managed non-symlink entries.
    remove_existing : Callable[[Path], None]
        Callback used to remove filesystem entries.

    Returns
    -------
    None
        No return value.
    """
    if not isinstance(manifest, dict):
        return
    linked = manifest.get("linked_entries")
    if not isinstance(linked, list):
        return
    for entry in linked:
        # Bare strings are legacy symlink entries; dicts carry an explicit mode.
        if isinstance(entry, str):
            name, mode = entry, "symlink"
        elif isinstance(entry, dict):
            name = entry.get("name")
            mode = entry.get("mode", "symlink")
        else:
            continue
        if not isinstance(name, str) or not name:
            continue
        if only_names is not None and name not in only_names:
            continue
        target = repo_dir / name
        if mode == "symlink":
            # Only unlink if it is still a symlink (not replaced by real data).
            if target.is_symlink():
                target.unlink(missing_ok=True)
        elif remove_non_symlink_entries and target.exists():
            remove_existing(target)
def overlay_package_into_repo_core(
    *,
    repo_dir: Path,
    workspace_root: Path,
    package_id: str,
    package_meta: dict[str, Any],
    allow_replace_existing: bool,
    now_iso: Callable[[], str],
    resolve_package_meta_path: Callable[[dict[str, Any]], Path],
    normalize_overlay_entries: Callable[[Any], list[str] | None],
    normalize_dependency_ids: Callable[..., list[str] | None],
    iter_package_entries: Callable[
        [Path, list[str] | None], tuple[list[Path], list[str]]
    ],
    load_workspace_manifest: Callable[[Path], dict[str, Any] | None],
    save_workspace_manifest: Callable[[Path, dict[str, Any]], None],
    normalize_package_id: Callable[[str], str],
    get_package_map: Callable[[str, dict[str, Any]], dict[str, Any]],
    replace_existing_entries_for_previous_names: bool,
    remove_non_symlink_managed_entries: bool,
    overlay_entries_key: str = PACKAGE_OVERLAY_ENTRIES_KEY,
    dependency_ids_key: str = PACKAGE_DEPENDENCY_IDS_KEY,
    dependencies_dirname: str = PACKAGE_DEPENDENCIES_DIRNAME,
    remove_existing: Callable[[Path], None] = remove_existing_entry,
    link_or_copy: Callable[[Path, Path], str] = link_or_copy_entry,
) -> dict[str, Any]:
    """
    Overlay package entries into a workspace and persist manifest ownership.

    Parameters
    ----------
    repo_dir : Path
        Workspace repository path receiving overlaid entries.
    workspace_root : Path
        Workspace root where manifest state is stored.
    package_id : str
        Normalized package identifier.
    package_meta : dict[str, Any]
        Installed package metadata record from the registry.
    allow_replace_existing : bool
        Whether existing destination entries may be replaced.
    now_iso : Callable[[], str]
        Callback that returns the current ISO timestamp.
    resolve_package_meta_path : Callable[[dict[str, Any]], Path]
        Callback that resolves package metadata to install directory path.
    normalize_overlay_entries : Callable[[Any], list[str] | None]
        Callback that normalizes overlay entry metadata.
    normalize_dependency_ids : Callable[..., list[str] | None]
        Callback that normalizes dependency package id metadata.
    iter_package_entries : Callable[[Path, list[str] | None], tuple[list[Path], list[str]]]
        Callback that lists installable package entries from package root.
    load_workspace_manifest : Callable[[Path], dict[str, Any] | None]
        Callback that loads workspace manifest data.
    save_workspace_manifest : Callable[[Path, dict[str, Any]], None]
        Callback that saves workspace manifest data.
    normalize_package_id : Callable[[str], str]
        Callback used to validate and normalize package ids.
    get_package_map : Callable[[str, dict[str, Any]], dict[str, Any]]
        Callback that retrieves package metadata mappings for dependency resolution.
    replace_existing_entries_for_previous_names : bool
        Whether to clear stale managed entry names from previous overlays.
    remove_non_symlink_managed_entries : bool
        Whether managed regular files/directories can be removed during cleanup.
    overlay_entries_key : str
        Package metadata key that stores overlaid entry names.
    dependency_ids_key : str
        Package metadata key that stores dependency package ids.
    dependencies_dirname : str
        Workspace subdirectory name used for dependency overlays.
    remove_existing : Callable[[Path], None]
        Callback used to remove filesystem entries.
    link_or_copy : Callable[[Path, Path], str]
        Callback that links/copies an entry into the workspace.

    Returns
    -------
    dict[str, Any]
        Overlay result payload with applied entries and manifest metadata.
    """
    # Resolve what this package wants to overlay: its install root, the
    # configured entry names, and any dependency packages it pulls in.
    package_root = resolve_package_meta_path(package_meta)
    configured_entries = normalize_overlay_entries(
        package_meta.get(overlay_entries_key)
    )
    configured_dependency_ids = (
        normalize_dependency_ids(
            package_meta.get(dependency_ids_key), package_id=package_id
        )
        or []
    )
    entries, missing_requested_entries = iter_package_entries(
        package_root,
        include_names=configured_entries,
    )
    target_entry_names = {entry.name for entry in entries}

    # Determine which package (if any) owned this workspace previously.
    previous_manifest = load_workspace_manifest(workspace_root)
    previous_id: str | None = None
    if isinstance(previous_manifest, dict):
        previous_raw = previous_manifest.get("package_id")
        if isinstance(previous_raw, str):
            try:
                previous_id = normalize_package_id(previous_raw)
            except Exception:
                previous_id = None

    # NOTE(review): remove_managed_dependency_links is defined elsewhere in
    # this module (not visible in this chunk).
    if previous_id is not None and previous_id != package_id:
        # Switching packages: tear down everything the old package managed.
        remove_managed_entries(
            repo_dir,
            previous_manifest,
            remove_non_symlink_entries=remove_non_symlink_managed_entries,
            remove_existing=remove_existing,
        )
        remove_managed_dependency_links(
            repo_dir,
            previous_manifest,
            normalize_package_id=normalize_package_id,
            dependencies_dirname=dependencies_dirname,
            remove_existing=remove_existing,
        )
    elif previous_id == package_id:
        # Re-overlaying the same package: only clear entries/dependencies
        # that are no longer part of the current configuration.
        stale_names = (
            extract_manifest_entry_names(previous_manifest) - target_entry_names
        )
        if stale_names:
            remove_managed_entries(
                repo_dir,
                previous_manifest,
                only_names=stale_names,
                remove_non_symlink_entries=remove_non_symlink_managed_entries,
                remove_existing=remove_existing,
            )
        stale_dependency_ids = extract_manifest_dependency_ids(previous_manifest) - set(
            configured_dependency_ids
        )
        if stale_dependency_ids:
            remove_managed_dependency_links(
                repo_dir,
                previous_manifest,
                normalize_package_id=normalize_package_id,
                only_package_ids=stale_dependency_ids,
                dependencies_dirname=dependencies_dirname,
                remove_existing=remove_existing,
            )

    previous_names = extract_manifest_entry_names(previous_manifest)
    previous_dependency_ids = extract_manifest_dependency_ids(previous_manifest)

    # Link (or copy) each package entry into the workspace, recording
    # collisions with unmanaged pre-existing entries.
    linked_entries: list[dict[str, str]] = []
    collisions: list[str] = []
    for src in entries:
        dst = repo_dir / src.name
        if dst.is_symlink():
            try:
                same_target = dst.resolve() == src.resolve()
            except OSError:
                same_target = False
            if same_target:
                # Already linked to the right place; keep it as-is.
                linked_entries.append(
                    {
                        "name": src.name,
                        "mode": "symlink",
                        "source": str(src.resolve()),
                    }
                )
                continue
            # A managed or replaceable stale symlink may be removed;
            # anything else is a collision we refuse to touch.
            if src.name in previous_names or allow_replace_existing:
                dst.unlink(missing_ok=True)
            else:
                collisions.append(src.name)
                continue
        elif dst.exists():
            if allow_replace_existing or (
                replace_existing_entries_for_previous_names
                and src.name in previous_names
            ):
                remove_existing(dst)
            else:
                collisions.append(src.name)
                continue
        mode = link_or_copy(src, dst)
        if mode == "existing":
            # Destination reappeared between the checks above and the link.
            collisions.append(src.name)
            continue
        linked_entries.append(
            {"name": src.name, "mode": mode, "source": str(src.resolve())}
        )

    # Build the lookup used to resolve dependency ids to metadata; ensure
    # the current package itself is present without mutating the input map.
    package_map = get_package_map(package_id, package_meta)
    if not isinstance(package_map, dict):
        package_map = {}
    if package_id not in package_map:
        package_map = dict(package_map)
        package_map[package_id] = package_meta

    # Materialize dependency packages under the dedicated subdirectory.
    linked_dependency_packages: list[dict[str, str]] = []
    missing_dependency_packages: list[str] = []
    dependency_collisions: list[str] = []
    dependency_root = repo_dir / dependencies_dirname
    dependency_root_ready = True
    if configured_dependency_ids:
        if dependency_root.exists() and not dependency_root.is_dir():
            # A non-directory is squatting on the dependency root.
            if allow_replace_existing:
                remove_existing(dependency_root)
            else:
                dependency_root_ready = False
                dependency_collisions.extend(configured_dependency_ids)
        if dependency_root_ready:
            dependency_root.mkdir(parents=True, exist_ok=True)
    for dependency_id in configured_dependency_ids:
        if not dependency_root_ready:
            break
        dependency_meta = package_map.get(dependency_id)
        dependency_dst = dependency_root / dependency_id
        if not isinstance(dependency_meta, dict):
            # Dependency not installed; drop any stale managed link for it.
            missing_dependency_packages.append(dependency_id)
            if dependency_id in previous_dependency_ids:
                remove_existing(dependency_dst)
            continue
        try:
            dependency_src = resolve_package_meta_path(dependency_meta)
        except Exception:
            missing_dependency_packages.append(dependency_id)
            if dependency_id in previous_dependency_ids:
                remove_existing(dependency_dst)
            continue
        if dependency_dst.is_symlink():
            try:
                same_target = dependency_dst.resolve() == dependency_src.resolve()
            except OSError:
                same_target = False
            if same_target:
                linked_dependency_packages.append(
                    {
                        "package_id": dependency_id,
                        "mode": "symlink",
                        "source": str(dependency_src.resolve()),
                    }
                )
                continue
            if dependency_id in previous_dependency_ids or allow_replace_existing:
                dependency_dst.unlink(missing_ok=True)
            else:
                dependency_collisions.append(dependency_id)
                continue
        elif dependency_dst.exists():
            if dependency_id in previous_dependency_ids or allow_replace_existing:
                remove_existing(dependency_dst)
            else:
                dependency_collisions.append(dependency_id)
                continue
        mode = link_or_copy(dependency_src, dependency_dst)
        if mode == "existing":
            dependency_collisions.append(dependency_id)
            continue
        linked_dependency_packages.append(
            {
                "package_id": dependency_id,
                "mode": mode,
                "source": str(dependency_src.resolve()),
            }
        )
    # Drop the dependency directory again if it ended up empty.
    if dependency_root.is_dir():
        try:
            next(dependency_root.iterdir())
        except StopIteration:
            dependency_root.rmdir()

    # Persist the new ownership manifest, then report a summary to the caller.
    manifest = {
        "version": 1,
        "package_id": package_id,
        "package_path": str(package_root.resolve()),
        "requested_entries": configured_entries,
        "missing_requested_entries": missing_requested_entries,
        "linked_entries": linked_entries,
        "collisions": collisions,
        "configured_dependency_package_ids": configured_dependency_ids,
        "linked_dependency_packages": linked_dependency_packages,
        "missing_dependency_packages": sorted(set(missing_dependency_packages)),
        "dependency_collisions": sorted(set(dependency_collisions)),
        "updated_at": now_iso(),
    }
    save_workspace_manifest(workspace_root, manifest)
    return {
        "package_id": package_id,
        "package_path": str(package_root.resolve()),
        "linked_count": len(linked_entries),
        "collision_count": len(collisions),
        "collisions": collisions,
        "requested_entries": configured_entries,
        "missing_requested_entries": missing_requested_entries,
        "dependency_package_ids": configured_dependency_ids,
        "linked_dependency_count": len(linked_dependency_packages),
        "linked_dependency_packages": linked_dependency_packages,
        "missing_dependency_packages": sorted(set(missing_dependency_packages)),
        "dependency_collision_count": len(set(dependency_collisions)),
        "dependency_collisions": sorted(set(dependency_collisions)),
    }