# Source code for fermilink.runner.app

import asyncio
import json
import os
import shutil
import subprocess
import time
import uuid
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from pathlib import Path

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import PlainTextResponse, StreamingResponse
from pydantic import BaseModel, Field
from fermilink.agent_runtime import resolve_agent_runtime_policy
from fermilink.agents import get_default_agent_registry, get_provider_agent
from fermilink.cli.commands import workflows as workflow_commands
from fermilink.cli.workflow_prompts import UNIFIED_MEMORY_PROMPT_PREFIX
from fermilink.config import resolve_workspaces_root as resolve_default_workspaces_root
from fermilink.providers import (
    build_exec_command,
    provider_bin_env_key,
    resolve_provider_binary,
)
from fermilink.runner.admission import QueueFullError, RunAdmissionController
from fermilink.runner.scientific_packages import (
    PackageNotFoundError,
    PackageValidationError,
    bootstrap_legacy_maxwelllink_package,
    overlay_package_into_repo,
    resolve_scipkg_root,
    resolve_session_package,
)


def find_project_root(start: Path) -> Path:
    """Find the repository root by walking upward from a start path.

    Parameters
    ----------
    start : Path
        Starting path for upward project-root discovery.

    Returns
    -------
    Path
        Detected repository/project root path, or the current working
        directory when no project marker is found (installed wheel/sdist
        layouts may not include project markers).
    """
    cur = start.resolve()
    # ``cur.parents`` already begins with ``cur.parent``; the previous
    # ``[cur.parent, *cur.parents]`` form checked the first parent twice.
    for candidate in cur.parents:
        if (candidate / "pyproject.toml").exists() or (candidate / ".git").exists():
            return candidate
    # Installed wheel/sdist layouts may not include project markers.
    return Path.cwd()
PROJECT_ROOT = find_project_root(Path(__file__))
PACKAGE_SOFTWARE_ROOT = Path(__file__).resolve().parents[1] / "software"


def _get_int_env(name: str, default: int, minimum: int | None = None) -> int:
    """Read an integer environment variable with a fallback and lower bound.

    Parameters
    ----------
    name : str
        Environment variable name.
    default : int
        Fallback value when the variable is unset or not a valid integer.
    minimum : int or None
        Optional minimum bound applied after parsing.

    Returns
    -------
    int
        Parsed value, clamped to `minimum` when provided.
    """
    raw_value = os.getenv(name)
    if not raw_value:
        parsed = default
    else:
        try:
            parsed = int(raw_value)
        except ValueError:
            parsed = default
    if minimum is not None and parsed < minimum:
        return minimum
    return parsed


DEFAULT_PROVIDER_BINARY_OVERRIDE = os.getenv("FERMILINK_CODEX_BIN", "codex")
MAX_RUNTIME_SECONDS = int(os.getenv("FERMILINK_RUNNER_MAX_RUNTIME_SECONDS", "600"))
MAX_PROMPT_CHARS = 10_000
RUNNER_GLOBAL_CONCURRENT_RUNS = _get_int_env(
    "FERMILINK_RUNNER_GLOBAL_CONCURRENT_RUNS", 200, minimum=1
)
RUNNER_PER_USER_CONCURRENT_RUNS = _get_int_env(
    "FERMILINK_RUNNER_PER_USER_CONCURRENT_RUNS", 10, minimum=1
)
RUNNER_MAX_QUEUE_SIZE = _get_int_env("FERMILINK_RUNNER_MAX_QUEUE_SIZE", 2000, minimum=0)
RUNNER_MAX_PENDING_PER_USER = _get_int_env(
    "FERMILINK_RUNNER_MAX_PENDING_PER_USER", 10, minimum=0
)
RUNNER_METRICS_TOKEN = os.getenv("FERMILINK_RUNNER_METRICS_TOKEN", "").strip()
RUNNER_SUBPROCESS_STREAM_LIMIT = _get_int_env(
    "FERMILINK_RUNNER_SUBPROCESS_STREAM_LIMIT", 1024 * 1024, minimum=64 * 1024
)
# Sandbox modes that callers may request; anything else falls back to policy.
ALLOWED_REQUEST_SANDBOXES = {"read-only", "workspace-write"}

