Source code for fermilink.agents.base

from __future__ import annotations

import json
import os
import re
import shutil
from abc import ABC, abstractmethod
from pathlib import Path

from fermilink.agent_runtime import DEFAULT_SANDBOX_POLICY


# ANSI color palette for provider stream rendering.
_ANSI = {
    "reset": "\033[0m",
    "thinking": "\033[3;37m",
    "tool_label": "\033[1;32m",
    "tool_cmd": "\033[36m",
    "tool_out": "\033[90m",
    "text": "\033[0m",
}
_ANSI_OFF = {key: "" for key in _ANSI}
_SYSTEM_BLOCK_RE = re.compile(r"<system-reminder>.*?</system-reminder>", re.DOTALL)
_TOOL_OUTPUT_MAX_LINES = 10


[docs] def insert_option_before_prompt(command: list[str], *option_tokens: str) -> list[str]: """Insert option tokens before the final prompt argument.""" if not command: return command prompt_arg = command[-1] return [*command[:-1], *option_tokens, prompt_arg]
def _truncate_tool_output(text: str, max_lines: int = _TOOL_OUTPUT_MAX_LINES) -> str: lines = text.splitlines() if len(lines) <= max_lines: return text omitted = len(lines) - max_lines return "\n".join(lines[:max_lines]) + f"\n… ({omitted} more lines)" def _strip_thinking_noise(text: str) -> str: cleaned = _SYSTEM_BLOCK_RE.sub("", text) cleaned = re.sub(r"\n{3,}", "\n\n", cleaned) return cleaned.strip() def _extract_text_like(value: object, *, strip: bool = True) -> str: if isinstance(value, str): return value.strip() if strip else value if isinstance(value, list): parts = [_extract_text_like(item, strip=strip) for item in value] return "".join(part for part in parts if part) if isinstance(value, dict): parts: list[str] = [] for key in ("text", "content", "message", "raw_content", "summary_text"): nested = _extract_text_like(value.get(key), strip=strip) if nested: parts.append(nested) if parts: return "".join(parts) return "" def _format_tool_input_preview(payload: object) -> str: if isinstance(payload, dict): candidate = ( payload.get("command") or payload.get("file_path") or payload.get("path") ) if isinstance(candidate, str) and candidate: return candidate return json.dumps(payload, ensure_ascii=False) if payload is None: return "" return str(payload)
[docs] class ProviderAgent(ABC): """Base provider contract for binary resolution and command assembly.""" @property @abstractmethod def provider(self) -> str: """Return canonical provider id (for example ``codex``).""" @property @abstractmethod def bin_env_key(self) -> str: """Return environment key used for binary override lookup.""" @property @abstractmethod def default_binary(self) -> str: """Return provider CLI default binary name."""
[docs] def provider_id(self) -> str: return self.provider
[docs] def resolve_binary(self, *, provider_bin_override: str | None = None) -> str: """Resolve executable name/path for this provider.""" del provider_bin_override raw = os.getenv(self.bin_env_key, self.default_binary) cleaned = raw.strip() if isinstance(raw, str) else "" return cleaned or self.default_binary
[docs] def resolve_binary_override(self, raw_override: str | None = None) -> str | None: """Resolve an optional compatibility override for this provider binary.""" del raw_override return None
[docs] def build_exec_command( self, *, provider_bin: str, repo_dir: Path, prompt: str, sandbox_policy: str = DEFAULT_SANDBOX_POLICY, sandbox_mode: str | None = None, model: str | None = None, reasoning_effort: str | None = None, json_output: bool = True, ) -> list[str]: """Build provider-specific argv for one exec/chat invocation.""" raise NotImplementedError( f"Provider '{self.provider_id()}' does not implement build_exec_command()." )
[docs] def uses_json_stream(self) -> bool: """Return whether shared CLI execution should request stream-json output.""" return False
[docs] def uses_json_output_for_second_guess(self) -> bool: """Return whether second-guess subprocesses should request JSON output.""" return False
[docs] def supports_direct_terminal_stream(self) -> bool: """Return whether direct terminal passthrough is preferred for this provider.""" return False
[docs] def prepare_shared_turn_command( self, command: list[str], *, last_message_path: Path, ) -> list[str]: """Apply provider-specific command tweaks for shared chat/loop turns.""" del last_message_path return command
[docs] def prepare_one_shot_exec_command(self, command: list[str]) -> list[str]: """Apply provider-specific command tweaks for one-shot exec runs.""" return command
[docs] def prepare_final_reply_capture_command( self, command: list[str], *, last_message_path: Path, json_output: bool, ) -> list[str]: """Apply provider-specific command tweaks for final-reply file capture.""" del last_message_path, json_output return command
[docs] def sanitize_process_env(self, env: dict[str, str]) -> dict[str, str]: """Apply provider-specific subprocess environment sanitation.""" return env
[docs] def normalize_process_home(self, env: dict[str, str]) -> dict[str, str]: """Normalize any provider-specific writable home paths.""" return env
[docs] def prepare_runtime_env( self, env: dict[str, str], *, model: str | None = None, reasoning_effort: str | None = None, ) -> tuple[dict[str, str], list[Path]]: """Apply provider runtime env overrides and return cleanup paths.""" del model, reasoning_effort return env, []
[docs] def supports_auto_compile_metadata_generation(self) -> bool: """Return whether this provider supports auto-compile metadata generation.""" return False
[docs] def service_env_overrides(self, *, cwd: Path) -> dict[str, str]: """Return provider-specific service env vars derived from the parent env.""" del cwd return {}
[docs] def workspace_instruction_alias_name(self) -> str | None: """Return provider-native instruction alias filename, if any.""" return None
[docs] def ensure_workspace_instruction_alias(self, repo_dir: Path) -> None: """Ensure the provider instruction alias points to ``AGENTS.md``.""" alias_name = self.workspace_instruction_alias_name() if not isinstance(alias_name, str) or not alias_name: return repo_agents = repo_dir / "AGENTS.md" if not repo_agents.is_file(): return repo_alias = repo_dir / alias_name if repo_alias.is_symlink(): try: if repo_alias.resolve() == repo_agents.resolve(): return except OSError: pass try: repo_alias.unlink() except OSError: return elif repo_alias.exists(): return try: os.symlink("AGENTS.md", repo_alias) except OSError: try: shutil.copy2(repo_agents, repo_alias) except OSError: pass
[docs] def render_stream_event( self, event: dict, *, use_color: bool = True, ) -> str | None: """Convert one provider stream event to human-readable terminal text.""" c = _ANSI if use_color else _ANSI_OFF reset = c["reset"] event_type = event.get("type", "") if event_type == "assistant": message = event.get("message") if not isinstance(message, dict): return None content = message.get("content") if not isinstance(content, list): return None parts: list[str] = [] for block in content: if not isinstance(block, dict): continue block_type = block.get("type", "") if block_type == "text": text = block.get("text", "").strip() if text: parts.append(f"{c['text']}{text}{reset}") elif block_type == "thinking": raw = _strip_thinking_noise(block.get("thinking", "")) if raw: parts.append(f"{c['thinking']}{raw}{reset}") elif block_type == "tool_use": name = block.get("name", "") command_preview = _format_tool_input_preview(block.get("input")) parts.append( f"{c['tool_label']}[{name}]{reset} " f"{c['tool_cmd']}{command_preview}{reset}" ) return "\n".join(parts) if parts else None if event_type == "user": message = event.get("message") if not isinstance(message, dict): return None content = message.get("content") if not isinstance(content, list): return None parts = [] for block in content: if not isinstance(block, dict): continue if block.get("type") != "tool_result": continue result_content = block.get("content") if isinstance(result_content, str) and result_content.strip(): output = _truncate_tool_output(result_content.strip()) parts.append(f"{c['tool_out']}{output}{reset}") elif isinstance(result_content, list): for entry in result_content: if isinstance(entry, dict) and entry.get("type") == "text": text = entry.get("text", "").strip() if text: output = _truncate_tool_output(text) parts.append(f"{c['tool_out']}{output}{reset}") return "\n".join(parts) if parts else None if event_type == "message": role = str(event.get("role") or "").strip().lower() if role != "assistant": return None content = _extract_text_like(event.get("content"), strip=True) if content: return f"{c['text']}{content}{reset}" return None if event_type == "thought": thought_value = event.get("value") if isinstance(thought_value, dict): subject = str(thought_value.get("subject") or "").strip() description = str(thought_value.get("description") or "").strip() if subject and description: content = f"{subject}\n{description}" else: content = subject or description else: content = _extract_text_like(thought_value, strip=True) if content: cleaned = _strip_thinking_noise(content) if cleaned: return f"{c['thinking']}{cleaned}{reset}" return None if event_type == "tool_use": name = str(event.get("tool_name") or event.get("name") or "").strip() payload = event.get("parameters") if payload is None: payload = event.get("input") command_preview = _format_tool_input_preview(payload) if not name and not command_preview: return None return ( f"{c['tool_label']}[{name}]{reset} " f"{c['tool_cmd']}{command_preview}{reset}" ).strip() if event_type == "tool_result": output = _extract_text_like(event.get("output"), strip=True) if not output: error_obj = event.get("error") if isinstance(error_obj, dict): output = str(error_obj.get("message") or "").strip() elif error_obj is not None: output = str(error_obj).strip() if not output: return None return f"{c['tool_out']}{_truncate_tool_output(output)}{reset}" if event_type == "error": severity = str(event.get("severity") or "").strip().lower() message = str(event.get("message") or "").strip() if not message: return None label = f"[{severity}] " if severity else "" return f"{c['tool_out']}{label}{message}{reset}" return None
[docs] def extract_assistant_text_chunk(self, event: dict) -> tuple[str, bool]: """Extract one assistant text chunk and whether it is a delta chunk.""" if not isinstance(event, dict): return "", False event_type = event.get("type") if event_type == "assistant": message = event.get("message") if not isinstance(message, dict): return "", False content = message.get("content") if not isinstance(content, list): return "", False parts: list[str] = [] for block in content: if not isinstance(block, dict): continue if block.get("type") != "text": continue text = block.get("text") if isinstance(text, str): cleaned = text.strip() if cleaned: parts.append(cleaned) return "\n".join(parts).strip(), False if event_type == "message": role = str(event.get("role") or "").strip().lower() if role != "assistant": return "", False content = _extract_text_like(event.get("content"), strip=False) if content: return content, bool(event.get("delta")) return "", False