from __future__ import annotations
import json
import os
import re
import shutil
from abc import ABC, abstractmethod
from pathlib import Path
from fermilink.agent_runtime import DEFAULT_SANDBOX_POLICY
# ANSI escape sequences used when rendering provider stream events.
# Each key names a semantic role; the value is the escape code that turns
# that style on.  "reset" (and "text") restore the terminal default.
_ANSI = {
    "reset": "\033[0m",
    "thinking": "\033[3;37m",
    "tool_label": "\033[1;32m",
    "tool_cmd": "\033[36m",
    "tool_out": "\033[90m",
    "text": "\033[0m",
}

# Same keys as ``_ANSI`` but every code is empty, so callers can format with
# either palette without branching on whether color is enabled.
_ANSI_OFF = dict.fromkeys(_ANSI, "")

# Matches a whole ``<system-reminder>...</system-reminder>`` span; DOTALL lets
# the non-greedy body cross line breaks.
_SYSTEM_BLOCK_RE = re.compile(r"<system-reminder>.*?</system-reminder>", re.DOTALL)

# Default number of tool-output lines shown before the rest is elided.
_TOOL_OUTPUT_MAX_LINES = 10
def insert_option_before_prompt(command: list[str], *option_tokens: str) -> list[str]:
    """Insert option tokens before the final prompt argument.

    The last element of ``command`` is treated as the prompt; every token in
    ``option_tokens`` is placed immediately in front of it, in order.

    Args:
        command: Argument vector whose final element is the prompt.
        *option_tokens: Tokens to splice in ahead of the prompt.

    Returns:
        A new list with the options inserted.  An empty ``command`` is
        returned as-is (the same object) and the options are not added.
    """
    if command:
        # Split off the trailing prompt so the options can go in front of it.
        *leading, prompt = command
        return leading + list(option_tokens) + [prompt]
    return command
def _truncate_tool_output(text: str, max_lines: int = _TOOL_OUTPUT_MAX_LINES) -> str:
    """Limit ``text`` to ``max_lines`` lines, noting how many were dropped.

    Text that already fits is returned unchanged.  Otherwise the first
    ``max_lines`` lines are kept and a trailing ``… (N more lines)`` marker
    reports the number of omitted lines.
    """
    all_lines = text.splitlines()
    overflow = len(all_lines) - max_lines
    if overflow <= 0:
        return text
    kept = "\n".join(all_lines[:max_lines])
    return f"{kept}\n… ({overflow} more lines)"
def _strip_thinking_noise(text: str) -> str:
    """Remove system-reminder blocks and tidy the remaining whitespace.

    Every span matched by ``_SYSTEM_BLOCK_RE`` is deleted, runs of three or
    more newlines are collapsed to exactly two, and the result is stripped of
    leading and trailing whitespace.
    """
    without_reminders = _SYSTEM_BLOCK_RE.sub("", text)
    return re.sub(r"\n{3,}", "\n\n", without_reminders).strip()
def _extract_text_like(value: object, *, strip: bool = True) -> str:
if isinstance(value, str):
return value.strip() if strip else value
if isinstance(value, list):
parts = [_extract_text_like(item, strip=strip) for item in value]
return "".join(part for part in parts if part)
if isinstance(value, dict):
parts: list[str] = []
for key in ("text", "content", "message", "raw_content", "summary_text"):
nested = _extract_text_like(value.get(key), strip=strip)
if nested:
parts.append(nested)
if parts:
return "".join(parts)
return ""
def _format_tool_input_preview(payload: object) -> str:
    """Return a short, single-string preview of a tool call's input.

    For a dict, the first truthy value among ``command``, ``file_path`` and
    ``path`` is used when it is a string; otherwise the whole dict is dumped
    as JSON (non-ASCII characters kept verbatim).  ``None`` yields an empty
    string and any other value is passed through ``str()``.
    """
    if payload is None:
        return ""
    if not isinstance(payload, dict):
        return str(payload)

    # Pick the first truthy value in priority order; a truthy non-string
    # (e.g. a list-valued "command") stops the search and falls through to
    # the JSON dump below.
    for key in ("command", "file_path", "path"):
        preferred = payload.get(key)
        if preferred:
            break
    else:
        preferred = None

    if isinstance(preferred, str):
        return preferred
    return json.dumps(payload, ensure_ascii=False)
