Source code for fermilink.services

from __future__ import annotations

import json
import os
import shlex
import signal
import socket
import subprocess
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from fermilink.agent_runtime import (
    ENV_MODEL,
    ENV_PROVIDER,
    ENV_REASONING_EFFORT,
    ENV_SANDBOX_MODE,
    ENV_SANDBOX_POLICY,
    load_agent_runtime_policy,
)
from fermilink.config import resolve_fermilink_home
from fermilink.providers import collect_provider_service_env_overrides


@dataclass(frozen=True)
class ServiceSpec:
    """
    Immutable description of one managed service process.

    Attributes
    ----------
    name : str
        Component name; used to derive the state and log file names.
    command : list[str]
        Argument vector passed to ``subprocess.Popen``.
    env : dict[str, str]
        Environment variables layered on top of the parent environment
        when the service is launched.
    """

    name: str
    command: list[str]
    env: dict[str, str]
def _now_iso() -> str:
    """Return the current UTC time formatted as an ISO 8601 string."""
    return datetime.now(timezone.utc).isoformat()


def _pid_is_alive(pid: int) -> bool:
    """Report whether a process with the given pid currently exists."""
    if pid <= 0:
        return False

    # Reap child zombies when this process is their parent.
    try:
        waited_pid, _ = os.waitpid(pid, os.WNOHANG)
    except ChildProcessError:
        # Not our child (or already reaped); fall through to the signal probe.
        waited_pid = 0
    if waited_pid == pid:
        return False

    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The pid exists but we are not allowed to signal it.
        return True
    return True


def _state_path(runtime_root: Path, service: str) -> Path:
    """Return the JSON state file path for ``service`` under ``runtime_root``."""
    return runtime_root / f"{service}.json"


def _read_state(runtime_root: Path, service: str) -> dict[str, Any] | None:
    """
    Load the persisted state for ``service``.

    Returns ``None`` when the file is missing, unreadable, not valid JSON,
    not valid UTF-8, or does not contain a JSON object.
    """
    path = _state_path(runtime_root, service)
    try:
        with path.open("r", encoding="utf-8") as handle:
            payload = json.load(handle)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file; ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError from a corrupt file.
        return None
    return payload if isinstance(payload, dict) else None


def _write_state(runtime_root: Path, service: str, payload: dict[str, Any]) -> None:
    """
    Persist ``payload`` as the state for ``service``.

    The payload is written to a sibling ``.tmp`` file which is then renamed
    over the final path. If writing or renaming fails, the temporary file is
    removed and the original exception is re-raised.
    """
    runtime_root.mkdir(parents=True, exist_ok=True)
    path = _state_path(runtime_root, service)
    temp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        with temp_path.open("w", encoding="utf-8") as handle:
            json.dump(payload, handle, indent=2, sort_keys=True)
            handle.write("\n")
        temp_path.replace(path)
    except BaseException:
        # Do not leave a half-written temp file behind on failure.
        temp_path.unlink(missing_ok=True)
        raise


def _remove_state(runtime_root: Path, service: str) -> None:
    """Delete the state file for ``service`` if it exists."""
    _state_path(runtime_root, service).unlink(missing_ok=True)


def _extract_flag_value(command: list[str], flag: str) -> str | None:
    """
    Find the value of ``flag`` in an argument vector.

    Supports both ``flag value`` and ``flag=value`` forms. Returns ``None``
    when the flag is absent or is the last token with no value after it.
    """
    for index, token in enumerate(command):
        if token == flag:
            if index + 1 < len(command):
                return command[index + 1]
            return None
        if token.startswith(flag + "="):
            return token.split("=", 1)[1]
    return None


def _extract_host_port(command: list[str]) -> tuple[str, int] | None:
    """
    Extract ``(host, port)`` from ``--host`` / ``--port`` flags in ``command``.

    Returns ``None`` when ``--port`` is missing or not an integer. The host
    falls back to ``127.0.0.1`` when ``--host`` is missing or blank.
    """
    host_raw = _extract_flag_value(command, "--host")
    port_raw = _extract_flag_value(command, "--port")
    if not port_raw:
        return None
    try:
        port = int(port_raw)
    except ValueError:
        return None
    host = (host_raw or "127.0.0.1").strip() or "127.0.0.1"
    return host, port


def _probe_host_for_connect(host: str) -> str:
    """Map wildcard bind addresses to ``127.0.0.1`` so they can be connected to."""
    lowered = host.strip().lower()
    if lowered in {"0.0.0.0", "::", "*"}:
        return "127.0.0.1"
    return host


