Source code for fermilink.router_rules

from __future__ import annotations

import functools
import json
from pathlib import Path
from typing import Any

from fermilink.packages.package_registry import load_registry, normalize_package_id


DEFAULT_ROUTER_RULES_FILENAME = "router_rules.json"
FAMILY_HINTS_PATH = (
    Path(__file__).resolve().parent / "data" / "router" / "family_hints.json"
)


def dedupe(items: list[str]) -> list[str]:
    """
    Remove duplicate terms while preserving first-seen order.

    Non-string entries are skipped; string entries are stripped and
    lowercased before comparison, so the result is fully normalized.

    Parameters
    ----------
    items : list[str]
        Term list to deduplicate.

    Returns
    -------
    list[str]
        De-duplicated terms in stable order.
    """
    seen: set[str] = set()
    unique: list[str] = []
    for raw in items:
        if not isinstance(raw, str):
            continue
        term = raw.strip().lower()
        if term and term not in seen:
            seen.add(term)
            unique.append(term)
    return unique
def normalize_terms(raw: Any) -> list[str]:
    """
    Normalize router rule terms into a clean lowercase list.

    Accepts either a comma-separated string or a list of strings; any
    other value yields an empty list.

    Parameters
    ----------
    raw : Any
        Raw value from user input or configuration.

    Returns
    -------
    list[str]
        Normalized term list suitable for router rule matching.
    """
    if isinstance(raw, str):
        candidates = raw.split(",")
    elif isinstance(raw, list):
        candidates = [entry for entry in raw if isinstance(entry, str)]
    else:
        return []
    return dedupe(candidates)
def package_id_terms(package_id: str) -> list[str]:
    """
    Generate default routing terms derived from a package identifier.

    Parameters
    ----------
    package_id : str
        Normalized package identifier.

    Returns
    -------
    list[str]
        Terms inferred from `package_id` tokens.
    """
    lowered = package_id.lower()
    # Whole-id variants, with each separator style flattened to spaces.
    candidates = [lowered, lowered.replace("-", " "), lowered.replace("_", " ")]
    # Individual tokens, keeping only meaningful ones: at least four
    # characters and not purely numeric (version fragments, etc.).
    for token in lowered.replace("_", "-").split("-"):
        if len(token) >= 4 and not token.isdigit():
            candidates.append(token)
    return dedupe(candidates)