class ProviderAgent(ABC):
    """Base provider contract for binary resolution and command assembly.

    Subclasses must define the three abstract properties (``provider``,
    ``bin_env_key`` and ``default_binary``).  Every other method has a default
    implementation here -- mostly pass-through or no-op hooks -- so a provider
    only overrides what it needs.
    """

    @property
    @abstractmethod
    def provider(self) -> str:
        """Return canonical provider id (for example ``codex``)."""

    @property
    @abstractmethod
    def bin_env_key(self) -> str:
        """Return environment key used for binary override lookup."""

    @property
    @abstractmethod
    def default_binary(self) -> str:
        """Return provider CLI default binary name."""

    def provider_id(self) -> str:
        """Return the provider id; method form of the ``provider`` property."""
        return self.provider

    def resolve_binary(self, *, provider_bin_override: str | None = None) -> str:
        """Resolve executable name/path for this provider.

        Reads the environment variable named by ``bin_env_key`` and falls back
        to ``default_binary`` when it is unset or blank.  The
        ``provider_bin_override`` argument is accepted but ignored by this base
        implementation.
        """
        del provider_bin_override
        raw = os.getenv(self.bin_env_key, self.default_binary)
        cleaned = raw.strip() if isinstance(raw, str) else ""
        return cleaned or self.default_binary

    def resolve_binary_override(self, raw_override: str | None = None) -> str | None:
        """Resolve an optional compatibility override for this provider binary.

        The base implementation ignores ``raw_override`` and returns ``None``.
        """
        del raw_override
        return None

    def build_exec_command(
        self,
        *,
        provider_bin: str,
        repo_dir: Path,
        prompt: str,
        sandbox_policy: str = DEFAULT_SANDBOX_POLICY,
        sandbox_mode: str | None = None,
        model: str | None = None,
        reasoning_effort: str | None = None,
        json_output: bool = True,
    ) -> list[str]:
        """Build provider-specific argv for one exec/chat invocation.

        Raises:
            NotImplementedError: Always, in this base class; providers are
                expected to override this method.
        """
        raise NotImplementedError(
            f"Provider '{self.provider_id()}' does not implement build_exec_command()."
        )

    def uses_json_stream(self) -> bool:
        """Return whether shared CLI execution should request stream-json output."""
        return False

    def uses_json_output_for_second_guess(self) -> bool:
        """Return whether second-guess subprocesses should request JSON output."""
        return False

    def supports_direct_terminal_stream(self) -> bool:
        """Return whether direct terminal passthrough is preferred for this provider."""
        return False

    def prepare_shared_turn_command(
        self,
        command: list[str],
        *,
        last_message_path: Path,
    ) -> list[str]:
        """Apply provider-specific command tweaks for shared chat/loop turns.

        The base implementation returns ``command`` unchanged.
        """
        del last_message_path
        return command

    def prepare_one_shot_exec_command(self, command: list[str]) -> list[str]:
        """Apply provider-specific command tweaks for one-shot exec runs.

        The base implementation returns ``command`` unchanged.
        """
        return command

    def prepare_final_reply_capture_command(
        self,
        command: list[str],
        *,
        last_message_path: Path,
        json_output: bool,
    ) -> list[str]:
        """Apply provider-specific command tweaks for final-reply file capture.

        The base implementation returns ``command`` unchanged.
        """
        del last_message_path, json_output
        return command

    def sanitize_process_env(self, env: dict[str, str]) -> dict[str, str]:
        """Apply provider-specific subprocess environment sanitation.

        The base implementation returns ``env`` unchanged (same object).
        """
        return env

    def normalize_process_home(self, env: dict[str, str]) -> dict[str, str]:
        """Normalize any provider-specific writable home paths.

        The base implementation returns ``env`` unchanged (same object).
        """
        return env

    def prepare_runtime_env(
        self,
        env: dict[str, str],
        *,
        model: str | None = None,
        reasoning_effort: str | None = None,
    ) -> tuple[dict[str, str], list[Path]]:
        """Apply provider runtime env overrides and return cleanup paths.

        The base implementation returns ``env`` unchanged together with an
        empty list of cleanup paths.
        """
        del model, reasoning_effort
        return env, []

    def service_env_overrides(self, *, cwd: Path) -> dict[str, str]:
        """Return provider-specific service env vars derived from the parent env.

        The base implementation contributes no overrides.
        """
        del cwd
        return {}

    def workspace_instruction_alias_name(self) -> str | None:
        """Return provider-native instruction alias filename, if any."""
        return None

    def ensure_workspace_instruction_alias(self, repo_dir: Path) -> None:
        """Ensure the provider instruction alias points to ``AGENTS.md``.

        Does nothing when the provider has no alias name or when
        ``repo_dir/AGENTS.md`` is not a regular file.  A symlink already
        resolving to ``AGENTS.md`` is left alone; any other symlink is
        replaced.  A non-symlink file or directory at the alias path is never
        touched.  The whole operation is best-effort: filesystem errors are
        swallowed rather than raised.
        """
        alias_name = self.workspace_instruction_alias_name()
        if not isinstance(alias_name, str) or not alias_name:
            return
        repo_agents = repo_dir / "AGENTS.md"
        if not repo_agents.is_file():
            return
        repo_alias = repo_dir / alias_name
        if repo_alias.is_symlink():
            try:
                if repo_alias.resolve() == repo_agents.resolve():
                    return
            except OSError:
                # Resolution failed; treat the link as stale and replace it.
                pass
            try:
                repo_alias.unlink()
            except OSError:
                return
        elif repo_alias.exists():
            # A real file/directory owns the alias path; leave it untouched.
            return
        try:
            # Relative target so the link resolves within the same directory.
            os.symlink("AGENTS.md", repo_alias)
        except OSError:
            # Symlink creation failed; fall back to a plain copy.
            try:
                shutil.copy2(repo_agents, repo_alias)
            except OSError:
                pass

    def remove_workspace_instruction_alias_symlink(self, repo_dir: Path) -> None:
        """Remove the provider alias symlink while leaving real files untouched.

        Only acts when the alias path is a symlink; removal errors are ignored.
        """
        alias_name = self.workspace_instruction_alias_name()
        if not isinstance(alias_name, str) or not alias_name:
            return
        repo_alias = repo_dir / alias_name
        if not repo_alias.is_symlink():
            return
        try:
            repo_alias.unlink()
        except OSError:
            pass

    def render_stream_event(
        self,
        event: dict,
        *,
        use_color: bool = True,
    ) -> str | None:
        """Convert one provider stream event to human-readable terminal text.

        Dispatches on ``event["type"]``:

        * ``assistant`` -- renders the ``text``, ``thinking`` and ``tool_use``
          blocks found in ``message.content``.
        * ``user`` -- renders ``tool_result`` blocks found in
          ``message.content`` (output is truncated).
        * ``message`` -- renders the content when ``role`` is ``assistant``.
        * ``thought`` -- renders ``value`` (a subject/description dict or any
          text-like value).
        * ``tool_use``, ``tool_result``, ``error`` -- flat event forms.

        Args:
            event: Decoded stream event.
            use_color: Wrap output in ANSI color codes when true.

        Returns:
            The rendered text, or ``None`` when the event is not a dict, is of
            an unrecognised type, or carries nothing to display.  Malformed
            fields (for example a null ``text``) are skipped, not raised on.
        """
        if not isinstance(event, dict):
            return None
        palette = _ANSI if use_color else _ANSI_OFF
        reset = palette["reset"]
        event_type = event.get("type", "")

        if event_type == "assistant":
            message = event.get("message")
            if not isinstance(message, dict):
                return None
            content = message.get("content")
            if not isinstance(content, list):
                return None
            parts: list[str] = []
            for block in content:
                if not isinstance(block, dict):
                    continue
                block_type = block.get("type", "")
                if block_type == "text":
                    # The key may be present with a null/non-string value, so
                    # a ``.get`` default alone is not enough.
                    raw_text = block.get("text")
                    text = raw_text.strip() if isinstance(raw_text, str) else ""
                    if text:
                        parts.append(f"{palette['text']}{text}{reset}")
                elif block_type == "thinking":
                    raw_thinking = block.get("thinking")
                    thinking = (
                        _strip_thinking_noise(raw_thinking)
                        if isinstance(raw_thinking, str)
                        else ""
                    )
                    if thinking:
                        parts.append(f"{palette['thinking']}{thinking}{reset}")
                elif block_type == "tool_use":
                    # Same coercion as the flat "tool_use" event below, so a
                    # null name renders as "[]" rather than "[None]".
                    name = str(block.get("name") or "").strip()
                    command_preview = _format_tool_input_preview(block.get("input"))
                    parts.append(
                        f"{palette['tool_label']}[{name}]{reset} "
                        f"{palette['tool_cmd']}{command_preview}{reset}"
                    )
            return "\n".join(parts) if parts else None

        if event_type == "user":
            message = event.get("message")
            if not isinstance(message, dict):
                return None
            content = message.get("content")
            if not isinstance(content, list):
                return None
            parts = []
            for block in content:
                if not isinstance(block, dict):
                    continue
                if block.get("type") != "tool_result":
                    continue
                result_content = block.get("content")
                if isinstance(result_content, str) and result_content.strip():
                    output = _truncate_tool_output(result_content.strip())
                    parts.append(f"{palette['tool_out']}{output}{reset}")
                elif isinstance(result_content, list):
                    for entry in result_content:
                        if not isinstance(entry, dict) or entry.get("type") != "text":
                            continue
                        raw_text = entry.get("text")
                        text = raw_text.strip() if isinstance(raw_text, str) else ""
                        if text:
                            output = _truncate_tool_output(text)
                            parts.append(f"{palette['tool_out']}{output}{reset}")
            return "\n".join(parts) if parts else None

        if event_type == "message":
            role = str(event.get("role") or "").strip().lower()
            if role != "assistant":
                return None
            content = _extract_text_like(event.get("content"), strip=True)
            if content:
                return f"{palette['text']}{content}{reset}"
            return None

        if event_type == "thought":
            thought_value = event.get("value")
            if isinstance(thought_value, dict):
                subject = str(thought_value.get("subject") or "").strip()
                description = str(thought_value.get("description") or "").strip()
                if subject and description:
                    content = f"{subject}\n{description}"
                else:
                    content = subject or description
            else:
                content = _extract_text_like(thought_value, strip=True)
            if content:
                cleaned = _strip_thinking_noise(content)
                if cleaned:
                    return f"{palette['thinking']}{cleaned}{reset}"
            return None

        if event_type == "tool_use":
            name = str(event.get("tool_name") or event.get("name") or "").strip()
            payload = event.get("parameters")
            if payload is None:
                payload = event.get("input")
            command_preview = _format_tool_input_preview(payload)
            if not name and not command_preview:
                return None
            # strip() drops the dangling separator space when colors are off
            # and the preview is empty.
            return (
                f"{palette['tool_label']}[{name}]{reset} "
                f"{palette['tool_cmd']}{command_preview}{reset}"
            ).strip()

        if event_type == "tool_result":
            output = _extract_text_like(event.get("output"), strip=True)
            if not output:
                # No output text: fall back to the error payload, if any.
                error_obj = event.get("error")
                if isinstance(error_obj, dict):
                    output = str(error_obj.get("message") or "").strip()
                elif error_obj is not None:
                    output = str(error_obj).strip()
            if not output:
                return None
            return f"{palette['tool_out']}{_truncate_tool_output(output)}{reset}"

        if event_type == "error":
            severity = str(event.get("severity") or "").strip().lower()
            message = str(event.get("message") or "").strip()
            if not message:
                return None
            label = f"[{severity}] " if severity else ""
            return f"{palette['tool_out']}{label}{message}{reset}"

        return None