def _is_port_in_use(host: str, port: int) -> bool:
    """
    Return True when a TCP connection to ``host:port`` succeeds.

    Uses an IPv4 stream socket with a 0.2 second timeout; any ``OSError``
    raised while connecting is treated as "not in use".
    """
    probe_host = _probe_host_for_connect(host)
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(0.2)
        try:
            result = sock.connect_ex((probe_host, int(port)))
        except OSError:
            return False
    return result == 0


def _read_log_tail(path: Path, *, max_lines: int = 30) -> list[str]:
    """
    Return up to the last ``max_lines`` lines of the log at ``path``.

    Returns an empty list when ``max_lines`` is not positive or the file
    cannot be read. Undecodable bytes are replaced rather than raising.
    """
    if max_lines <= 0:
        # Nothing requested; skip reading the file altogether.
        return []
    try:
        lines = path.read_text(encoding="utf-8", errors="replace").splitlines()
    except OSError:
        # Covers a missing file as well as other read failures.
        return []
    return lines[-max_lines:]


def _kill_pid(pid: int, *, grace_seconds: float = 8.0) -> tuple[bool, str]:
    """
    Stop the process ``pid``, escalating from SIGTERM to SIGKILL.

    Parameters
    ----------
    pid : int
        Process id to stop.
    grace_seconds : float
        How long to wait after SIGTERM before sending SIGKILL. Negative
        values are treated as zero.

    Returns
    -------
    tuple[bool, str]
        ``(stopped, mode)`` where ``mode`` is one of ``"not_running"``,
        ``"terminated"``, ``"killed"`` or ``"still_running"``.
    """
    if not _pid_is_alive(pid):
        return True, "not_running"

    try:
        os.kill(pid, signal.SIGTERM)
    except ProcessLookupError:
        # The process exited between the liveness check and the signal.
        return True, "not_running"

    deadline = time.monotonic() + max(grace_seconds, 0.0)
    while time.monotonic() < deadline:
        if not _pid_is_alive(pid):
            return True, "terminated"
        time.sleep(0.1)

    try:
        os.kill(pid, signal.SIGKILL)
    except ProcessLookupError:
        # It went away on its own just after the grace period elapsed.
        return True, "terminated"

    deadline = time.monotonic() + 3.0
    while time.monotonic() < deadline:
        if not _pid_is_alive(pid):
            return True, "killed"
        time.sleep(0.1)
    return False, "still_running"


def _command_from_env(value: str | None) -> list[str] | None:
    """
    Parse a shell-style command string into an argument vector.

    Returns ``None`` when ``value`` is ``None`` or blank.
    """
    if value is None:
        return None
    stripped = value.strip()
    if not stripped:
        return None
    return shlex.split(stripped)
def _env_or_default(var_name: str, default: str) -> str:
    """Return the environment value of ``var_name``, or ``default`` when unset or blank."""
    raw = os.getenv(var_name)
    if raw is None or not raw.strip():
        return default
    return raw


def _absolute_path(raw: str) -> Path:
    """Expand ``~`` in ``raw`` and anchor a relative result at the current directory."""
    candidate = Path(raw).expanduser()
    if not candidate.is_absolute():
        candidate = (Path.cwd() / candidate).resolve()
    return candidate