# Process-wide admission controller enforcing global/per-user run limits.
RUN_ADMISSION_CONTROLLER = RunAdmissionController(
    global_limit=RUNNER_GLOBAL_CONCURRENT_RUNS,
    per_user_limit=RUNNER_PER_USER_CONCURRENT_RUNS,
    max_queue_size=RUNNER_MAX_QUEUE_SIZE,
    max_pending_per_user=RUNNER_MAX_PENDING_PER_USER,
)
class RunRequest(BaseModel):
    """Payload accepted by the runner execution endpoint."""

    # Optional session identifier; a fresh UUID hex is generated when absent.
    session_id: str | None = None
    # Optional authenticated user id used for per-user admission limits.
    user_id: str | None = None
    # Prompt text forwarded to the provider CLI; must be non-empty.
    user_prompt: str = Field(..., min_length=1)
    # Optional scientific package to overlay into the workspace repo.
    package_id: str | None = None
    # Optional sandbox mode request, validated against policy.
    sandbox: str | None = None
    # Optional provider request; provider selection is admin-controlled.
    provider: str | None = None
def verify_provider_bin() -> None:
    """Validate that the configured provider CLI binary is available.

    Raises
    ------
    RuntimeError
        Raised when the provider CLI binary cannot be resolved on `PATH`.
    """
    policy = resolve_agent_runtime_policy()
    provider_bin = resolve_provider_binary(
        policy.provider,
        provider_bin_override=DEFAULT_PROVIDER_BINARY_OVERRIDE,
    )
    if shutil.which(provider_bin) is not None:
        return
    env_key = provider_bin_env_key(policy.provider)
    raise RuntimeError(
        f"{policy.provider} CLI not found. Install it in the runner image or set "
        f"{env_key} to its path."
    )
@asynccontextmanager
async def _lifespan(_app: FastAPI):
    """Application lifespan hook: fail fast if the provider CLI is missing."""
    verify_provider_bin()
    yield


app = FastAPI(lifespan=_lifespan)


@app.get("/ops/concurrency")
async def ops_concurrency(request: Request) -> dict[str, object]:
    """Return current in-process concurrency/backpressure metrics as JSON."""
    _ensure_metrics_access(request)
    return await _build_concurrency_metrics_payload()


@app.get("/ops/concurrency.prom", response_class=PlainTextResponse)
async def ops_concurrency_prometheus(request: Request) -> str:
    """Return current in-process concurrency metrics in Prometheus format."""
    _ensure_metrics_access(request)
    payload = await _build_concurrency_metrics_payload()
    return _format_prometheus_metrics(payload)


@app.get("/ops/admission")
async def ops_admission(
    request: Request, user_id: str | None = None, session_id: str | None = None
) -> dict[str, object]:
    """Return per-user admission readiness snapshot.

    Parameters
    ----------
    request : Request
        FastAPI request object (used for the optional metrics-token guard).
    user_id : str or None, optional
        Authenticated user identifier when available.
    session_id : str or None, optional
        Session identifier fallback when `user_id` is not provided.

    Returns
    -------
    dict[str, object]
        Admission counters and whether a run can start immediately.
    """
    _ensure_metrics_access(request)
    fallback_session = (session_id or "").strip() or "anonymous"
    user_key = _resolve_run_user_key(user_id, fallback_session)
    snapshot = await RUN_ADMISSION_CONTROLLER.snapshot_for_user(user_key)
    payload: dict[str, object] = dict(snapshot)
    payload["timestamp_utc"] = _now_utc_iso()
    return payload
[docs] def sse(event: str, data: str | dict) -> str: """Format a server-sent event frame. Parameters ---------- event : str SSE event name. data : str or dict Event payload text or an object that will be JSON serialized. Returns ------- str Wire-format SSE frame terminated by a blank line. """ payload = data if isinstance(data, str) else json.dumps(data) return f"event: {event}\ndata: {payload}\n\n"
def _resolve_path(env_key: str, default: str) -> Path:
    """Resolve a path from an environment variable with fallback.

    Parameters
    ----------
    env_key : str
        Environment variable name to read.
    default : str
        Fallback path when the variable is unset.

    Returns
    -------
    Path
        Absolute/expanded path when env is set, otherwise the fallback path.
    """
    raw = os.getenv(env_key)
    if raw:
        path = Path(raw).expanduser()
        if not path.is_absolute():
            path = Path.cwd() / path
        return path
    return Path(default)


def _now_utc_iso() -> str:
    """Return current UTC timestamp in ISO-8601 with trailing `Z`."""
    return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")


