Source code for fermilink.packages.package_registry

from __future__ import annotations

import json
import os
import re
import shutil
import sys
import tempfile
import time
import urllib.request
import zipfile
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from fermilink.packages import (
    PACKAGE_DEPENDENCIES_DIRNAME,
    PACKAGE_DEPENDENCY_IDS_KEY,
    PACKAGE_OVERLAY_ENTRIES_KEY,
    REGISTRY_FILENAME,
    WORKSPACE_MANIFEST_FILENAME,
    atomic_write_json as _atomic_write_json_shared,
    build_default_registry,
    extract_manifest_dependency_ids,
    extract_manifest_entry_names,
    is_exportable_entry_name,
    link_or_copy_entry as _link_or_copy_entry_shared,
    load_registry_file,
    normalize_package_id as _normalize_package_id,
    normalize_registry_payload,
    overlay_package_into_repo_core,
    remove_existing_entry as _remove_existing_entry_shared,
    remove_managed_dependency_links as _remove_managed_dependency_links_shared,
    remove_managed_entries as _remove_managed_entries_shared,
    save_registry_file,
    TEMPLATE_RESERVED_ENTRY_NAMES,
)

REMOVED_INSTRUCTION_FILENAMES = {"agents.md", "claude.md", "gemini.md"}
REMOVED_ROOT_DIRECTORIES = {"projects"}
PROGRESS_REFRESH_SECONDS = 0.1
PROGRESS_BAR_WIDTH = 24
TRUTHY_ENV_VALUES = {"1", "true", "yes", "on"}
PROGRESS_DOT_FRAMES = (".  ", ".. ", "...")