def default_service_specs(*, web_app_path: Path) -> dict[str, ServiceSpec]:
    """
    Build default service specifications for FermiLink components.

    Commands come from ``FERMILINK_RUNNER_CMD`` / ``FERMILINK_WEB_CMD`` when
    set, otherwise from built-in uvicorn / chainlit defaults. Directory and
    URL settings are read from the environment; a value that is unset or
    blank falls back to its default.

    Parameters
    ----------
    web_app_path : Path
        Path to the web application module entry file.

    Returns
    -------
    dict[str, ServiceSpec]
        Mapping from component name to `ServiceSpec`.
    """
    runner_cmd = _command_from_env(os.getenv("FERMILINK_RUNNER_CMD")) or [
        "uvicorn",
        "fermilink.runner.app:app",
        "--host",
        "0.0.0.0",
        "--port",
        "8000",
    ]
    web_cmd = _command_from_env(os.getenv("FERMILINK_WEB_CMD")) or [
        "chainlit",
        "run",
        str(web_app_path),
        "--host",
        "0.0.0.0",
        "--port",
        "7860",
    ]

    runner_url = _env_or_default("FERMILINK_RUNNER_URL", "http://127.0.0.1:8000")

    # Only consult the FermiLink home when no explicit app root is configured.
    chainlit_app_root_raw = os.getenv("FERMILINK_CHAINLIT_APP_ROOT")
    if not chainlit_app_root_raw or not chainlit_app_root_raw.strip():
        chainlit_app_root_raw = str(resolve_fermilink_home())
    chainlit_app_root = _absolute_path(chainlit_app_root_raw)

    # Blank overrides are treated like unset ones, matching the app-root
    # handling above, instead of silently resolving to the current directory.
    scipkg_root = _absolute_path(
        _env_or_default(
            "FERMILINK_SCIPKG_ROOT", str(chainlit_app_root / "scientific_packages")
        )
    )
    workspaces_root = _absolute_path(
        _env_or_default(
            "FERMILINK_WORKSPACES_ROOT", str(chainlit_app_root / "workspaces")
        )
    )

    runner_env = {
        "FERMILINK_SCIPKG_ROOT": str(scipkg_root),
        "FERMILINK_WORKSPACES_ROOT": str(workspaces_root),
    }
    web_env = {
        "FERMILINK_RUNNER_URL": runner_url,
        "FERMILINK_CHAINLIT_APP_ROOT": str(chainlit_app_root),
        "FERMILINK_SCIPKG_ROOT": str(scipkg_root),
        "FERMILINK_WORKSPACES_ROOT": str(workspaces_root),
    }

    provider_service_env = collect_provider_service_env_overrides(cwd=Path.cwd())
    if provider_service_env:
        runner_env.update(provider_service_env)
        web_env.update(provider_service_env)

    def _export(key: str, value: str) -> None:
        # Publish one setting to both service environments.
        runner_env[key] = value
        web_env[key] = value

    # Policy values are only exported when the variable is absent from the
    # parent environment; variables already present there are left alone.
    runtime_policy = load_agent_runtime_policy()
    if ENV_PROVIDER not in os.environ:
        _export(ENV_PROVIDER, runtime_policy.provider)
    if ENV_SANDBOX_POLICY not in os.environ:
        _export(ENV_SANDBOX_POLICY, runtime_policy.sandbox_policy)
    if ENV_SANDBOX_MODE not in os.environ:
        _export(ENV_SANDBOX_MODE, runtime_policy.sandbox_mode)
    # Model and reasoning effort are optional: skip them unless they are
    # non-empty strings.
    if (
        ENV_MODEL not in os.environ
        and isinstance(runtime_policy.model, str)
        and runtime_policy.model
    ):
        _export(ENV_MODEL, runtime_policy.model)
    if (
        ENV_REASONING_EFFORT not in os.environ
        and isinstance(runtime_policy.reasoning_effort, str)
        and runtime_policy.reasoning_effort
    ):
        _export(ENV_REASONING_EFFORT, runtime_policy.reasoning_effort)

    return {
        "runner": ServiceSpec(
            name="runner",
            command=runner_cmd,
            env=runner_env,
        ),
        "web": ServiceSpec(
            name="web",
            command=web_cmd,
            env=web_env,
        ),
    }
def normalize_components(values: list[str] | None) -> list[str]:
    """
    Normalize requested service component names.

    Names are stripped and lower-cased, blanks are skipped, and duplicates
    are dropped while keeping first-seen order. An empty request, or one
    containing only blanks, selects every component.

    Parameters
    ----------
    values : list[str] | None
        Requested component names from CLI input.

    Returns
    -------
    list[str]
        Normalized component list with canonical names.

    Raises
    ------
    ValueError
        If a non-blank name is neither ``runner`` nor ``web``.
    """
    if not values:
        return ["runner", "web"]

    # A dict keeps insertion order and gives de-duplication for free.
    chosen: dict[str, None] = {}
    for raw in values:
        name = raw.strip().lower()
        if name == "":
            continue
        if name != "runner" and name != "web":
            raise ValueError(f"Unknown component '{name}'. Valid: runner, web")
        chosen.setdefault(name, None)

    if chosen:
        return list(chosen)
    return ["runner", "web"]
def service_status(runtime_root: Path, service: str) -> dict[str, Any]:
    """
    Collect runtime status for a managed service component.

    Parameters
    ----------
    runtime_root : Path
        Runtime root used for service pid/status/log files.
    service : str
        Managed service component name.

    Returns
    -------
    dict[str, Any]
        Status payload. ``running`` is always present; when it is False a
        ``reason`` of ``no_state``, ``invalid_state`` or ``stale_pid``
        explains why.
    """
    recorded = _read_state(runtime_root, service)
    if not isinstance(recorded, dict):
        return {"service": service, "running": False, "reason": "no_state"}

    pid = recorded.get("pid")
    if not isinstance(pid, int):
        return {"service": service, "running": False, "reason": "invalid_state"}

    started_at = recorded.get("started_at")
    if _pid_is_alive(pid):
        return {
            "service": service,
            "running": True,
            "pid": pid,
            "command": recorded.get("command"),
            "started_at": started_at,
            "log_path": recorded.get("log_path"),
        }

    # State exists but the recorded process is gone.
    return {
        "service": service,
        "running": False,
        "reason": "stale_pid",
        "pid": pid,
        "started_at": started_at,
    }