def _ensure_metrics_access(request: Request) -> None:
    """Enforce optional token guard for operational metrics endpoints.

    Metrics are public only when `RUNNER_METRICS_TOKEN` is unset. When set,
    callers must provide matching header `X-Runner-Metrics-Token`.
    """
    if not RUNNER_METRICS_TOKEN:
        return
    presented = request.headers.get("X-Runner-Metrics-Token", "").strip()
    if not presented:
        raise HTTPException(status_code=401, detail="Missing X-Runner-Metrics-Token.")
    if presented != RUNNER_METRICS_TOKEN:
        raise HTTPException(status_code=403, detail="Invalid X-Runner-Metrics-Token.")


async def _build_concurrency_metrics_payload() -> dict[str, object]:
    """Build current in-process runner concurrency metrics payload."""
    snapshot = await RUN_ADMISSION_CONTROLLER.snapshot()
    active_total = int(snapshot["active_total"])
    global_limit = int(snapshot["global_limit"])
    pending_total = int(snapshot["pending_total"])
    per_user_limit = int(snapshot["per_user_limit"])
    queue_bounded = RUNNER_MAX_QUEUE_SIZE > 0
    queue_capacity = RUNNER_MAX_QUEUE_SIZE if queue_bounded else None
    payload: dict[str, object] = {
        "active_total": active_total,
        "global_limit": global_limit,
        "global_utilization": active_total / global_limit if global_limit > 0 else 0.0,
        "pending_total": pending_total,
        "queue_bounded": queue_bounded,
        "queue_capacity": queue_capacity,
        "queue_utilization": (
            pending_total / RUNNER_MAX_QUEUE_SIZE if queue_bounded else None
        ),
        "per_user_limit": per_user_limit,
        "timestamp_utc": _now_utc_iso(),
    }
    return payload


def _format_prometheus_metrics(payload: dict[str, object]) -> str:
    """Render concurrency metrics payload in Prometheus exposition format."""
    active_total = int(payload["active_total"])
    global_limit = int(payload["global_limit"])
    global_utilization = float(payload["global_utilization"])
    pending_total = int(payload["pending_total"])
    per_user_limit = int(payload["per_user_limit"])
    queue_bounded = bool(payload["queue_bounded"])
    queue_capacity = payload["queue_capacity"]
    queue_utilization = payload["queue_utilization"]
    lines = [
        "# HELP runner_active_runs Active admitted runs in this runner process.",
        "# TYPE runner_active_runs gauge",
        f"runner_active_runs {active_total}",
        "# HELP runner_global_run_limit Configured global active-run limit.",
        "# TYPE runner_global_run_limit gauge",
        f"runner_global_run_limit {global_limit}",
        "# HELP runner_global_run_utilization Active/global limit ratio.",
        "# TYPE runner_global_run_utilization gauge",
        f"runner_global_run_utilization {global_utilization}",
        "# HELP runner_pending_runs Number of runs waiting in the admission queue.",
        "# TYPE runner_pending_runs gauge",
        f"runner_pending_runs {pending_total}",
        "# HELP runner_per_user_run_limit Configured per-user active-run limit.",
        "# TYPE runner_per_user_run_limit gauge",
        f"runner_per_user_run_limit {per_user_limit}",
        "# HELP runner_queue_bounded Whether queue capacity is bounded (1) or unbounded (0).",
        "# TYPE runner_queue_bounded gauge",
        f"runner_queue_bounded {1 if queue_bounded else 0}",
    ]
    if queue_bounded and isinstance(queue_capacity, int) and queue_capacity > 0:
        lines.extend(
            [
                "# HELP runner_queue_capacity Configured max number of pending queued runs.",
                "# TYPE runner_queue_capacity gauge",
                f"runner_queue_capacity {queue_capacity}",
                "# HELP runner_queue_utilization Pending/queue capacity ratio.",
                "# TYPE runner_queue_utilization gauge",
                f"runner_queue_utilization {float(queue_utilization)}",
            ]
        )
    return "\n".join(lines) + "\n"


async def _iter_stream_lines(
    stream: asyncio.StreamReader, *, chunk_size: int = 64 * 1024
):
    """Yield logical lines from a subprocess stream without line-length limits."""
    pending = bytearray()
    while True:
        chunk = await stream.read(chunk_size)
        if not chunk:
            # EOF: flush any trailing partial line before stopping.
            if pending:
                yield bytes(pending)
            break
        pending.extend(chunk)
        while True:
            newline_index = pending.find(b"\n")
            if newline_index < 0:
                break
            line = bytes(pending[: newline_index + 1])
            del pending[: newline_index + 1]
            yield line