@functools.lru_cache(maxsize=1)
def load_family_hints() -> dict[str, dict[str, list[str] | str]]:
    """
    Load bundled package-family hint terms for router rule generation.

    The result is cached for the process lifetime (`lru_cache(maxsize=1)`),
    since the bundled hints file never changes at runtime.

    Returns
    -------
    dict[str, dict[str, list[str] | str]]
        Normalized family-hints payload keyed by family id.

    Raises
    ------
    ValueError
        If the hints file is unreadable, not valid JSON, or malformed.
    """
    try:
        payload = json.loads(FAMILY_HINTS_PATH.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        raise ValueError(
            f"Invalid router family hints file: {FAMILY_HINTS_PATH}: {exc}"
        ) from exc
    if not isinstance(payload, dict):
        raise ValueError(
            f"Family hints payload must be a JSON object: {FAMILY_HINTS_PATH}"
        )

    # Both the current and the legacy version fields are optional, but when
    # present each must be an integer.
    for field_name in ("schema_version", "version"):
        value = payload.get(field_name)
        if value is not None and not isinstance(value, int):
            raise ValueError(
                f"Family hints payload has invalid {field_name}: {FAMILY_HINTS_PATH}"
            )

    families_raw = payload.get("families")
    if not isinstance(families_raw, dict):
        raise ValueError(
            f"Family hints payload missing `families` map: {FAMILY_HINTS_PATH}"
        )

    hints: dict[str, dict[str, list[str] | str]] = {}
    for raw_family, entry in families_raw.items():
        family_id = str(raw_family).strip().lower()
        # Entries with blank ids or non-object values are silently skipped.
        if not family_id or not isinstance(entry, dict):
            continue
        description_raw = entry.get("description")
        if isinstance(description_raw, str) and description_raw.strip():
            description = description_raw.strip()
        else:
            description = f"Routing hints for {family_id} workflows."
        hints[family_id] = {
            "description": description,
            "strong_keywords": normalize_terms(entry.get("strong_keywords")),
            "keywords": normalize_terms(entry.get("keywords")),
            "negative_keywords": normalize_terms(entry.get("negative_keywords")),
            "package_id_overrides": normalize_terms(
                entry.get("package_id_overrides")
            ),
        }
    return hints
def infer_rule(package_id: str) -> dict[str, list[str]]:
    """
    Infer include/exclude router terms for a package identifier.

    Parameters
    ----------
    package_id : str
        Normalized package identifier.

    Returns
    -------
    dict[str, list[str]]
        Rule fragment with inferred `strong_keywords`, `keywords`, and
        `negative_keywords` term lists.
    """
    lowered = package_id.lower()
    keywords = package_id_terms(package_id)
    strong: list[str] = []
    negative: list[str] = []
    for family, hints in load_family_hints().items():
        overrides_raw = hints.get("package_id_overrides")
        override_ids = (
            [term for term in overrides_raw if isinstance(term, str)]
            if isinstance(overrides_raw, list)
            else []
        )
        # A family applies when its id appears inside the package id, or
        # the package id is explicitly listed in the family's overrides.
        if family in lowered or lowered in override_ids:
            strong.extend(hints.get("strong_keywords", []))
            keywords.extend(hints.get("keywords", []))
            negative.extend(hints.get("negative_keywords", []))
    return {
        "strong_keywords": dedupe(strong),
        "keywords": dedupe(keywords),
        "negative_keywords": dedupe(negative),
    }
def _load_json(path: Path) -> Any: if not path.exists(): return None with path.open("r", encoding="utf-8") as handle: return json.load(handle) def _write_json(path: Path, payload: dict[str, Any]) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: json.dump(payload, handle, indent=2) handle.write("\n")
def build_synced_rules(
    registry: dict[str, Any],
    existing_rules: dict[str, Any] | None,
    *,
    default_package_id: str | None = None,
    min_score: int | None = None,
    min_margin: int | None = None,
) -> tuple[dict[str, Any], dict[str, Any]]:
    """
    Build merged router rules from registry state and existing rules.

    Parameters
    ----------
    registry : dict[str, Any]
        Loaded package registry payload.
    existing_rules : dict[str, Any] | None
        Previously loaded router rules payload, if available.
    default_package_id : str | None
        Optional default package id to set in router rules.
    min_score : int | None
        Optional minimum routing score threshold override.
    min_margin : int | None
        Optional minimum routing margin threshold override.

    Returns
    -------
    tuple[dict[str, Any], dict[str, Any]]
        Tuple of `(rules_payload, summary)` describing synced router state.
    """

    def _try_normalize(raw: str) -> str | None:
        # Best-effort normalization: malformed ids are simply dropped.
        try:
            return normalize_package_id(raw)
        except Exception:
            return None

    def _pick_int(override: int | None, stored: Any, fallback: int) -> int:
        # Precedence: explicit override, then stored value, then default.
        if isinstance(override, int):
            return override
        if isinstance(stored, int):
            return stored
        return fallback

    # Collect the normalized ids of every installed package.
    packages_raw = registry.get("packages", {})
    normalized_ids: set[str] = set()
    if isinstance(packages_raw, dict):
        for raw_id in packages_raw:
            candidate = _try_normalize(str(raw_id))
            if candidate is not None:
                normalized_ids.add(candidate)
    installed_ids = sorted(normalized_ids)
    installed_set = normalized_ids

    existing = existing_rules if isinstance(existing_rules, dict) else {}
    existing_packages = existing.get("packages", {})
    if not isinstance(existing_packages, dict):
        existing_packages = {}

    # Preserve (re-normalized) rules for packages that already have one;
    # infer fresh rules for newly installed packages.
    synced_packages: dict[str, Any] = {}
    added: list[str] = []
    preserved: list[str] = []
    for package_id in installed_ids:
        prior = existing_packages.get(package_id)
        if isinstance(prior, dict):
            synced_packages[package_id] = {
                "strong_keywords": normalize_terms(prior.get("strong_keywords")),
                "keywords": normalize_terms(prior.get("keywords")),
                "negative_keywords": normalize_terms(prior.get("negative_keywords")),
            }
            preserved.append(package_id)
        else:
            synced_packages[package_id] = infer_rule(package_id)
            added.append(package_id)

    # Rules whose packages are no longer installed are dropped.
    removed = sorted(
        package_id
        for package_id in existing_packages
        if package_id not in installed_set
    )

    # Resolve the registry's active package, clamped to installed ids.
    active_raw = registry.get("active_package")
    active_id = _try_normalize(active_raw) if isinstance(active_raw, str) else None
    if active_id not in installed_set:
        active_id = None

    existing_default_raw = existing.get("default_package_id")
    existing_default = (
        _try_normalize(existing_default_raw)
        if isinstance(existing_default_raw, str)
        else None
    )

    # Default precedence: explicit argument > prior default > active package
    # > first installed id; anything not installed collapses to None.
    if default_package_id is not None:
        chosen_default = _try_normalize(default_package_id)
    elif existing_default in installed_set:
        chosen_default = existing_default
    elif active_id in installed_set:
        chosen_default = active_id
    else:
        chosen_default = installed_ids[0] if installed_ids else None
    if chosen_default not in installed_set:
        chosen_default = None

    payload = {
        "default_package_id": chosen_default,
        "min_score": _pick_int(min_score, existing.get("min_score"), 2),
        "min_margin": _pick_int(min_margin, existing.get("min_margin"), 1),
        "packages": synced_packages,
    }
    summary = {
        "installed_count": len(installed_ids),
        "added_rules": added,
        "preserved_rules": preserved,
        "removed_rules": removed,
        "default_package_id": chosen_default,
    }
    return payload, summary
def sync_router_rules(
    scipkg_root: Path,
    *,
    router_rules_filename: str = DEFAULT_ROUTER_RULES_FILENAME,
    default_package_id: str | None = None,
    min_score: int | None = None,
    min_margin: int | None = None,
    dry_run: bool = False,
) -> dict[str, Any]:
    """
    Synchronize router rules on disk with installed package metadata.

    Parameters
    ----------
    scipkg_root : Path
        Scientific package root containing registry and package files.
    router_rules_filename : str
        Router rules filename relative to `scipkg_root`.
    default_package_id : str | None
        Optional default package id to set in router rules.
    min_score : int | None
        Optional minimum routing score threshold override.
    min_margin : int | None
        Optional minimum routing margin threshold override.
    dry_run : bool
        When `True`, compute sync results without writing files.

    Returns
    -------
    dict[str, Any]
        Summary payload describing sync changes and output location.
    """
    registry = load_registry(scipkg_root)
    rules_path = scipkg_root / router_rules_filename

    # An unreadable or corrupt rules file is treated the same as no rules
    # file at all: the sync rebuilds everything from the registry.
    try:
        raw_rules = _load_json(rules_path)
    except Exception:
        raw_rules = None
    if not isinstance(raw_rules, dict):
        raw_rules = None

    payload, summary = build_synced_rules(
        registry=registry,
        existing_rules=raw_rules,
        default_package_id=default_package_id,
        min_score=min_score,
        min_margin=min_margin,
    )
    if not dry_run:
        _write_json(rules_path, payload)
    return {
        "scipkg_root": str(scipkg_root),
        "router_rules_path": str(rules_path),
        "dry_run": dry_run,
        "summary": summary,
        "payload": payload,
    }