# Source code for fermilink.packages.curated_channels

from __future__ import annotations

import functools
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any


@dataclass(frozen=True)
class ChannelPackage:
    """Immutable definition of one package published in a curated channel."""

    # Identifier the package is keyed by inside its channel.
    package_id: str
    # Human-readable package title.
    title: str
    # Source archive URL; also used for the implicit "branch-head" version
    # when a package declares no explicit versions.
    zip_url: str
    description: str | None = None
    upstream_repo_url: str | None = None
    homepage_url: str | None = None
    tags: tuple[str, ...] = ()
    # Version id selected when the caller does not pin one.
    default_version: str = "branch-head"
    versions: tuple["ChannelPackageVersion", ...] = ()
@dataclass(frozen=True)
class ChannelPackageVersion:
    """Immutable description of one installable version of a curated package."""

    # Identifier used to select this version.
    version_id: str
    # URL of the source archive for this version.
    source_archive_url: str
    # Optional source reference, split into its "type" and "value" parts.
    source_ref_type: str | None = None
    source_ref_value: str | None = None
    verified: bool = False
    notes: str | None = None
# Directory expected to hold one ``<channel-id>.json`` file per curated channel.
DATA_DIR = Path(__file__).resolve().parents[1] / "data" / "curated_channels"

# Accepted spellings (including the misspelling "skiled-scipkg") mapped to the
# canonical channel id.
CHANNEL_ALIASES = dict.fromkeys(
    ("tel", "scipkg", "skiled-scipkg", "skilled-scipkg"),
    "skilled-scipkg",
)


@functools.lru_cache(maxsize=1)
def _available_channel_ids() -> tuple[str, ...]:
    """
    List the channel ids that have a JSON definition file in ``DATA_DIR``.

    Returns
    -------
    tuple[str, ...]
        Sorted, lower-cased file stems of ``*.json`` files in ``DATA_DIR``, or
        ``("skilled-scipkg",)`` when the directory is missing or yields no ids.
        The result is cached after the first call.
    """
    fallback = ("skilled-scipkg",)
    if not DATA_DIR.exists():
        return fallback
    stems = (entry.stem.strip().lower() for entry in DATA_DIR.glob("*.json"))
    discovered = tuple(sorted(stem for stem in stems if stem))
    return discovered or fallback


def _normalize_tag_list(raw: Any) -> tuple[str, ...]:
    """
    Turn a raw ``tags`` value into a tuple of unique, stripped tag strings.

    Parameters
    ----------
    raw : Any
        A single string, a list (non-string members are ignored), or anything
        else (treated as "no tags").

    Returns
    -------
    tuple[str, ...]
        Non-blank tags in first-seen order, de-duplicated case-insensitively.
    """
    if isinstance(raw, str):
        candidates = [raw]
    elif isinstance(raw, list):
        candidates = [str(entry) for entry in raw if isinstance(entry, str)]
    else:
        candidates = []

    # Dicts keep insertion order, so keying on the lower-cased tag retains the
    # first spelling of each tag in its original position.
    unique: dict[str, str] = {}
    for candidate in candidates:
        text = candidate.strip()
        if text:
            unique.setdefault(text.lower(), text)
    return tuple(unique.values())


def _parse_versions(
    item: dict[str, Any], zip_url: str, package_id: str
) -> tuple[ChannelPackageVersion, ...]:
    """
    Build the version tuple for one raw package entry.

    Parameters
    ----------
    item : dict[str, Any]
        Raw package object; its ``versions`` key is read when it is a list.
    zip_url : str
        Package-level archive URL used for the implicit ``branch-head`` version
        when no versions are declared.
    package_id : str
        Package id, used only in error messages.

    Returns
    -------
    tuple[ChannelPackageVersion, ...]
        Declared versions in file order, or a single ``branch-head`` version
        pointing at ``zip_url``.

    Raises
    ------
    ValueError
        If a version entry is not an object, lacks ``version_id`` or
        ``source_archive_url``, or if neither versions nor ``zip_url`` exist.
    """
    declared = item.get("versions")
    entries = declared if isinstance(declared, list) else []

    parsed: list[ChannelPackageVersion] = []
    for version_index, entry in enumerate(entries, start=1):
        if not isinstance(entry, dict):
            raise ValueError(
                f"Package '{package_id}' has invalid version at index {version_index} (not an object)"
            )

        version_id = str(entry.get("version_id") or "").strip()
        archive_url = str(entry.get("source_archive_url") or "").strip()
        if not (version_id and archive_url):
            raise ValueError(
                f"Package '{package_id}' version at index {version_index} missing version_id/source_archive_url"
            )

        ref_type: str | None = None
        ref_value: str | None = None
        ref = entry.get("source_ref")
        if isinstance(ref, dict):
            ref_type = str(ref.get("type") or "").strip() or None
            ref_value = str(ref.get("value") or "").strip() or None

        notes_raw = entry.get("notes")
        notes = (notes_raw.strip() or None) if isinstance(notes_raw, str) else None

        parsed.append(
            ChannelPackageVersion(
                version_id=version_id,
                source_archive_url=archive_url,
                source_ref_type=ref_type,
                source_ref_value=ref_value,
                # Only a literal JSON ``true`` marks a version as verified.
                verified=entry.get("verified") is True,
                notes=notes,
            )
        )

    if parsed:
        return tuple(parsed)

    if not zip_url:
        raise ValueError(
            f"Package '{package_id}' must provide either versions[] or zip_url"
        )
    return (
        ChannelPackageVersion(
            version_id="branch-head",
            source_archive_url=zip_url,
            source_ref_type=None,
            source_ref_value=None,
            verified=False,
        ),
    )