def _signal_process_safely(
    process: asyncio.subprocess.Process, signal_name: str
) -> None:
    """Best-effort subprocess signaling that ignores already-exited races."""
    signal_fn = getattr(process, signal_name, None)
    if not callable(signal_fn):
        return
    try:
        signal_fn()
    except (OSError, ProcessLookupError):
        return


def _ensure_dir(path: Path) -> Path:
    """Create a directory tree if needed and return it.

    Parameters
    ----------
    path : Path
        Directory to ensure.

    Returns
    -------
    Path
        The same input path.
    """
    path.mkdir(parents=True, exist_ok=True)
    return path


def _ensure_dir_safe(path: Path) -> Path | None:
    """Attempt to create a directory tree without raising on OS errors.

    Parameters
    ----------
    path : Path
        Directory to create.

    Returns
    -------
    Path or None
        The path when creation succeeds, otherwise `None`.
    """
    try:
        path.mkdir(parents=True, exist_ok=True)
        return path
    except OSError:
        return None


def _resolve_workspaces_root() -> Path:
    """Resolve a writable workspace root for session repositories.

    Returns
    -------
    Path
        Configured workspace root, or a local fallback under the current
        working directory.
    """
    try:
        return _ensure_dir(resolve_default_workspaces_root())
    except OSError:
        # Fall back to a local workspace root when the configured global root
        # cannot be created (e.g., permission-restricted filesystem).
        fallback = Path.cwd() / "workspaces"
        return _ensure_dir(fallback)


def _resolve_source_dir() -> Path:
    """Resolve the software template source directory.

    Returns
    -------
    Path
        The configured software root when present, otherwise the packaged
        `software/` directory, otherwise the repository `software/` directory.
    """
    source = _resolve_path("FERMILINK_SOFTWARE_ROOT", "/opt/software")
    if source.exists():
        return source
    if PACKAGE_SOFTWARE_ROOT.exists():
        return PACKAGE_SOFTWARE_ROOT
    return PROJECT_ROOT / "software"


def _resolve_template_agents_path(source_dir: Path) -> Path | None:
    """Find the AGENTS template file used for workspace repos.

    Parameters
    ----------
    source_dir : Path
        Resolved software source directory.

    Returns
    -------
    Path or None
        First matching `AGENTS.md` candidate, else `None`.
    """
    candidates = [
        PACKAGE_SOFTWARE_ROOT / "AGENTS.md",
        PROJECT_ROOT / "software" / "AGENTS.md",
        source_dir / "AGENTS.md",
    ]
    for candidate in candidates:
        if candidate.is_file():
            return candidate
    return None


def _sync_provider_agent_md_alias(repo_dir: Path) -> None:
    """Provision only the active provider alias for ``AGENTS.md`` in the repo.

    The active provider is read from the current runtime policy. Inactive
    provider alias symlinks are removed so the workspace reflects the current
    agent selection, while real files are left untouched.
    """
    policy = resolve_agent_runtime_policy()
    active_provider = policy.provider
    for agent in get_default_agent_registry().all():
        if agent.provider_id() == active_provider:
            continue
        agent.remove_workspace_instruction_alias_symlink(repo_dir)
    get_provider_agent(active_provider).ensure_workspace_instruction_alias(repo_dir)


def _ensure_template_agents_file(source_dir: Path, repo_dir: Path) -> None:
    """Synchronize `AGENTS.md` into the workspace repository root.

    Also ensures only the active provider alias (currently ``CLAUDE.md`` for
    Claude or ``GEMINI.md`` for Gemini) points to ``AGENTS.md`` so
    provider-native instruction discovery follows the same policy contract.

    Parameters
    ----------
    source_dir : Path
        Software source directory used to find the template.
    repo_dir : Path
        Workspace repository path receiving the file.

    Returns
    -------
    None
        This function updates files in place when needed.
    """
    template_agents = _resolve_template_agents_path(source_dir)
    if template_agents is None:
        return
    repo_agents = repo_dir / "AGENTS.md"
    if repo_agents.is_dir():
        shutil.rmtree(repo_agents, ignore_errors=True)
    if repo_agents.exists():
        return
    else:
        # Path does not "exist" but may be a dangling symlink; clear it.
        try:
            repo_agents.unlink(missing_ok=True)
        except OSError:
            return
    shutil.copy2(template_agents, repo_agents)
    _sync_provider_agent_md_alias(repo_dir)