[docs] class PackageError(RuntimeError): """Base package-management error."""
[docs] class PackageNotFoundError(PackageError): """Raised when a package id cannot be found."""
[docs] class PackageValidationError(PackageError): """Raised when package metadata is invalid."""
def _now_iso() -> str: return datetime.now(timezone.utc).isoformat()
[docs] def normalize_package_id(value: str) -> str: """ Normalize and validate a package identifier. Parameters ---------- value : str Raw value to normalize. Returns ------- str Normalized package id. """ try: return _normalize_package_id(value) except ValueError as exc: raise PackageValidationError(str(exc)) from exc
[docs] def packages_root(scipkg_root: Path) -> Path: """ Return the package storage root and ensure it exists. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. Returns ------- Path Directory path where installed packages are stored. """ root = scipkg_root / "packages" root.mkdir(parents=True, exist_ok=True) return root
[docs] def registry_path(scipkg_root: Path) -> Path: """ Return the package registry file path. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. Returns ------- Path Path to the registry JSON file. """ return scipkg_root / REGISTRY_FILENAME
[docs] def workspace_manifest_path(workspace_root: Path) -> Path: """ Return the workspace manifest file path. Parameters ---------- workspace_root : Path Workspace root where manifest state is stored. Returns ------- Path Path to the workspace manifest JSON file. """ return workspace_root / WORKSPACE_MANIFEST_FILENAME
def _default_registry() -> dict[str, Any]: return build_default_registry(updated_at=_now_iso()) def _atomic_write_json(path: Path, payload: dict[str, Any]) -> None: _atomic_write_json_shared(path, payload) def _normalize_registry(payload: Any) -> dict[str, Any]: return normalize_registry_payload( payload, updated_at=_now_iso(), normalize_package_id=normalize_package_id, coerce_non_dict_meta_to_empty=True, fallback_active_to_first_package=True, )
[docs] def load_registry(scipkg_root: Path) -> dict[str, Any]: """ Load the package registry for a scientific package root. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. Returns ------- dict[str, Any] Normalized registry payload. """ path = registry_path(scipkg_root) return load_registry_file( path, default_registry=_default_registry, normalize_registry=_normalize_registry, )
[docs] def save_registry(scipkg_root: Path, payload: dict[str, Any]) -> dict[str, Any]: """ Save and normalize package registry state for a scientific package root. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. payload : dict[str, Any] JSON-like payload to normalize or persist. Returns ------- dict[str, Any] Normalized registry payload after persistence. """ return save_registry_file( registry_path(scipkg_root), payload, updated_at=_now_iso(), normalize_registry=_normalize_registry, )
[docs] def list_packages(scipkg_root: Path) -> dict[str, Any]: """ Return package metadata entries from the registry. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. Returns ------- dict[str, Any] Package metadata mapping keyed by package id. """ return load_registry(scipkg_root).get("packages", {})
def _resolve_meta_installed_path(package_meta: dict[str, Any]) -> Path: raw_path = package_meta.get("installed_path") if not isinstance(raw_path, str) or not raw_path: raise PackageValidationError("Package metadata is missing installed_path.") path = Path(raw_path).expanduser() if not path.is_absolute(): path = Path.cwd() / path if not path.exists() or not path.is_dir(): raise PackageValidationError(f"Installed package path is invalid: {path}") return path
[docs] def register_package( scipkg_root: Path, package_id: str, *, installed_path: Path, source: str, title: str | None = None, activate: bool = False, extra: dict[str, Any] | None = None, ) -> dict[str, Any]: """ Register or update a package entry in the registry. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. package_id : str Normalized package identifier. installed_path : Path Filesystem path of the installed package content. source : str Source label recorded in package metadata. title : str | None Optional human-readable package title. activate : bool Whether to mark the package as active after operation completion. extra : dict[str, Any] | None Additional metadata fields merged into the package record. Returns ------- dict[str, Any] Package metadata for the registered package. """ normalized_id = normalize_package_id(package_id) resolved_path = installed_path.expanduser().resolve() if not resolved_path.exists() or not resolved_path.is_dir(): raise PackageValidationError(f"Installed path is invalid: {resolved_path}") registry = load_registry(scipkg_root) packages = registry.setdefault("packages", {}) current_meta = packages.get(normalized_id) if not isinstance(current_meta, dict): current_meta = {} meta = dict(current_meta) meta.update( { "id": normalized_id, "title": title or current_meta.get("title") or normalized_id, "installed_path": str(resolved_path), "source": source, "status": "installed", "updated_at": _now_iso(), } ) if "installed_at" not in meta: meta["installed_at"] = _now_iso() if extra: meta.update(extra) packages[normalized_id] = meta if activate: registry["active_package"] = normalized_id elif registry.get("active_package") not in packages: registry["active_package"] = normalized_id save_registry(scipkg_root, registry) return meta
[docs] def activate_package(scipkg_root: Path, package_id: str) -> dict[str, Any]: """ Set a package as the active package in registry state. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. package_id : str Normalized package identifier. Returns ------- dict[str, Any] Package metadata for the newly active package. """ normalized_id = normalize_package_id(package_id) registry = load_registry(scipkg_root) packages = registry.get("packages", {}) meta = packages.get(normalized_id) if not isinstance(meta, dict): raise PackageNotFoundError(f"Package not found: {normalized_id}") registry["active_package"] = normalized_id save_registry(scipkg_root, registry) return meta
[docs] def delete_package( scipkg_root: Path, package_id: str, *, remove_files: bool = True, ) -> dict[str, Any]: """ Delete a package from registry state and optionally remove files. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. package_id : str Normalized package identifier. remove_files : bool Whether installed package files should be deleted from disk. Returns ------- dict[str, Any] Package metadata for the deleted package. """ normalized_id = normalize_package_id(package_id) registry = load_registry(scipkg_root) packages = registry.get("packages", {}) package_meta = packages.get(normalized_id) if not isinstance(package_meta, dict): raise PackageNotFoundError(f"Package not found: {normalized_id}") removed_files = False skipped_reason: str | None = None installed_path: Path | None = None raw_path = package_meta.get("installed_path") if isinstance(raw_path, str) and raw_path: installed_path = Path(raw_path).expanduser() if not installed_path.is_absolute(): installed_path = Path.cwd() / installed_path if remove_files: managed_root = packages_root(scipkg_root).resolve() if installed_path is None: skipped_reason = "missing_installed_path" elif not installed_path.exists(): removed_files = True else: resolved = installed_path.resolve() try: resolved.relative_to(managed_root) inside_managed_root = True except ValueError: inside_managed_root = False if not inside_managed_root: skipped_reason = "installed_path_outside_managed_packages" else: shutil.rmtree(resolved, ignore_errors=True) removed_files = not resolved.exists() if not removed_files: skipped_reason = "failed_to_remove_installed_path" packages.pop(normalized_id, None) active = registry.get("active_package") if active == normalized_id or active not in packages: registry["active_package"] = sorted(packages.keys())[0] if packages else None save_registry(scipkg_root, registry) return { "package_id": normalized_id, "removed_from_registry": True, "removed_files": removed_files, "file_removal_requested": remove_files, "file_removal_skipped_reason": skipped_reason, "active_package": registry.get("active_package"), }
def _normalize_overlay_entry_name(raw: str) -> str: value = raw.strip() if not value: raise PackageValidationError("Overlay entry names cannot be empty.") if value in {".", ".."} or "/" in value or "\\" in value: raise PackageValidationError( "Overlay entry names must be top-level names without path separators." ) if value.casefold() in TEMPLATE_RESERVED_ENTRY_NAMES: raise PackageValidationError(f"Overlay entry is reserved: {value}") if value.startswith("."): raise PackageValidationError(f"Overlay entry is hidden: {value}") return value def _normalize_overlay_entries(raw: Any) -> list[str] | None: if raw is None: return None if isinstance(raw, str): candidates = raw.split(",") elif isinstance(raw, list): candidates = raw else: raise PackageValidationError("overlay_entries must be a list or csv string.") normalized: list[str] = [] seen: set[str] = set() for item in candidates: if not isinstance(item, str): raise PackageValidationError("overlay_entries can only contain strings.") if not item.strip(): continue entry = _normalize_overlay_entry_name(item) if entry in seen: continue seen.add(entry) normalized.append(entry) return normalized def _normalize_dependency_ids( raw: Any, *, package_id: str | None = None, ) -> list[str] | None: if raw is None: return None if isinstance(raw, str): candidates = raw.split(",") elif isinstance(raw, list): candidates = raw else: raise PackageValidationError( "dependency_package_ids must be a list or csv string." ) owner = normalize_package_id(package_id) if package_id else None normalized: list[str] = [] seen: set[str] = set() for item in candidates: if not isinstance(item, str): raise PackageValidationError( "dependency_package_ids can only contain strings." ) if not item.strip(): continue dep_id = normalize_package_id(item) if owner and dep_id == owner: continue if dep_id in seen: continue seen.add(dep_id) normalized.append(dep_id) return normalized
[docs] def set_package_overlay_entries( scipkg_root: Path, package_id: str, entries: list[str] | None, ) -> dict[str, Any]: """ Persist overlay entry names for an installed package. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. package_id : str Normalized package identifier. entries : list[str] | None Overlay entry names to persist for this package. Returns ------- dict[str, Any] Updated package metadata after overlay entry persistence. """ normalized_id = normalize_package_id(package_id) normalized_entries = _normalize_overlay_entries(entries) registry = load_registry(scipkg_root) packages = registry.get("packages", {}) meta = packages.get(normalized_id) if not isinstance(meta, dict): raise PackageNotFoundError(f"Package not found: {normalized_id}") if normalized_entries is None: meta.pop(PACKAGE_OVERLAY_ENTRIES_KEY, None) else: meta[PACKAGE_OVERLAY_ENTRIES_KEY] = normalized_entries meta["updated_at"] = _now_iso() save_registry(scipkg_root, registry) return meta
[docs] def set_package_dependency_ids( scipkg_root: Path, package_id: str, dependency_package_ids: list[str] | None, ) -> dict[str, Any]: """ Persist dependency package ids for an installed package. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. package_id : str Normalized package identifier. dependency_package_ids : list[str] | None Dependency package ids to persist for this package. Returns ------- dict[str, Any] Updated package metadata after dependency persistence. """ normalized_id = normalize_package_id(package_id) normalized_dependencies = _normalize_dependency_ids( dependency_package_ids, package_id=normalized_id, ) registry = load_registry(scipkg_root) packages = registry.get("packages", {}) meta = packages.get(normalized_id) if not isinstance(meta, dict): raise PackageNotFoundError(f"Package not found: {normalized_id}") missing = [] if normalized_dependencies: for dep_id in normalized_dependencies: if dep_id not in packages: missing.append(dep_id) if missing: missing_text = ", ".join(sorted(set(missing))) raise PackageNotFoundError(f"Dependency package(s) not found: {missing_text}") if normalized_dependencies: meta[PACKAGE_DEPENDENCY_IDS_KEY] = normalized_dependencies else: meta.pop(PACKAGE_DEPENDENCY_IDS_KEY, None) meta["updated_at"] = _now_iso() save_registry(scipkg_root, registry) return meta
def _download_zip(url: str, destination: Path, max_bytes: int) -> int: def _truthy_env(name: str) -> bool: raw = os.getenv(name) if not isinstance(raw, str): return False return raw.strip().lower() in TRUTHY_ENV_VALUES def _should_show_progress() -> bool: if _truthy_env("FERMILINK_NO_PROGRESS"): return False if _truthy_env("FERMILINK_PROGRESS"): return True try: return bool(sys.stderr.isatty()) except Exception: return False def _format_size(value: int) -> str: units = ("B", "KB", "MB", "GB", "TB") size = float(max(0, value)) for unit in units: if size < 1024.0 or unit == units[-1]: if unit == "B": return f"{int(size)} {unit}" return f"{size:.1f} {unit}" size /= 1024.0 return f"{int(size)} B" def _render_progress( downloaded: int, total_bytes: int | None, started_at: float ) -> str: elapsed = max(time.monotonic() - started_at, 1e-6) speed = int(downloaded / elapsed) speed_text = f"{_format_size(speed)}/s" downloaded_text = _format_size(downloaded) if isinstance(total_bytes, int) and total_bytes > 0: progress = min(downloaded / total_bytes, 1.0) filled = int(progress * PROGRESS_BAR_WIDTH) bar = ("#" * filled) + ("-" * max(0, PROGRESS_BAR_WIDTH - filled)) total_text = _format_size(total_bytes) return ( f"Downloading [{bar}] {progress * 100:6.2f}% " f"{downloaded_text}/{total_text} {speed_text}" ) dot_index = int(elapsed / 0.25) % len(PROGRESS_DOT_FRAMES) dots = PROGRESS_DOT_FRAMES[dot_index] return f"Downloading{dots} {downloaded_text} {speed_text}" def _write_progress_line(line: str, *, previous_length: int) -> int: padding = " " * max(0, previous_length - len(line)) sys.stderr.write("\r") sys.stderr.write(line) if padding: sys.stderr.write(padding) sys.stderr.flush() return len(line) def _extract_content_length(headers: Any) -> int | None: if headers is None or not hasattr(headers, "get"): return None raw = headers.get("Content-Length") if not isinstance(raw, str): return None raw_value = raw.strip() if not raw_value: return None try: parsed = int(raw_value) except ValueError: return None if parsed <= 0: return None return parsed def _extract_content_range_total(headers: Any) -> int | None: if headers is None or not hasattr(headers, "get"): return None raw = headers.get("Content-Range") if not isinstance(raw, str): return None match = re.search(r"/\s*(\d+)\s*$", raw) if match is None: return None try: parsed = int(match.group(1)) except ValueError: return None if parsed <= 0: return None return parsed request_headers = { "User-Agent": "fermilink-installer/0.2", "Accept": "application/zip, application/octet-stream", } def _probe_total_bytes() -> int | None: try: head_req = urllib.request.Request( url, headers=request_headers, method="HEAD" ) with urllib.request.urlopen(head_req) as response: total_bytes = _extract_content_length( getattr(response, "headers", None) ) if isinstance(total_bytes, int) and total_bytes > 0: return total_bytes ranged_total = _extract_content_range_total( getattr(response, "headers", None) ) if isinstance(ranged_total, int) and ranged_total > 0: return ranged_total except Exception: pass try: range_headers = dict(request_headers) range_headers["Range"] = "bytes=0-0" range_req = urllib.request.Request(url, headers=range_headers) with urllib.request.urlopen(range_req) as response: ranged_total = _extract_content_range_total( getattr(response, "headers", None) ) if isinstance(ranged_total, int) and ranged_total > 0: return ranged_total total_bytes = _extract_content_length( getattr(response, "headers", None) ) if isinstance(total_bytes, int) and total_bytes > 1: return total_bytes except Exception: pass return None req = urllib.request.Request(url, headers=request_headers) total = 0 show_progress = _should_show_progress() progress_length = 0 progress_rendered = False started_at = time.monotonic() last_progress_emit = 0.0 preflight_total_bytes = _probe_total_bytes() if show_progress else None with urllib.request.urlopen(req) as response, destination.open("wb") as handle: headers = getattr(response, "headers", None) total_bytes = _extract_content_length(headers) if total_bytes is None: total_bytes = _extract_content_range_total(headers) if total_bytes is None: total_bytes = preflight_total_bytes try: if show_progress: progress_length = _write_progress_line( _render_progress(0, total_bytes, started_at), previous_length=progress_length, ) progress_rendered = True while True: chunk = response.read(1024 * 1024) if not chunk: break total += len(chunk) if max_bytes > 0 and total > max_bytes: raise PackageError( f"Zip download exceeded max size {max_bytes} bytes." ) handle.write(chunk) if show_progress: now = time.monotonic() if now - last_progress_emit >= PROGRESS_REFRESH_SECONDS: progress_length = _write_progress_line( _render_progress(total, total_bytes, started_at), previous_length=progress_length, ) progress_rendered = True last_progress_emit = now if show_progress: progress_length = _write_progress_line( _render_progress(total, total_bytes, started_at), previous_length=progress_length, ) progress_rendered = True finally: if show_progress and progress_rendered: sys.stderr.write("\n") sys.stderr.flush() return total def _safe_extract_zip(zip_path: Path, extract_root: Path) -> None: with zipfile.ZipFile(zip_path, "r") as archive: for member in archive.infolist(): member_path = Path(member.filename) if member_path.is_absolute(): raise PackageError(f"Zip contains absolute path: {member.filename}") if ".." in member_path.parts: raise PackageError(f"Zip contains unsafe path: {member.filename}") archive.extractall(extract_root) def _detect_extracted_root(extract_root: Path) -> Path: children = [child for child in extract_root.iterdir() if child.name != "__MACOSX"] if len(children) == 1 and children[0].is_dir(): return children[0] return extract_root def _strip_instruction_files(package_root: Path) -> list[str]: removed: list[str] = [] for path in package_root.rglob("*"): if not path.is_file(): continue if path.name.casefold() not in REMOVED_INSTRUCTION_FILENAMES: continue rel = path.relative_to(package_root) path.unlink(missing_ok=True) removed.append(str(rel)) return removed def _strip_root_directories(package_root: Path) -> list[str]: removed: list[str] = [] for name in REMOVED_ROOT_DIRECTORIES: path = package_root / name if not path.exists(): continue if path.is_symlink() or path.is_file(): path.unlink(missing_ok=True) elif path.is_dir(): shutil.rmtree(path, ignore_errors=True) removed.append(name) return removed
[docs] def install_from_zip( scipkg_root: Path, package_id: str, *, zip_url: str, title: str | None = None, activate: bool = False, force: bool = False, max_zip_bytes: int = 800 * 1024 * 1024, ) -> dict[str, Any]: """ Install a package from a zip archive URL and register it. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. package_id : str Normalized package identifier. zip_url : str URL of the zip archive to download and install. title : str | None Optional human-readable package title. activate : bool Whether to mark the package as active after operation completion. force : bool Whether existing package ids may be overwritten. max_zip_bytes : int Maximum allowed zip size in bytes before aborting download/install. Returns ------- dict[str, Any] Package metadata for the installed package. """ normalized_id = normalize_package_id(package_id) target_dir = packages_root(scipkg_root) / normalized_id if target_dir.exists(): if not force: raise PackageError( f"Target package directory already exists: {target_dir}. Use --force to download again." ) shutil.rmtree(target_dir) with tempfile.TemporaryDirectory(prefix="fermilink-install-") as temp_dir: temp_root = Path(temp_dir) zip_path = temp_root / "package.zip" extract_root = temp_root / "extract" extract_root.mkdir(parents=True, exist_ok=True) _download_zip(zip_url, zip_path, max_zip_bytes) _safe_extract_zip(zip_path, extract_root) source_root = _detect_extracted_root(extract_root) if not source_root.exists() or not source_root.is_dir(): raise PackageError(f"Extracted source root is invalid: {source_root}") shutil.copytree(source_root, target_dir) removed_instruction_files = _strip_instruction_files(target_dir) removed_root_directories = _strip_root_directories(target_dir) return register_package( scipkg_root, normalized_id, installed_path=target_dir, source=zip_url, title=title, activate=activate, extra={ "removed_instruction_files": removed_instruction_files, "removed_root_directories": removed_root_directories, }, )
[docs] def install_from_local_path( scipkg_root: Path, package_id: str, *, local_path: Path, title: str | None = None, activate: bool = False, force: bool = False, ) -> dict[str, Any]: """ Install a package from a local path and register it. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. package_id : str Normalized package identifier. local_path : Path Local package directory path to install from. title : str | None Optional human-readable package title. activate : bool Whether to mark the package as active after operation completion. force : bool Whether existing package ids may be overwritten. Returns ------- dict[str, Any] Package metadata for the installed package. """ normalized_id = normalize_package_id(package_id) source = local_path.expanduser().resolve() if not source.exists() or not source.is_dir(): raise PackageError(f"Local source path is invalid: {source}") target_dir = packages_root(scipkg_root) / normalized_id same_source_target = target_dir.exists() and source == target_dir.resolve() if target_dir.exists(): if not force: raise PackageError( f"Target package directory already exists: {target_dir}. Use --force to download again." ) if not same_source_target: shutil.rmtree(target_dir) if not same_source_target: shutil.copytree(source, target_dir) return register_package( scipkg_root, normalized_id, installed_path=target_dir, source=f"local-path:{source}", title=title, activate=activate, )
[docs] def load_workspace_manifest(workspace_root: Path) -> dict[str, Any] | None: """ Load workspace overlay manifest state from disk. Parameters ---------- workspace_root : Path Workspace root where manifest state is stored. Returns ------- dict[str, Any] | None Workspace manifest payload, or `None` when absent/invalid. """ path = workspace_manifest_path(workspace_root) if not path.exists(): return None try: with path.open("r", encoding="utf-8") as handle: payload = json.load(handle) except (OSError, json.JSONDecodeError): return None if not isinstance(payload, dict): return None return payload
[docs] def save_workspace_manifest(workspace_root: Path, payload: dict[str, Any]) -> None: """ Save workspace overlay manifest state to disk. Parameters ---------- workspace_root : Path Workspace root where manifest state is stored. payload : dict[str, Any] JSON-like payload to normalize or persist. Returns ------- None No return value. """ workspace_root.mkdir(parents=True, exist_ok=True) _atomic_write_json(workspace_manifest_path(workspace_root), payload)
[docs] def resolve_session_package( scipkg_root: Path, workspace_root: Path, requested_package_id: str | None = None, ) -> tuple[str, dict[str, Any]] | tuple[None, None]: """ Resolve the package to use for the current session/workspace. Parameters ---------- scipkg_root : Path Scientific package root containing registry and package files. workspace_root : Path Workspace root where manifest state is stored. requested_package_id : str | None Optional package id explicitly requested for the session. Returns ------- tuple[str, dict[str, Any]] | tuple[None, None] Tuple containing resolved package id and metadata, or `(None, None)`. """ registry = load_registry(scipkg_root) packages = registry.get("packages", {}) if not isinstance(packages, dict) or not packages: return None, None if requested_package_id: requested_id = normalize_package_id(requested_package_id) requested_meta = packages.get(requested_id) if not isinstance(requested_meta, dict): raise PackageNotFoundError(f"Requested package not found: {requested_id}") _resolve_meta_installed_path(requested_meta) return requested_id, requested_meta manifest = load_workspace_manifest(workspace_root) if isinstance(manifest, dict): pinned = manifest.get("package_id") if isinstance(pinned, str): try: pinned_id = normalize_package_id(pinned) except PackageValidationError: pinned_id = None if pinned_id: pinned_meta = packages.get(pinned_id) if isinstance(pinned_meta, dict): try: _resolve_meta_installed_path(pinned_meta) return pinned_id, pinned_meta except PackageValidationError: pass env_active = os.getenv("FERMILINK_SCIPKG_ACTIVE", "").strip() if env_active: try: env_id = normalize_package_id(env_active) env_meta = packages.get(env_id) if isinstance(env_meta, dict): _resolve_meta_installed_path(env_meta) return env_id, env_meta except PackageValidationError: pass active = registry.get("active_package") if isinstance(active, str): try: active_id = normalize_package_id(active) except PackageValidationError: active_id = None if active_id: active_meta = packages.get(active_id) if isinstance(active_meta, dict): _resolve_meta_installed_path(active_meta) return active_id, active_meta return None, None
def _entry_is_exportable(entry: Path) -> bool: return is_exportable_entry_name(entry.name)
[docs] def iter_package_entries( package_root: Path, include_names: list[str] | None = None, ) -> tuple[list[Path], list[str]]: """ Enumerate installable package entries from a package directory. Parameters ---------- package_root : Path Root directory of one installed package. include_names : list[str] | None Optional allowlist of entry names to include when enumerating package contents. Returns ------- tuple[list[Path], list[str]] Tuple of `(entries, skipped_names)` from package directory traversal. """ if not package_root.exists() or not package_root.is_dir(): raise PackageValidationError(f"Package root is invalid: {package_root}") exportable = [ entry for entry in sorted(package_root.iterdir(), key=lambda item: item.name) if _entry_is_exportable(entry) ] if include_names is None: return exportable, [] by_name = {entry.name: entry for entry in exportable} selected: list[Path] = [] missing: list[str] = [] for name in include_names: hit = by_name.get(name) if hit is None: missing.append(name) else: selected.append(hit) return selected, missing
def _remove_existing_entry(path: Path) -> None: _remove_existing_entry_shared(path) def _link_or_copy_entry(src: Path, dst: Path) -> str: return _link_or_copy_entry_shared(src, dst) def _manifest_entry_names(manifest: dict[str, Any] | None) -> set[str]: return extract_manifest_entry_names(manifest) def _manifest_dependency_ids(manifest: dict[str, Any] | None) -> set[str]: return extract_manifest_dependency_ids(manifest) def _remove_managed_symlinks( repo_dir: Path, manifest: dict[str, Any] | None, *, only_names: set[str] | None = None, ) -> None: _remove_managed_entries_shared( repo_dir, manifest, only_names=only_names, remove_non_symlink_entries=False, remove_existing=_remove_existing_entry, ) def _remove_managed_dependency_links( repo_dir: Path, manifest: dict[str, Any] | None, *, only_package_ids: set[str] | None = None, ) -> None: _remove_managed_dependency_links_shared( repo_dir, manifest, normalize_package_id=normalize_package_id, only_package_ids=only_package_ids, dependencies_dirname=PACKAGE_DEPENDENCIES_DIRNAME, remove_existing=_remove_existing_entry, )
[docs] def overlay_package_into_repo( repo_dir: Path, workspace_root: Path, package_id: str, package_meta: dict[str, Any], *, scipkg_root: Path, allow_replace_existing: bool = False, ) -> dict[str, Any]: """ Overlay an installed package into a workspace repository. Parameters ---------- repo_dir : Path Workspace repository path receiving overlaid entries. workspace_root : Path Workspace root where manifest state is stored. package_id : str Normalized package identifier. package_meta : dict[str, Any] Installed package metadata record from the registry. scipkg_root : Path Scientific package root containing registry and package files. allow_replace_existing : bool Whether existing destination entries may be replaced. Returns ------- dict[str, Any] Overlay result payload with applied entries and manifest metadata. """ def _load_package_map( _package_id: str, _package_meta: dict[str, Any] ) -> dict[str, Any]: registry = load_registry(scipkg_root) maybe_packages = registry.get("packages", {}) if isinstance(maybe_packages, dict): return maybe_packages return {} return overlay_package_into_repo_core( repo_dir=repo_dir, workspace_root=workspace_root, package_id=package_id, package_meta=package_meta, allow_replace_existing=allow_replace_existing, now_iso=_now_iso, resolve_package_meta_path=_resolve_meta_installed_path, normalize_overlay_entries=_normalize_overlay_entries, normalize_dependency_ids=_normalize_dependency_ids, iter_package_entries=iter_package_entries, load_workspace_manifest=load_workspace_manifest, save_workspace_manifest=save_workspace_manifest, normalize_package_id=normalize_package_id, get_package_map=_load_package_map, replace_existing_entries_for_previous_names=True, remove_non_symlink_managed_entries=False, overlay_entries_key=PACKAGE_OVERLAY_ENTRIES_KEY, dependency_ids_key=PACKAGE_DEPENDENCY_IDS_KEY, dependencies_dirname=PACKAGE_DEPENDENCIES_DIRNAME, remove_existing=_remove_existing_entry, link_or_copy=_link_or_copy_entry, )