def select_package_version(
    package: ChannelPackage,
    *,
    version_id: str | None = None,
) -> ChannelPackageVersion:
    """
    Select a curated package version, optionally pinned by version id.

    Parameters
    ----------
    package : ChannelPackage
        Curated package definition containing available versions.
    version_id : str | None
        Optional curated package version id to select. ``None``, an empty
        string, or a whitespace-only string all mean "use the package's
        ``default_version``".

    Returns
    -------
    ChannelPackageVersion
        Selected curated package version metadata. When the package declares
        no versions, a synthetic unverified ``branch-head`` version pointing at
        ``package.zip_url`` is returned. When neither ``version_id`` nor
        ``package.default_version`` names a version, the first declared
        version is returned.

    Raises
    ------
    ValueError
        If the requested (or default) version id matches no declared version.
    """
    versions = package.versions
    if not versions:
        return ChannelPackageVersion(
            version_id="branch-head",
            source_archive_url=package.zip_url,
            verified=False,
        )

    # Strip each candidate *before* falling back: a whitespace-only
    # ``version_id`` is truthy, so a plain ``a or b`` chain would pick it, strip
    # it to "" and skip the package default entirely.
    selected_id = (version_id or "").strip() or (package.default_version or "").strip()
    if not selected_id:
        return versions[0]

    # Version ids are matched case-insensitively.
    selected_key = selected_id.lower()
    for version in versions:
        if version.version_id.strip().lower() == selected_key:
            return version

    available = ", ".join(version.version_id for version in versions)
    raise ValueError(
        f"Version '{selected_id}' not found for package '{package.package_id}'. Available versions: {available}"
    )
def _stripped_or_none(value: Any) -> str | None:
    """Return ``value`` stripped when it is a non-blank string, else ``None``."""
    if isinstance(value, str):
        return value.strip() or None
    return None