def _ensure_repo_memory_file(repo_dir: Path, user_prompt: str) -> None:
    """Ensure shared memory file exists and matches current schema."""
    normalized_prompt = str(user_prompt or "")
    if normalized_prompt.startswith(UNIFIED_MEMORY_PROMPT_PREFIX):
        normalized_prompt = normalized_prompt[
            len(UNIFIED_MEMORY_PROMPT_PREFIX) :
        ].lstrip()
    if not normalized_prompt.strip():
        normalized_prompt = "(request unavailable)"
    try:
        workflow_commands._ensure_loop_memory(
            repo_dir=repo_dir,
            user_prompt=normalized_prompt,
            prompt_file=None,
            overwrite=False,
        )
    except Exception as exc:  # defensive: convert any memory init failure to HTTP 500
        raise HTTPException(
            status_code=500,
            detail=f"Failed to initialize workspace memory file: {exc}",
        ) from exc


def _is_valid_git_repo(path: Path) -> bool:
    """Check whether a directory contains a usable Git repository.

    Parameters
    ----------
    path : Path
        Repository candidate directory.

    Returns
    -------
    bool
        `True` when `.git` is either a valid directory repo or gitdir pointer.
    """
    git_entry = path / ".git"
    if git_entry.is_dir():
        return (git_entry / "HEAD").exists()
    if git_entry.is_file():
        try:
            content = git_entry.read_text(encoding="utf-8", errors="replace").strip()
        except OSError:
            return False
        return content.startswith("gitdir:")
    return False


def _ensure_git_repo(path: Path) -> None:
    """Ensure a workspace directory is initialized as a Git repository.

    Parameters
    ----------
    path : Path
        Repository directory to validate or initialize.

    Raises
    ------
    RuntimeError
        Raised if Git is unavailable or repository initialization fails.
    """
    if _is_valid_git_repo(path):
        return
    git_bin = shutil.which("git")
    if not git_bin:
        raise RuntimeError(
            f"Git is required but not found on PATH, and {path} is not a valid git repo."
        )
    proc = subprocess.run(
        [git_bin, "init", "-q"],
        cwd=str(path),
        text=True,
        capture_output=True,
        check=False,
    )
    if proc.returncode != 0:
        detail = (proc.stderr or proc.stdout or "").strip() or "unknown error"
        raise RuntimeError(f"Failed to initialize git repo at {path}: {detail}")
    if not _is_valid_git_repo(path):
        raise RuntimeError(f"Failed to initialize a valid git repo at {path}")


def _sanitize_env(env: dict) -> dict:
    """Remove disallowed or placeholder API keys from process environment.

    Parameters
    ----------
    env : dict
        Environment mapping copied for the provider subprocess.

    Returns
    -------
    dict
        Sanitized environment mapping.
    """
    return get_provider_agent("codex").sanitize_process_env(env)


def _normalize_provider_home(env: dict, provider: str) -> dict:
    """Normalize any provider-specific writable home paths."""
    return get_provider_agent(provider).normalize_process_home(env)


def _resolve_run_user_key(user_id: str | None, session_id: str) -> str:
    """Resolve runner admission identity for per-user concurrency tracking.

    Parameters
    ----------
    user_id : str or None
        User identifier from the web service.
    session_id : str
        Session id associated with this run.

    Returns
    -------
    str
        Stable key for per-user run-limiting. Falls back to per-session key
        for unauthenticated requests.
    """
    cleaned = (user_id or "").strip().lower()
    if cleaned:
        return f"user:{cleaned}"
    return f"session:{session_id}"