def start_service(runtime_root: Path, spec: ServiceSpec) -> dict[str, Any]:
    """
    Start a managed service process and persist runtime metadata.

    The service is not started when it is already running or when the
    ``--host``/``--port`` found in its command already accepts connections.
    After launching, the process is watched for two seconds so that an
    immediate exit is reported as ``failed_to_start`` with a log tail.

    Parameters
    ----------
    runtime_root : Path
        Runtime root used for service pid/status/log files.
    spec : ServiceSpec
        Service specification describing command and runtime paths.

    Returns
    -------
    dict[str, Any]
        Start result payload including final status and process metadata.
        ``status`` is one of ``already_running``, ``port_in_use``,
        ``failed_to_start`` or ``started``.
    """
    name = spec.name
    current = service_status(runtime_root, name)
    if current.get("running"):
        return {
            "service": name,
            "action": "start",
            "changed": False,
            "status": "already_running",
            "pid": current.get("pid"),
            "log_path": current.get("log_path"),
        }
    if current.get("reason") == "stale_pid":
        # Drop the leftover state of a process that no longer exists.
        _remove_state(runtime_root, name)

    endpoint = _extract_host_port(spec.command)
    if endpoint is not None and _is_port_in_use(*endpoint):
        host, port = endpoint
        return {
            "service": name,
            "action": "start",
            "changed": False,
            "status": "port_in_use",
            "host": host,
            "port": port,
            "hint": (
                f"Port {port} is already in use. Stop the conflicting process "
                "or override command via FERMILINK_RUNNER_CMD/FERMILINK_WEB_CMD."
            ),
        }

    log_path = runtime_root / "logs" / f"{name}.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)

    # Service-specific variables take precedence over the parent environment.
    child_env = os.environ.copy()
    child_env.update(spec.env)

    with log_path.open("ab") as log_handle:
        process = subprocess.Popen(  # noqa: S603
            spec.command,
            cwd=Path.cwd(),
            env=child_env,
            stdout=log_handle,
            stderr=log_handle,
            start_new_session=True,
        )

    _write_state(
        runtime_root,
        name,
        {
            "service": name,
            "pid": process.pid,
            "command": spec.command,
            "started_at": _now_iso(),
            "log_path": str(log_path),
        },
    )

    # Catch immediate startup failures (e.g., bind errors) before reporting success.
    exit_code: int | None = None
    give_up_at = time.monotonic() + 2.0
    while exit_code is None and time.monotonic() < give_up_at:
        polled = process.poll()
        if polled is None:
            time.sleep(0.05)
        else:
            exit_code = int(polled)

    if exit_code is None:
        return {
            "service": name,
            "action": "start",
            "changed": True,
            "status": "started",
            "pid": process.pid,
            "command": spec.command,
            "log_path": str(log_path),
        }

    # The process died during the watch window: forget it and surface the log.
    _remove_state(runtime_root, name)
    tail = _read_log_tail(log_path, max_lines=30)
    return {
        "service": name,
        "action": "start",
        "changed": False,
        "status": "failed_to_start",
        "exit_code": exit_code,
        "command": spec.command,
        "log_path": str(log_path),
        "log_tail": tail,
    }
def stop_service(runtime_root: Path, service: str) -> dict[str, Any]:
    """
    Stop a managed service process and update runtime metadata.

    Parameters
    ----------
    runtime_root : Path
        Runtime root used for service pid/status/log files.
    service : str
        Managed service component name.

    Returns
    -------
    dict[str, Any]
        Stop result payload including final status and cleanup metadata.
        ``status`` is ``not_running``, ``stopped`` or ``error``; ``reason``
        carries the detail.
    """
    outcome: dict[str, Any] = {"service": service, "action": "stop"}

    recorded = _read_state(runtime_root, service)
    if not isinstance(recorded, dict):
        return {
            **outcome,
            "changed": False,
            "status": "not_running",
            "reason": "no_state",
        }

    pid = recorded.get("pid")
    if isinstance(pid, int):
        stopped, stop_mode = _kill_pid(pid)
        if stopped:
            # Keep the state file when the process could not be stopped.
            _remove_state(runtime_root, service)
        return {
            **outcome,
            "changed": bool(stopped),
            "status": "stopped" if stopped else "error",
            "pid": pid,
            "reason": stop_mode,
        }

    # State without a usable pid cannot be acted on; discard it.
    _remove_state(runtime_root, service)
    return {
        **outcome,
        "changed": True,
        "status": "not_running",
        "reason": "invalid_state",
    }