@functools.lru_cache(maxsize=None)
def _load_channel_packages(channel_id: str) -> dict[str, ChannelPackage]:
    """
    Read and validate ``DATA_DIR/<channel_id>.json``.

    Parameters
    ----------
    channel_id : str
        Channel id whose definition file is loaded.

    Returns
    -------
    dict[str, ChannelPackage]
        Packages keyed by lower-cased package id. The result is cached per
        ``channel_id``, so repeated calls return the same dict object.

    Raises
    ------
    ValueError
        If the file is missing, unreadable, not valid JSON, not a JSON object,
        has a non-integer ``schema_version``, lacks a ``packages`` list, or
        contains a malformed package entry.
    """
    channel_path = DATA_DIR / f"{channel_id}.json"
    if not channel_path.is_file():
        raise ValueError(f"Missing curated channel file: {channel_path}")

    try:
        payload = json.loads(channel_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        raise ValueError(
            f"Invalid curated channel file: {channel_path}: {exc}"
        ) from exc

    if not isinstance(payload, dict):
        raise ValueError(f"Curated channel file must be a JSON object: {channel_path}")

    # ``schema_version`` is optional, but must be an integer when present.
    schema_version = payload.get("schema_version")
    if not (schema_version is None or isinstance(schema_version, int)):
        raise ValueError(
            f"Invalid schema_version in curated channel file: {channel_path}"
        )

    raw_packages = payload.get("packages")
    if not isinstance(raw_packages, list):
        raise ValueError(
            f"Curated channel file missing `packages` list: {channel_path}"
        )

    catalog: dict[str, ChannelPackage] = {}
    for index, entry in enumerate(raw_packages, start=1):
        if not isinstance(entry, dict):
            raise ValueError(
                f"Curated package at index {index} is not an object in {channel_path}"
            )

        package_id = str(entry.get("package_id") or "").strip().lower()
        title = str(entry.get("title") or "").strip()
        zip_url = str(entry.get("zip_url") or "").strip()

        tags = _normalize_tag_list(entry.get("tags"))
        versions = _parse_versions(entry, zip_url, package_id)

        # With no explicit default, the first declared version is the default.
        default_version = (
            str(entry.get("default_version") or "").strip() or versions[0].version_id
        )
        if all(version.version_id != default_version for version in versions):
            raise ValueError(
                f"Package '{package_id}' default_version '{default_version}' not in versions in {channel_path}"
            )

        # Without a package-level zip_url, use the default version's archive.
        resolved_zip_url = zip_url or next(
            (
                version.source_archive_url
                for version in versions
                if version.version_id == default_version
            ),
            "",
        )

        if not (package_id and title and resolved_zip_url):
            raise ValueError(
                f"Curated package at index {index} missing package_id/title/zip_url in {channel_path}"
            )

        catalog[package_id] = ChannelPackage(
            package_id=package_id,
            title=title,
            zip_url=resolved_zip_url,
            description=_stripped_or_none(entry.get("description")),
            upstream_repo_url=_stripped_or_none(entry.get("upstream_repo_url")),
            homepage_url=_stripped_or_none(entry.get("homepage_url")),
            tags=tags,
            default_version=default_version,
            versions=versions,
        )
    return catalog
def normalize_channel_id(channel: str | None) -> str:
    """
    Normalize a curated channel identifier.

    The value is stripped, lower-cased and mapped through ``CHANNEL_ALIASES``.
    This function does not check that the resulting channel exists.

    Parameters
    ----------
    channel : str | None
        Curated channel identifier used to resolve package definitions.
        ``None``, an empty string, or a whitespace-only string select the
        default channel ``"skilled-scipkg"``.

    Returns
    -------
    str
        Normalized curated channel id.
    """
    # Apply the default *after* stripping so that a whitespace-only channel is
    # treated like ``None``/"" instead of normalizing to an empty id.
    value = (channel or "").strip().lower() or "skilled-scipkg"
    return CHANNEL_ALIASES.get(value, value)
def list_curated_packages(*, channel: str | None = None) -> dict[str, ChannelPackage]:
    """
    Load curated package definitions for a channel.

    Parameters
    ----------
    channel : str | None
        Curated channel identifier used to resolve package definitions.

    Returns
    -------
    dict[str, ChannelPackage]
        Curated packages keyed by normalized package id. A new dict is
        returned on every call, so callers may modify it freely.

    Raises
    ------
    ValueError
        If the normalized channel is not one of the available channels, or if
        the channel's definition file fails to load.
    """
    normalized_channel = normalize_channel_id(channel)
    channels = _available_channel_ids()
    if normalized_channel not in channels:
        valid = ", ".join(sorted(channels))
        raise ValueError(
            f"Unknown channel '{normalized_channel}'. Available channels: {valid}"
        )
    # The loader is memoized with ``lru_cache`` and hands back the very same
    # dict each time; return a shallow copy so caller-side mutation cannot
    # corrupt the cached catalog. The values are frozen dataclasses, so a
    # shallow copy is sufficient.
    return dict(_load_channel_packages(normalized_channel))
def resolve_curated_package(
    package_id: str, *, channel: str | None = None
) -> ChannelPackage:
    """
    Resolve one curated package definition by package id.

    Parameters
    ----------
    package_id : str
        Package identifier; it is stripped and lower-cased before lookup.
    channel : str | None
        Curated channel identifier used to resolve package definitions.

    Returns
    -------
    ChannelPackage
        Curated package definition for `package_id`.

    Raises
    ------
    ValueError
        If the channel is unknown or does not publish `package_id`.
    """
    packages = list_curated_packages(channel=channel)
    package_key = package_id.strip().lower()

    payload = packages.get(package_key)
    if payload is not None:
        return payload

    # Lookup failed: build the error from the normalized channel id and the
    # sorted list of package ids that the channel does publish.
    normalized_channel = normalize_channel_id(channel)
    supported = ", ".join(sorted(packages))
    raise ValueError(
        f"Package '{package_key}' is not published in channel '{normalized_channel}'. Supported packages: {supported}"
    )