def _resolve_run_policy(
    req: RunRequest,
) -> tuple[str, str, str | None, str | None, str | None]:
    """Resolve effective provider and sandbox policy for one run request."""
    policy = resolve_agent_runtime_policy()
    provider = policy.provider
    if isinstance(req.provider, str) and req.provider.strip():
        requested_provider = req.provider.strip().lower()
        if requested_provider != provider:
            # Provider is admin-controlled via policy/config.
            _ = requested_provider
    sandbox_policy = policy.sandbox_policy
    sandbox_mode: str | None = (
        policy.sandbox_mode if sandbox_policy == "enforce" else None
    )
    requested_sandbox = (req.sandbox or "").strip()
    if sandbox_policy == "enforce" and requested_sandbox:
        lowered = requested_sandbox.lower()
        if lowered in ALLOWED_REQUEST_SANDBOXES:
            # Only allow narrowing to read-only or matching the policy mode.
            if lowered == "read-only" or lowered == policy.sandbox_mode:
                sandbox_mode = lowered
    model = policy.model
    reasoning_effort = policy.reasoning_effort
    return provider, sandbox_policy, sandbox_mode, model, reasoning_effort


async def _read_stream(
    stream: asyncio.StreamReader, event_type: str, queue: asyncio.Queue
) -> None:
    """Read subprocess output lines and forward them into an event queue.

    Parameters
    ----------
    stream : asyncio.StreamReader
        Stream to consume (`stdout` or `stderr`).
    event_type : str
        Event type label pushed to the queue.
    queue : asyncio.Queue
        Queue receiving `(event_type, payload)` tuples.

    Returns
    -------
    None
        A sentinel `_reader_done` marker is always pushed before returning.
    """
    try:
        async for line in _iter_stream_lines(stream):
            text = line.decode(errors="replace").rstrip("\n")
            if not text:
                continue
            if event_type in {"agent", "codex"}:
                payload = text
            else:
                payload = json.dumps({"text": text})
            await queue.put((event_type, payload))
    finally:
        await queue.put(("_reader_done", event_type))


@app.post("/run")
async def run(req: RunRequest):
    """Execute one provider run inside a session workspace and stream SSE events.

    Parameters
    ----------
    req : RunRequest
        Request payload with session id, prompt text, package selection, and
        sandbox mode.

    Returns
    -------
    StreamingResponse
        Event-stream response containing runner metadata, provider output,
        logs, and a final exit event.

    Raises
    ------
    HTTPException
        Raised for invalid input, package resolution errors, or overlay errors.
    """
    if len(req.user_prompt) > MAX_PROMPT_CHARS:
        raise HTTPException(status_code=413, detail="Prompt too long.")
    session_id = req.session_id or uuid.uuid4().hex
    user_key = _resolve_run_user_key(req.user_id, session_id)
    try:
        await RUN_ADMISSION_CONTROLLER.acquire(user_key)
    except QueueFullError as exc:
        raise HTTPException(status_code=429, detail=str(exc)) from exc
    slot_released = False

    async def _release_slot_once() -> None:
        """Release one admission slot only once for this request."""
        nonlocal slot_released
        if slot_released:
            return
        slot_released = True
        await RUN_ADMISSION_CONTROLLER.release(user_key)

    try:
        workspaces_root = _resolve_workspaces_root()
        workspace_root = workspaces_root / session_id
        repo_dir = workspace_root / "repo"
        source_dir = _resolve_source_dir()
        scipkg_root = resolve_scipkg_root()
        workspace_root.mkdir(parents=True, exist_ok=True)
        bootstrap_legacy_maxwelllink_package(scipkg_root)
        created_repo = False
        if not repo_dir.exists():
            created_repo = True
            shutil.copytree(source_dir, repo_dir, dirs_exist_ok=True)
            _ensure_template_agents_file(source_dir, repo_dir)
        _ensure_git_repo(repo_dir)
        try:
            package_id, package_meta = resolve_session_package(
                scipkg_root=scipkg_root,
                workspace_root=workspace_root,
                requested_package_id=req.package_id,
            )
        except PackageNotFoundError as exc:
            raise HTTPException(status_code=400, detail=str(exc)) from exc
        except PackageValidationError as exc:
            # Client error when the package was explicitly requested;
            # server error when a stored/default package fails validation.
            status = 400 if req.package_id else 500
            raise HTTPException(status_code=status, detail=str(exc)) from exc
        package_overlay: dict | None = None
        if package_id and package_meta:
            try:
                package_overlay = overlay_package_into_repo(
                    repo_dir=repo_dir,
                    workspace_root=workspace_root,
                    package_id=package_id,
                    package_meta=package_meta,
                    scipkg_root=scipkg_root,
                    allow_replace_existing=created_repo,
                )
            except PackageValidationError as exc:
                raise HTTPException(status_code=500, detail=str(exc)) from exc
        # Enforce template AGENTS.md even if a stale overlay path attempts to replace it.
        _ensure_template_agents_file(source_dir, repo_dir)
        _ensure_repo_memory_file(repo_dir, req.user_prompt)
        (repo_dir / "outputs").mkdir(parents=True, exist_ok=True)
        provider, sandbox_policy, sandbox_mode, model, reasoning_effort = (
            _resolve_run_policy(req)
        )
        provider_bin = resolve_provider_binary(
            provider,
            provider_bin_override=DEFAULT_PROVIDER_BINARY_OVERRIDE,
        )
        try:
            cmd = build_exec_command(
                provider=provider,
                provider_bin=provider_bin,
                repo_dir=repo_dir,
                prompt=req.user_prompt,
                sandbox_policy=sandbox_policy,
                sandbox_mode=sandbox_mode,
                model=model,
                reasoning_effort=reasoning_effort,
                json_output=True,
            )
        except NotImplementedError as exc:
            raise HTTPException(status_code=501, detail=str(exc)) from exc
        env = os.environ.copy()
        env = _sanitize_env(env)
        env = _normalize_provider_home(env, provider)
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            env=env,
            cwd=str(repo_dir),
            limit=RUNNER_SUBPROCESS_STREAM_LIMIT,
        )
        queue: asyncio.Queue = asyncio.Queue()
        stdout_task = asyncio.create_task(_read_stream(process.stdout, "agent", queue))
        stderr_task = asyncio.create_task(_read_stream(process.stderr, "log", queue))
        wait_task = asyncio.create_task(process.wait())
    except asyncio.CancelledError:
        await _release_slot_once()
        raise
    except Exception:
        await _release_slot_once()
        raise

    async def event_generator():
        """Yield normalized SSE messages for the current subprocess lifecycle.

        Yields
        ------
        str
            SSE frames containing metadata, command/log output, and terminal
            exit status.
        """
        start = time.monotonic()
        readers_done = 0
        timed_out = False
        try:
            meta_payload: dict[str, object] = {"session_id": session_id}
            if package_overlay:
                meta_payload["package"] = package_overlay
            meta_payload["agent"] = {
                "provider": provider,
                "sandbox_policy": sandbox_policy,
                "sandbox_mode": sandbox_mode,
                "model": model,
                "reasoning_effort": reasoning_effort,
            }
            yield sse("meta", meta_payload)
            while True:
                if time.monotonic() - start > MAX_RUNTIME_SECONDS:
                    timed_out = True
                    if process.returncode is None:
                        _signal_process_safely(process, "terminate")
                    break
                try:
                    event_type, payload = await asyncio.wait_for(
                        queue.get(), timeout=0.1
                    )
                except asyncio.TimeoutError:
                    event_type = None
                    payload = None
                if event_type == "_reader_done":
                    readers_done += 1
                elif event_type:
                    yield sse(event_type, payload)
                # Exit once the process has ended and both readers drained.
                if wait_task.done() and readers_done >= 2 and queue.empty():
                    break
            if timed_out and not wait_task.done():
                try:
                    await asyncio.wait_for(wait_task, timeout=5)
                except asyncio.TimeoutError:
                    _signal_process_safely(process, "kill")
                    try:
                        await asyncio.wait_for(wait_task, timeout=5)
                    except asyncio.TimeoutError:
                        pass
            reason = "timeout" if timed_out else "completed"
            yield sse(
                "runner.exit", {"reason": reason, "return_code": process.returncode}
            )
        finally:
            try:
                if process.returncode is None:
                    _signal_process_safely(process, "terminate")
                    try:
                        await asyncio.wait_for(wait_task, timeout=5)
                    except asyncio.TimeoutError:
                        _signal_process_safely(process, "kill")
                        try:
                            await asyncio.wait_for(wait_task, timeout=5)
                        except asyncio.TimeoutError:
                            pass
            finally:
                for task in (stdout_task, stderr_task, wait_task):
                    if not task.done():
                        task.cancel()
                # Ensure admission slot release still completes under cancellation.
                release_task = asyncio.create_task(_release_slot_once())
                try:
                    await release_task
                except asyncio.CancelledError:
                    await release_task
                    raise

    headers = {
        "Cache-Control": "no-cache",
        "X-Accel-Buffering": "no",
    }
    return StreamingResponse(
        event_generator(), media_type="text/event-stream", headers=headers
    )