from __future__ import annotations
import argparse
import hashlib
import json
import math
import os
import re
import shlex
import shutil
import subprocess
from collections import Counter, defaultdict
from collections.abc import Callable
from datetime import datetime, timezone
from pathlib import Path
from fermilink.cli.workflow_prompts import (
LOOP_MEMORY_DIRNAME,
LOOP_MEMORY_FILENAME,
LOOP_PID_TOKEN_RE,
LOOP_SLURM_JOB_TOKEN_RE,
LOOP_WAIT_TOKEN_RE,
REPRODUCE_AUDITOR_PROMPT_PREFIX,
REPRODUCE_LOGS_DIRNAME,
REPRODUCE_PLAN_FILENAME,
REPRODUCE_PLAN_TAG,
REPRODUCE_PLAN_TOKEN_RE,
REPRODUCE_PLANNER_PROMPT_PREFIX,
REPRODUCE_PROMPTS_DIRNAME,
REPRODUCE_STATE_FILENAME,
RESEARCH_AUDITOR_PROMPT_PREFIX,
RESEARCH_PLAN_TAG,
RESEARCH_PLAN_TOKEN_RE,
RESEARCH_PLANNER_PROMPT_PREFIX,
WORKFLOW_DATA_AUDITOR_PROMPT_PREFIX,
WORKFLOW_DATA_DIRNAME,
WORKFLOW_DATA_MANIFEST_FULL_FILENAME,
WORKFLOW_DATA_MANIFEST_FILENAME,
WORKFLOW_DATA_SUMMARY_FILENAME,
WORKFLOW_REPORT_AUDITOR_PROMPT_PREFIX,
WORKFLOW_REPORT_FILENAME,
WORKFLOW_REPORT_GENERATOR_PROMPT_PREFIX,
WORKFLOW_SUMMARIES_DIRNAME,
WORKFLOW_TASK_DATA_MAP_FILENAME,
WORKFLOW_TASK_DATA_MAP_TAG,
WORKFLOW_TASK_DATA_MAP_TOKEN_RE,
)
def _cli():
    """Lazily import and return the fermilink CLI module.

    Deferred to call time so this module can be imported without pulling
    in the full CLI (and to avoid circular imports).
    """
    from fermilink import cli

    return cli
def _utc_now_z() -> str:
return datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
# Optional callback invoked with a short status string to surface progress.
WorkflowStatusHook = Callable[[str], None]
def _emit_workflow_status(
workflow_status_hook: WorkflowStatusHook | None, mode_text: str
) -> None:
if not callable(workflow_status_hook):
return
status_text = str(mode_text or "").strip()
if not status_text:
return
try:
workflow_status_hook(status_text)
except Exception:
# Keep workflow execution resilient if optional status hooks fail.
return
def _write_json_atomic(path: Path, payload: dict[str, object]) -> None:
    """Atomically write *payload* as pretty-printed JSON to *path*.

    The data is first written to a sibling ``<name>.tmp`` file and then
    renamed over *path*, so readers never observe a partially written file.

    Raises:
        cli.PackageError: if creating, writing, or renaming the file fails.
    """
    cli = _cli()
    path.parent.mkdir(parents=True, exist_ok=True)
    temp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        temp_path.write_text(json.dumps(payload, indent=2) + "\n", encoding="utf-8")
        temp_path.replace(path)
    except OSError as exc:
        # Fix: don't leave a stale .tmp file behind when the write or the
        # rename fails (best-effort cleanup, original error still raised).
        try:
            temp_path.unlink()
        except OSError:
            pass
        raise cli.PackageError(f"Failed to write file: {path}: {exc}") from exc
# Guardrails for checkpoint commits: beyond these, only the memory file is
# committed so large datasets are never staged into .git/objects.
_GIT_COMMIT_MAX_FILES = 500
_GIT_COMMIT_MAX_BYTES = 100 * 1024 * 1024  # 100 MB
def _default_checkpoint_payload() -> dict[str, str]:
return {
"status": "noop",
"sha": "",
"error": "",
"memory_only": "false",
}
def _normalize_checkpoint_payload(payload: object) -> dict[str, str]:
    """Coerce an arbitrary payload into the checkpoint-result schema.

    Unknown keys are dropped, missing/None values keep their defaults, and
    retained values are stringified.
    """
    normalized = _default_checkpoint_payload()
    if isinstance(payload, dict):
        for key, default in list(normalized.items()):
            value = payload.get(key, None)
            normalized[key] = default if value is None else str(value)
    return normalized
def _sum_changed_file_bytes(repo_dir: Path, porcelain_lines: list[str]) -> int:
"""Return total on-disk bytes of new/modified files listed in git status --porcelain."""
total = 0
for line in porcelain_lines:
if len(line) < 4:
continue
path_part = line[3:]
# Rename format: "old -> new" — the new path is what lives on disk.
if " -> " in path_part:
path_part = path_part.split(" -> ", 1)[1]
path_part = path_part.strip().strip('"')
try:
total += (repo_dir / path_part).stat().st_size
except OSError:
# Deleted files or unresolvable paths — skip.
pass
return total
def _workflow_checkpoint_commit(
    *,
    repo_dir: Path,
    commit_message: str,
) -> dict[str, str]:
    """Create a best-effort git checkpoint commit for workflow/session surfaces.

    Returns a payload dict with keys ``status`` ("noop" / "failed" /
    "committed"), ``sha`` (HEAD after a successful commit), ``error`` (short
    failure detail), and ``memory_only`` ("true" when only the memory file
    was committed). Never raises for git failures; they are reported via the
    payload instead.
    """
    payload = _default_checkpoint_payload()
    # Not a git checkout at all: leave status as "noop".
    if not (repo_dir / ".git").exists():
        return payload
    git_bin = shutil.which("git")
    if not git_bin:
        payload["status"] = "failed"
        payload["error"] = "git binary not found on PATH"
        return payload
    def _run_git(args: list[str]) -> subprocess.CompletedProcess[str]:
        # All git calls are non-raising (check=False); callers inspect returncode.
        return subprocess.run(
            [git_bin, *args],
            cwd=str(repo_dir),
            text=True,
            capture_output=True,
            check=False,
        )
    def _detail(proc: subprocess.CompletedProcess[str]) -> str:
        # Prefer stderr, then stdout; cap at 500 chars for the payload.
        text = (proc.stderr or proc.stdout or "").strip()
        if not text:
            text = f"git command failed with exit code {proc.returncode}"
        return text[:500]
    repo_check = _run_git(["rev-parse", "--is-inside-work-tree"])
    if repo_check.returncode != 0:
        return payload
    # Inspect the working tree before touching the index so we never stage
    # large datasets into .git/objects.
    status_proc = _run_git(["status", "--porcelain"])
    if status_proc.returncode != 0:
        payload["status"] = "failed"
        payload["error"] = _detail(status_proc)
        return payload
    changed_lines = [l for l in (status_proc.stdout or "").splitlines() if l.strip()]
    file_count = len(changed_lines)
    total_bytes = _sum_changed_file_bytes(repo_dir, changed_lines)
    over_limit = (
        file_count > _GIT_COMMIT_MAX_FILES or total_bytes > _GIT_COMMIT_MAX_BYTES
    )
    if over_limit:
        # Too much changed: commit only the loop memory file (if present)
        # and record the reason in the commit message.
        memory_path = repo_dir / LOOP_MEMORY_DIRNAME / LOOP_MEMORY_FILENAME
        if not memory_path.is_file():
            # Nothing useful to commit; leave status as "noop".
            return payload
        memory_rel = str(memory_path.relative_to(repo_dir))
        add_proc = _run_git(["add", "--", memory_rel])
        commit_message = (
            f"{commit_message} [memory-only: {file_count} files / "
            f"{total_bytes // (1024 * 1024)} MB exceeded limit]"
        )
        payload["memory_only"] = "true"
    else:
        add_proc = _run_git(["add", "-A"])
    if add_proc.returncode != 0:
        payload["status"] = "failed"
        payload["error"] = _detail(add_proc)
        return payload
    # `git diff --cached --quiet` exits 0 when nothing is staged (noop),
    # 1 when there are staged changes; anything else is a failure.
    staged_proc = _run_git(["diff", "--cached", "--quiet"])
    if staged_proc.returncode == 0:
        return payload
    if staged_proc.returncode != 1:
        payload["status"] = "failed"
        payload["error"] = _detail(staged_proc)
        return payload
    commit_proc = _run_git(["commit", "-m", commit_message])
    if commit_proc.returncode != 0:
        payload["status"] = "failed"
        payload["error"] = _detail(commit_proc)
        return payload
    sha_proc = _run_git(["rev-parse", "--verify", "HEAD"])
    if sha_proc.returncode != 0:
        payload["status"] = "failed"
        payload["error"] = _detail(sha_proc)
        return payload
    payload["status"] = "committed"
    payload["sha"] = (sha_proc.stdout or "").strip()
    return payload
def _workflow_completion_commit(
    *,
    repo_dir: Path,
    mode_name: str,
) -> dict[str, str]:
    """Create a best-effort repository checkpoint when a mode finishes."""
    label = str(mode_name or "").strip() or "unknown"
    message = f"fermilink {label}: completion checkpoint {_utc_now_z()}"
    return _workflow_checkpoint_commit(repo_dir=repo_dir, commit_message=message)
def _attempt_mode_completion_commit(
    *,
    repo_dir: Path,
    args: argparse.Namespace,
    mode_name: str,
) -> dict[str, str]:
    """Best-effort completion commit shared by all modes (exec, loop, research, reproduce)."""
    disabled = bool(getattr(args, "_fermilink_disable_completion_commit", False))
    if disabled:
        return _default_checkpoint_payload()
    try:
        return _normalize_checkpoint_payload(
            _workflow_completion_commit(repo_dir=repo_dir, mode_name=mode_name)
        )
    except Exception as exc:
        # Keep command completion resilient if best-effort commit fails unexpectedly.
        failure = _default_checkpoint_payload()
        failure["status"] = "failed"
        failure["error"] = str(exc)[:500]
        return failure
DEFAULT_DATA_MAX_FILES = 4000
DEFAULT_DATA_MAX_TOTAL_BYTES = 1_073_741_824
DEFAULT_DATA_MAX_FILE_BYTES = 67_108_864
DEFAULT_DATA_HASH_MAX_BYTES = 1_048_576
DEFAULT_DATA_PROMPT_MAX_FILES = 240
DEFAULT_DATA_LARGE_THRESHOLD_FILES = 500
DEFAULT_DATA_LARGE_THRESHOLD_CHARS = 125_000
DEFAULT_DATA_TASK_SLICE_MAX_FILES = 120
DEFAULT_DATA_TASK_SLICE_MAX_CHARS = 45_000
DEFAULT_DATA_TASK_FALLBACK_FILES = 8
DATA_MANIFEST_SCHEMA_VERSION = 2
DATA_FILTER_RULES_VERSION = 1
DATA_MAPPING_MODE_GLOBAL = "global"
DATA_MAPPING_MODE_PER_TASK_LARGE = "per_task_large"
NOISE_DIR_NAMES = {
"__pycache__",
".pytest_cache",
".mypy_cache",
".ruff_cache",
".ipynb_checkpoints",
}
NOISE_BASENAME_EXACT = {".ds_store", "nohup.out"}
NOISE_SUFFIXES = {".tmp", ".swp", ".bak"}
SLURM_BASENAME_RE = re.compile(r"^slurm-[^/]+\.(?:out|err)$")
SBATCH_TOKEN_RE = re.compile(r"(^|[^A-Za-z0-9_])sbatch($|[^A-Za-z0-9_])")
SBATCH_PARSABLE_RE = re.compile(r"--parsable(?:\s|$)")
SBATCH_AFTEROK_RE = re.compile(r"--dependency\s*=\s*afterok\s*:")
WORKFLOW_FINAL_JOB_ID_MARKER = "FERMILINK_FINAL_JOB_ID="
WORKFLOW_UPSTREAM_JOB_ID_ENV = "FERMILINK_UPSTREAM_JOB_ID"
WORKFLOW_ALLINONE_BATCH_SCRIPT_FILENAME = "00_run_all.sh"
WORKFLOW_SIMULATION_BATCH_SCRIPT_FILENAME = "01_run_simulations.sh"
WORKFLOW_POSTPROCESS_BATCH_SCRIPT_FILENAME = "02_run_postprocess.sh"
WORKFLOW_PLOT_BATCH_SCRIPT_FILENAME = "03_run_plots.sh"
WORKFLOW_SIMULATION_JOB_MAP_FILENAME = "simulation_job_ids.tsv"
WORKFLOW_POSTPROCESS_JOB_MAP_FILENAME = "postprocess_job_ids.tsv"
WORKFLOW_PLOT_JOB_MAP_FILENAME = "plot_job_ids.tsv"
WORKFLOW_HPC_CONTRACT_ERRORS_FILENAME = "hpc_contract_errors.json"
WORKFLOW_HPC_VALIDATION_MAX_ATTEMPTS = 6
WORKFLOW_HPC_VALIDATION_STALL_LIMIT = 2
WORKFLOW_HPC_FEEDBACK_MAX_ISSUES = 80
WORKFLOW_TASK_SIMULATION_SCRIPT_FILENAME = "run_simulation.sh"
WORKFLOW_TASK_POSTPROCESS_SCRIPT_FILENAME = "run_postprocess.sh"
WORKFLOW_TASK_PLOT_SCRIPT_FILENAME = "run_plot.sh"
WORKFLOW_UNIFIED_MEMORY_STAGE_INSTRUCTIONS = (
"Unified-memory requirements (apply in this stage):\n"
"- Before acting, read `projects/memory.md`.\n"
"- After completing this stage, update `projects/memory.md` with concise entries:\n"
" - `## Short-Term Memory (Operational) -> ### Plan`\n"
" - `## Short-Term Memory (Operational) -> ### Progress log`\n"
"- Do not create duplicate memory section headings; update existing sections in place.\n"
"- Update long-term sections only when there is durable information:\n"
" - `### File map` for stable file/purpose mapping changes.\n"
" - `### Simulation history` for run/job milestones and outcomes.\n"
" - `### Key results` for validated, reproducible outcomes (include artifact paths).\n"
" - `### Parameter source mapping` for simulation parameter/setting provenance.\n"
" - `### Simulation uncertainty` for uncertainty, assumptions, and confidence gaps.\n"
" - `### Suggested skills updates` for recurring failure patterns and concrete fixes.\n"
)
USEFUL_SUFFIXES = {
".json",
".yaml",
".yml",
".csv",
".py",
".ipynb",
".xyz",
".cif",
".npy",
".npz",
".h5",
}
BOOST_PATH_TOKENS = {
"input",
"inputs",
"config",
"configs",
"geometry",
"geom",
"postprocess",
"post_process",
"plot",
"plots",
"script",
"scripts",
}
PENALTY_PATH_TOKENS = {"dump", "stderr", "stdout", "debug", "tmp", "temp", "cache"}
TASK_SLICE_COVERAGE_CATEGORIES = {
"input",
"config",
"postprocess",
"plot",
"script",
}
def _repo_relative_path(repo_dir: Path, path: Path) -> str:
try:
return str(path.relative_to(repo_dir))
except ValueError:
return str(path)
def _normalize_positive_int(
    raw_value: object, *, flag_name: str, minimum: int = 1
) -> int:
    """Parse *raw_value* as an int and enforce ``value >= minimum``.

    Raises:
        cli.PackageError: on non-integer input or a value below *minimum*.
    """
    cli = _cli()
    try:
        parsed = int(raw_value)
    except (TypeError, ValueError) as exc:
        raise cli.PackageError(f"{flag_name} must be an integer.") from exc
    if parsed < minimum:
        raise cli.PackageError(f"{flag_name} must be >= {minimum}.")
    return parsed
def _normalize_data_scan_limits(args: argparse.Namespace) -> dict[str, int]:
    """Validate the --data-max-* flags and clamp them into a consistent set.

    The per-file limit can never exceed the total budget, and the hashing
    limit can never exceed the per-file limit.
    """
    spec = (
        ("max_files", "data_max_files", "--data-max-files", DEFAULT_DATA_MAX_FILES),
        (
            "max_total_bytes",
            "data_max_total_bytes",
            "--data-max-total-bytes",
            DEFAULT_DATA_MAX_TOTAL_BYTES,
        ),
        (
            "max_file_bytes",
            "data_max_file_bytes",
            "--data-max-file-bytes",
            DEFAULT_DATA_MAX_FILE_BYTES,
        ),
        (
            "hash_max_bytes",
            "data_hash_max_bytes",
            "--data-hash-max-bytes",
            DEFAULT_DATA_HASH_MAX_BYTES,
        ),
    )
    limits = {
        key: _normalize_positive_int(
            getattr(args, attr, default), flag_name=flag, minimum=1
        )
        for key, attr, flag, default in spec
    }
    limits["max_file_bytes"] = min(limits["max_file_bytes"], limits["max_total_bytes"])
    limits["hash_max_bytes"] = min(limits["hash_max_bytes"], limits["max_file_bytes"])
    return limits
def _default_data_filter_settings() -> dict[str, object]:
return {
"ignore_noise": True,
"drop_slurm_outputs": True,
"drop_cache_dirs": True,
"drop_ephemeral_suffixes": True,
"family_collapse": {
"enabled": True,
"max_examples_per_family": 3,
},
}
def _default_data_mapping_thresholds() -> dict[str, int]:
    """Return default thresholds controlling global vs per-task data mapping."""
    pairs = (
        ("large_threshold_files", DEFAULT_DATA_LARGE_THRESHOLD_FILES),
        ("large_threshold_chars", DEFAULT_DATA_LARGE_THRESHOLD_CHARS),
        ("task_slice_max_files", DEFAULT_DATA_TASK_SLICE_MAX_FILES),
        ("task_slice_max_chars", DEFAULT_DATA_TASK_SLICE_MAX_CHARS),
    )
    return dict(pairs)
def _default_local_hpc_context() -> dict[str, object]:
return {
"enabled": False,
"mode": "local",
"scheduler": "none",
"source": "default_local",
"profile": {},
}
def _normalize_hpc_profile(
    *, raw_profile: dict[str, object], profile_path: Path
) -> dict[str, object]:
    """Validate the required SLURM profile strings and collapse whitespace.

    Raises:
        cli.PackageError: when a required key is missing, not a string, or
            empty after whitespace normalization.
    """
    cli = _cli()
    profile_label = str(profile_path)
    normalized: dict[str, object] = {}
    for key in (
        "slurm_default_partition",
        "slurm_defaults",
        "slurm_resource_policy",
    ):
        raw_value = raw_profile.get(key)
        if raw_value is None:
            raise cli.PackageError(
                f"--hpc-profile missing required `{key}` in {profile_label}."
            )
        if not isinstance(raw_value, str):
            raise cli.PackageError(
                f"--hpc-profile `{key}` must be a non-empty string in {profile_label}."
            )
        # Collapse internal runs of whitespace to single spaces.
        collapsed = " ".join(raw_value.split())
        if not collapsed:
            raise cli.PackageError(
                f"--hpc-profile `{key}` must be a non-empty string in {profile_label}."
            )
        normalized[key] = collapsed
    return normalized
def _resolve_invocation_hpc_context(
    *, repo_dir: Path, args: argparse.Namespace
) -> dict[str, object]:
    """Build the HPC context for this invocation from --hpc-profile (if any).

    Without --hpc-profile the default local context is returned. Otherwise
    the profile JSON is loaded, validated, normalized, and fingerprinted so
    resumed runs can detect profile content drift.

    Raises:
        cli.PackageError: when the profile file is missing, not a file,
            unreadable, invalid JSON, not a JSON object, or fails key
            validation.
    """
    cli = _cli()
    raw_hpc_profile = str(getattr(args, "hpc_profile", "") or "").strip()
    if not raw_hpc_profile:
        return _default_local_hpc_context()
    resolved_profile_path = cli._resolve_project_path(raw_hpc_profile)
    if not resolved_profile_path.exists():
        raise cli.PackageError(f"--hpc-profile does not exist: {resolved_profile_path}")
    if not resolved_profile_path.is_file():
        raise cli.PackageError(f"--hpc-profile must be a file: {resolved_profile_path}")
    try:
        payload = json.loads(resolved_profile_path.read_text(encoding="utf-8"))
    except OSError as exc:
        raise cli.PackageError(
            f"Failed to read --hpc-profile: {resolved_profile_path}: {exc}"
        ) from exc
    except json.JSONDecodeError as exc:
        raise cli.PackageError(
            f"--hpc-profile must contain valid JSON: {resolved_profile_path}: {exc}"
        ) from exc
    if not isinstance(payload, dict):
        raise cli.PackageError(
            f"--hpc-profile root JSON must be an object: {resolved_profile_path}"
        )
    normalized_profile = _normalize_hpc_profile(
        raw_profile=payload, profile_path=resolved_profile_path
    )
    # Canonical JSON (sorted keys, compact separators) keeps the fingerprint
    # stable across key-ordering differences in the profile file.
    fingerprint = hashlib.sha256(
        json.dumps(normalized_profile, sort_keys=True, separators=(",", ":")).encode(
            "utf-8"
        )
    ).hexdigest()
    return {
        "enabled": True,
        "mode": "hpc_slurm",
        "scheduler": "slurm",
        "source": "cli_hpc_profile",
        "profile_path": str(resolved_profile_path),
        "profile_path_input": raw_hpc_profile,
        "profile_relpath": _repo_relative_path(repo_dir, resolved_profile_path),
        "fingerprint": fingerprint,
        "profile": normalized_profile,
    }
def _is_hpc_context_enabled(hpc_context: object) -> bool:
return isinstance(hpc_context, dict) and bool(hpc_context.get("enabled"))
def _coerce_saved_hpc_context(state: dict[str, object]) -> dict[str, object]:
raw = state.get("hpc_context")
if not isinstance(raw, dict):
return _default_local_hpc_context()
enabled = bool(raw.get("enabled"))
if not enabled:
normalized_local = _default_local_hpc_context()
source = str(raw.get("source") or "").strip()
if source:
normalized_local["source"] = source
return normalized_local
profile = raw.get("profile")
normalized: dict[str, object] = {
"enabled": True,
"mode": "hpc_slurm",
"scheduler": "slurm",
"source": str(raw.get("source") or "cli_hpc_profile").strip()
or "cli_hpc_profile",
"profile": profile if isinstance(profile, dict) else {},
}
profile_path = str(raw.get("profile_path") or "").strip()
if profile_path:
normalized["profile_path"] = profile_path
profile_path_input = str(raw.get("profile_path_input") or "").strip()
if profile_path_input:
normalized["profile_path_input"] = profile_path_input
profile_relpath = str(raw.get("profile_relpath") or "").strip()
if profile_relpath:
normalized["profile_relpath"] = profile_relpath
fingerprint = str(raw.get("fingerprint") or "").strip()
if fingerprint:
normalized["fingerprint"] = fingerprint
return normalized
def _assert_hpc_context_compatible(
    *,
    run_id: str,
    workflow_name: str,
    state_hpc_context: dict[str, object],
    invocation_hpc_context: dict[str, object],
) -> None:
    """Reject resuming a run whose saved HPC context conflicts with the CLI.

    Checks, in order: enabled/disabled mode, scheduler name, profile path,
    and profile content fingerprint. Path and fingerprint are compared only
    when both sides recorded a value.

    Raises:
        cli.PackageError: describing the first mismatch found.
    """
    cli = _cli()
    state_enabled = bool(state_hpc_context.get("enabled"))
    invocation_enabled = bool(invocation_hpc_context.get("enabled"))
    if state_enabled != invocation_enabled:
        expected = "--hpc-profile <json>" if state_enabled else "without --hpc-profile"
        current = (
            "--hpc-profile <json>" if invocation_enabled else "without --hpc-profile"
        )
        raise cli.PackageError(
            f"Run {run_id} was created {expected}; current invocation is {current}. "
            "Use --restart or rerun with matching --hpc-profile mode."
        )
    if not state_enabled:
        # Both sides are local runs: nothing further to compare.
        return
    state_scheduler = str(state_hpc_context.get("scheduler") or "").strip().lower()
    invocation_scheduler = (
        str(invocation_hpc_context.get("scheduler") or "").strip().lower()
    )
    if state_scheduler != invocation_scheduler:
        raise cli.PackageError(
            f"Run {run_id} was created with scheduler={state_scheduler!r}; "
            f"current {workflow_name} invocation uses {invocation_scheduler!r}. "
            "Use --restart or rerun with matching --hpc-profile."
        )
    state_profile_path = str(state_hpc_context.get("profile_path") or "").strip()
    invocation_profile_path = str(
        invocation_hpc_context.get("profile_path") or ""
    ).strip()
    # Only enforce path equality when both contexts recorded a path.
    if (
        state_profile_path
        and invocation_profile_path
        and state_profile_path != invocation_profile_path
    ):
        raise cli.PackageError(
            f"Run {run_id} was created with --hpc-profile={state_profile_path!r}; "
            f"current {workflow_name} invocation uses {invocation_profile_path!r}. "
            "Use --restart or rerun with matching --hpc-profile."
        )
    state_fingerprint = str(state_hpc_context.get("fingerprint") or "").strip()
    invocation_fingerprint = str(
        invocation_hpc_context.get("fingerprint") or ""
    ).strip()
    # The same path may hold edited content; compare fingerprints when both
    # sides recorded one.
    if (
        state_fingerprint
        and invocation_fingerprint
        and state_fingerprint != invocation_fingerprint
    ):
        raise cli.PackageError(
            f"Run {run_id} was created with a different --hpc-profile content fingerprint. "
            "Use --restart or rerun with matching --hpc-profile content."
        )
def _build_hpc_prompt_lines(hpc_context: dict[str, object] | None) -> list[str]:
context = (
hpc_context if isinstance(hpc_context, dict) else _default_local_hpc_context()
)
lines: list[str] = []
if not bool(context.get("enabled")):
lines.extend(
[
"- execution_target: local machine (workflow default).",
"- Do not assume SLURM/HPC resources by default.",
"- Generate local-run-ready commands/scripts by default.",
]
)
return lines
profile = context.get("profile") if isinstance(context.get("profile"), dict) else {}
default_partition = str(profile.get("slurm_default_partition") or "").strip()
slurm_defaults = str(profile.get("slurm_defaults") or "").strip()
slurm_resource_policy = str(profile.get("slurm_resource_policy") or "").strip()
lines.extend(
[
"- execution_target: HPC SLURM.",
(
f"- slurm_default_partition: `{default_partition}`."
if default_partition
else "- slurm_default_partition: not specified."
),
]
)
lines.append(
(
f"- slurm_defaults: `{slurm_defaults}`."
if slurm_defaults
else "- slurm_defaults: not specified."
)
)
if slurm_resource_policy:
if slurm_resource_policy.endswith((".", "!", "?")):
lines.append(f"- slurm_resource_policy: {slurm_resource_policy}")
else:
lines.append(f"- slurm_resource_policy: {slurm_resource_policy}.")
else:
lines.append("- slurm_resource_policy: not specified.")
lines.append(
"- Generate SLURM-ready scripts and machine-tuned run instructions using this profile."
)
return lines
def _resolve_invocation_data_context(
    *,
    repo_dir: Path,
    run_dir: Path,
    workflow_name: str,
    args: argparse.Namespace,
) -> dict[str, object]:
    """Build the data context for this invocation from --data-dir and limits.

    Without --data-dir a disabled context (still carrying limit/filter
    defaults) is returned. With --data-dir the directory is validated for
    existence, type, and readability, and the artifact paths under the run
    directory are precomputed as repo-relative strings.

    Raises:
        cli.PackageError: when --data-dir is missing, not a directory, or
            unreadable, or when a --data-max-* flag is invalid.
    """
    cli = _cli()
    limits = _normalize_data_scan_limits(args)
    filter_settings = _default_data_filter_settings()
    mapping_thresholds = _default_data_mapping_thresholds()
    raw_data_dir = str(getattr(args, "data_dir", "") or "").strip()
    if not raw_data_dir:
        return {
            "enabled": False,
            "workflow": workflow_name,
            "read_only": True,
            "limits": limits,
            "filter_settings": filter_settings,
            "mapping_thresholds": mapping_thresholds,
            "artifacts": {},
        }
    resolved_data_dir = cli._resolve_project_path(raw_data_dir)
    if not resolved_data_dir.exists():
        raise cli.PackageError(f"--data-dir does not exist: {resolved_data_dir}")
    if not resolved_data_dir.is_dir():
        raise cli.PackageError(f"--data-dir must be a directory: {resolved_data_dir}")
    try:
        # Force an explicit readability check instead of silently degrading.
        next(resolved_data_dir.iterdir(), None)
    except OSError as exc:
        raise cli.PackageError(
            f"--data-dir is not readable: {resolved_data_dir}: {exc}"
        ) from exc
    data_artifacts_root = run_dir / WORKFLOW_DATA_DIRNAME
    data_manifest_full_path = data_artifacts_root / WORKFLOW_DATA_MANIFEST_FULL_FILENAME
    data_manifest_path = data_artifacts_root / WORKFLOW_DATA_MANIFEST_FILENAME
    data_summary_path = data_artifacts_root / WORKFLOW_DATA_SUMMARY_FILENAME
    task_data_map_path = data_artifacts_root / WORKFLOW_TASK_DATA_MAP_FILENAME
    return {
        "enabled": True,
        "workflow": workflow_name,
        "source_path": str(resolved_data_dir),
        "source_path_input": raw_data_dir,
        # Writable only when --data-writable was passed explicitly.
        "read_only": not bool(getattr(args, "data_writable", False)),
        "limits": limits,
        "filter_settings": filter_settings,
        "mapping_thresholds": mapping_thresholds,
        "artifacts": {
            "root": _repo_relative_path(repo_dir, data_artifacts_root),
            "manifest_full": _repo_relative_path(repo_dir, data_manifest_full_path),
            "manifest_compact": _repo_relative_path(repo_dir, data_manifest_path),
            # "manifest" intentionally points at the same compact manifest file.
            "manifest": _repo_relative_path(repo_dir, data_manifest_path),
            "summary": _repo_relative_path(repo_dir, data_summary_path),
            "task_map": _repo_relative_path(repo_dir, task_data_map_path),
        },
    }
def _is_data_context_enabled(data_context: object) -> bool:
return isinstance(data_context, dict) and bool(data_context.get("enabled"))
def _coerce_saved_data_context(state: dict[str, object]) -> dict[str, object]:
raw = state.get("data_context")
if not isinstance(raw, dict):
return {
"enabled": False,
"read_only": True,
"limits": {},
"filter_settings": _default_data_filter_settings(),
"mapping_thresholds": _default_data_mapping_thresholds(),
"artifacts": {},
}
enabled = bool(raw.get("enabled"))
artifacts = raw.get("artifacts")
normalized_artifacts: dict[str, object] = {}
if isinstance(artifacts, dict):
normalized_artifacts = dict(artifacts)
manifest_rel = str(artifacts.get("manifest") or "").strip()
if manifest_rel and not str(artifacts.get("manifest_compact") or "").strip():
normalized_artifacts["manifest_compact"] = manifest_rel
manifest_full_rel = str(artifacts.get("manifest_full") or "").strip()
if manifest_full_rel and not manifest_rel:
normalized_artifacts["manifest"] = manifest_full_rel
normalized: dict[str, object] = {
"enabled": enabled,
"read_only": bool(raw.get("read_only", True)),
"limits": raw.get("limits") if isinstance(raw.get("limits"), dict) else {},
"filter_settings": (
raw.get("filter_settings")
if isinstance(raw.get("filter_settings"), dict)
else _default_data_filter_settings()
),
"mapping_thresholds": (
raw.get("mapping_thresholds")
if isinstance(raw.get("mapping_thresholds"), dict)
else _default_data_mapping_thresholds()
),
"artifacts": normalized_artifacts,
}
source_path = str(raw.get("source_path") or "").strip()
if source_path:
normalized["source_path"] = source_path
source_path_input = str(raw.get("source_path_input") or "").strip()
if source_path_input:
normalized["source_path_input"] = source_path_input
manifest_fingerprint = str(raw.get("manifest_fingerprint") or "").strip()
if manifest_fingerprint:
normalized["manifest_fingerprint"] = manifest_fingerprint
manifest_full_fingerprint = str(raw.get("manifest_full_fingerprint") or "").strip()
if manifest_full_fingerprint:
normalized["manifest_full_fingerprint"] = manifest_full_fingerprint
manifest_compact_fingerprint = str(
raw.get("manifest_compact_fingerprint") or ""
).strip()
if manifest_compact_fingerprint:
normalized["manifest_compact_fingerprint"] = manifest_compact_fingerprint
mapping_mode = str(raw.get("mapping_mode") or "").strip()
if mapping_mode:
normalized["mapping_mode"] = mapping_mode
manifest_stats = raw.get("manifest_stats")
if isinstance(manifest_stats, dict):
normalized["manifest_stats"] = manifest_stats
return normalized
def _assert_data_context_compatible(
    *,
    run_id: str,
    workflow_name: str,
    state_data_context: dict[str, object],
    invocation_data_context: dict[str, object],
) -> None:
    """Reject resuming a run whose saved data context conflicts with the CLI.

    Checks, in order: --data-dir presence, source path, read-only/writable
    mode, scan limits, filter settings, and mapping thresholds.

    Raises:
        cli.PackageError: describing the first mismatch found.
    """
    cli = _cli()
    state_enabled = bool(state_data_context.get("enabled"))
    invocation_enabled = bool(invocation_data_context.get("enabled"))
    if state_enabled != invocation_enabled:
        expected = "--data-dir <path>" if state_enabled else "without --data-dir"
        current = "--data-dir <path>" if invocation_enabled else "without --data-dir"
        raise cli.PackageError(
            f"Run {run_id} was created {expected}; current invocation is {current}. "
            "Use --restart or rerun with matching data-dir mode."
        )
    if not state_enabled:
        # Both sides run without a data dir: nothing further to compare.
        return
    state_source = str(state_data_context.get("source_path") or "").strip()
    invocation_source = str(invocation_data_context.get("source_path") or "").strip()
    if state_source != invocation_source:
        raise cli.PackageError(
            f"Run {run_id} was created with --data-dir={state_source!r}; "
            f"current {workflow_name} invocation uses {invocation_source!r}. "
            "Use --restart or rerun with matching --data-dir."
        )
    # Write policy must match exactly (read-only vs writable).
    if bool(state_data_context.get("read_only", True)) != bool(
        invocation_data_context.get("read_only", True)
    ):
        expected = (
            "read-only"
            if bool(state_data_context.get("read_only", True))
            else "writable"
        )
        current = (
            "read-only"
            if bool(invocation_data_context.get("read_only", True))
            else "writable"
        )
        raise cli.PackageError(
            f"Run {run_id} data mode is {expected}; current invocation is {current}. "
            "Use --restart or rerun with matching data write policy."
        )
    # The remaining dict comparisons require exact equality; a malformed
    # (non-dict) value is treated as an empty dict on either side.
    state_limits = (
        state_data_context.get("limits")
        if isinstance(state_data_context.get("limits"), dict)
        else {}
    )
    invocation_limits = (
        invocation_data_context.get("limits")
        if isinstance(invocation_data_context.get("limits"), dict)
        else {}
    )
    if state_limits != invocation_limits:
        raise cli.PackageError(
            f"Run {run_id} was created with different data scan limits. "
            "Use --restart or rerun with matching --data-max-* flags."
        )
    state_filter_settings = (
        state_data_context.get("filter_settings")
        if isinstance(state_data_context.get("filter_settings"), dict)
        else {}
    )
    invocation_filter_settings = (
        invocation_data_context.get("filter_settings")
        if isinstance(invocation_data_context.get("filter_settings"), dict)
        else {}
    )
    if state_filter_settings != invocation_filter_settings:
        raise cli.PackageError(
            f"Run {run_id} was created with different data filtering settings. "
            "Use --restart or rerun with matching filter configuration."
        )
    state_mapping_thresholds = (
        state_data_context.get("mapping_thresholds")
        if isinstance(state_data_context.get("mapping_thresholds"), dict)
        else {}
    )
    invocation_mapping_thresholds = (
        invocation_data_context.get("mapping_thresholds")
        if isinstance(invocation_data_context.get("mapping_thresholds"), dict)
        else {}
    )
    if state_mapping_thresholds != invocation_mapping_thresholds:
        raise cli.PackageError(
            f"Run {run_id} was created with different data mapping thresholds. "
            "Use --restart or rerun with matching thresholds."
        )
def _infer_data_file_type(path: Path) -> str:
ext = path.suffix.lower()
if ext in {".csv", ".tsv", ".txt", ".dat"}:
return "tabular_or_text"
if ext in {".json", ".yaml", ".yml", ".toml", ".ini", ".xml"}:
return "structured_text"
if ext in {
".py",
".ipynb",
".jl",
".m",
".r",
".c",
".h",
".hpp",
".cpp",
".f90",
}:
return "code"
if ext in {".md", ".rst", ".tex", ".pdf"}:
return "document"
if ext in {".png", ".jpg", ".jpeg", ".svg", ".gif", ".webp", ".tif", ".tiff"}:
return "image"
if ext in {".npy", ".npz", ".h5", ".hdf5", ".mat", ".bin"}:
return "binary_data"
if ext in {".log", ".out"}:
return "log"
if ext in {".zip", ".tar", ".gz", ".bz2", ".xz", ".7z"}:
return "archive"
return "unknown"
def _data_file_usefulness_score(entry: dict[str, object]) -> float:
    """Heuristic relevance score for a scanned data file (higher = more useful).

    Combines a base score per inferred file type with path-token boosts and
    penalties, a suffix bonus, strong penalties for scheduler spool files,
    and a small penalty for empty/tiny files.
    """
    file_type = str(entry.get("type") or "unknown")
    size = int(entry.get("size") or 0)
    path = str(entry.get("path") or "").replace("\\", "/")
    suffix = Path(path).suffix.lower()
    basename = Path(path).name.lower()
    lowered = path.lower()
    # Base score by inferred content type (types not listed score 0).
    base_scores = {
        "tabular_or_text": 4.0,
        "structured_text": 3.5,
        "binary_data": 3.0,
        "document": 2.5,
        "code": 2.0,
        "log": 1.5,
        "unknown": 0.8,
    }
    score = base_scores.get(file_type, 0.0)
    if any(token in lowered for token in BOOST_PATH_TOKENS):
        score += 1.5
    content_hints = ("param", "dataset", "data", "reference", "figure", "result")
    if any(token in lowered for token in content_hints):
        score += 1.2
    if suffix in USEFUL_SUFFIXES:
        score += 1.0
    if any(token in lowered for token in PENALTY_PATH_TOKENS):
        score -= 1.2
    # Scheduler/stdout spool files are near-useless regardless of location.
    if basename.startswith("slurm-") and basename.endswith((".out", ".err")):
        score -= 4.0
    if basename == "nohup.out":
        score -= 4.0
    if size == 0:
        score -= 1.0
    elif 0 < size <= 1024:
        score -= 0.3
    return score
def _scan_data_dir_inventory(
    *,
    data_dir: Path,
    max_files: int,
    max_total_bytes: int,
    max_file_bytes: int,
    hash_max_bytes: int,
    include_hash: bool,
) -> dict[str, object]:
    """Walk *data_dir* and build a bounded, deterministic file inventory.

    Directories and filenames are visited in sorted order, so the result —
    and the sha256 fingerprint accumulated over every indexed and skipped
    entry — is stable for an unchanged tree. Unreadable and oversized files
    are recorded under "skipped". Hitting max_files or max_total_bytes stops
    the whole scan and sets "truncated"/"truncated_reason"; files after the
    cutoff are neither indexed nor recorded.

    Returns a dict with keys: files, skipped, indexed_bytes, truncated,
    truncated_reason, fingerprint.
    """
    files: list[dict[str, object]] = []
    skipped: list[dict[str, object]] = []
    indexed_bytes = 0
    truncated_reason = ""
    # The fingerprint also covers the root path and the limit values, so a
    # limit change alone produces a different fingerprint.
    fingerprint = hashlib.sha256()
    fingerprint.update(f"root={data_dir}\n".encode("utf-8"))
    fingerprint.update(
        (
            "limits="
            f"{max_files}:{max_total_bytes}:{max_file_bytes}:{hash_max_bytes}\n"
        ).encode("utf-8")
    )
    stop_scan = False
    for root, dir_names, file_names in os.walk(data_dir):
        # In-place sort makes os.walk traversal order deterministic.
        dir_names.sort()
        file_names.sort()
        root_path = Path(root)
        for file_name in file_names:
            file_path = root_path / file_name
            try:
                file_stat = file_path.stat()
            except OSError as exc:
                rel_unreadable = _repo_relative_path(data_dir, file_path).replace(
                    "\\", "/"
                )
                skipped_item = {
                    "path": rel_unreadable,
                    "reason": "unreadable",
                    "error": str(exc),
                }
                skipped.append(skipped_item)
                fingerprint.update(f"S|{rel_unreadable}|unreadable\n".encode("utf-8"))
                continue
            rel_path = _repo_relative_path(data_dir, file_path).replace("\\", "/")
            size = int(file_stat.st_size)
            mtime_ns = int(file_stat.st_mtime_ns)
            # Oversized files are skipped (and fingerprinted) but do not
            # count toward the file-count/byte budgets checked below.
            if size > max_file_bytes:
                skipped_item = {
                    "path": rel_path,
                    "size": size,
                    "mtime_ns": mtime_ns,
                    "reason": "file_too_large",
                }
                skipped.append(skipped_item)
                fingerprint.update(
                    f"S|{rel_path}|{size}|{mtime_ns}|file_too_large\n".encode("utf-8")
                )
                continue
            if len(files) >= max_files:
                truncated_reason = "max_files"
                stop_scan = True
                break
            if indexed_bytes + size > max_total_bytes:
                truncated_reason = "max_total_bytes"
                stop_scan = True
                break
            item: dict[str, object] = {
                "path": rel_path,
                "size": size,
                "mtime_ns": mtime_ns,
                "type": _infer_data_file_type(file_path),
            }
            # Content hashes are optional and only computed for small files.
            if include_hash and size <= hash_max_bytes:
                try:
                    item["sha256"] = hashlib.sha256(file_path.read_bytes()).hexdigest()
                except OSError as exc:
                    item["hash_error"] = str(exc)
            files.append(item)
            indexed_bytes += size
            fingerprint.update(f"F|{rel_path}|{size}|{mtime_ns}\n".encode("utf-8"))
        if stop_scan:
            break
    fingerprint.update(f"truncated_reason={truncated_reason}\n".encode("utf-8"))
    return {
        "files": files,
        "skipped": skipped,
        "indexed_bytes": indexed_bytes,
        "truncated": bool(truncated_reason),
        "truncated_reason": truncated_reason,
        "fingerprint": fingerprint.hexdigest(),
    }
def _stable_payload_fingerprint(payload: object) -> str:
serialized = json.dumps(
payload,
sort_keys=True,
separators=(",", ":"),
ensure_ascii=True,
)
return hashlib.sha256(serialized.encode("utf-8")).hexdigest()
def _noise_exclusion_rule_for_path(
    rel_path: str, *, filter_settings: dict[str, object]
) -> str:
    """Return the name of the noise-filter rule that excludes *rel_path*.

    An empty string means the path should be retained. Each rule can be
    switched off individually through *filter_settings* (all default on).
    """
    cleaned = rel_path.replace("\\", "/").strip("/")
    if not cleaned:
        return "invalid_path"
    segments = [segment for segment in cleaned.split("/") if segment]
    leaf = segments[-1]
    leaf_lower = leaf.lower()

    def _enabled(flag: str) -> bool:
        # Every filter defaults to enabled when the setting is absent.
        return bool(filter_settings.get(flag, True))

    # Anything inside a known cache/scratch directory.
    if _enabled("drop_cache_dirs") and any(s in NOISE_DIR_NAMES for s in segments):
        return "cache_dir_noise"
    # Well-known OS/editor noise files, matched on lowercase basename.
    if _enabled("ignore_noise") and leaf_lower in NOISE_BASENAME_EXACT:
        return "system_noise"
    # Slurm stdout/stderr capture files.
    if _enabled("drop_slurm_outputs") and SLURM_BASENAME_RE.match(leaf_lower):
        return "slurm_output_noise"
    # Ephemeral/backup suffixes (e.g. temp-file extensions).
    if _enabled("drop_ephemeral_suffixes") and Path(leaf).suffix.lower() in NOISE_SUFFIXES:
        return "ephemeral_suffix_noise"
    return ""
def _normalize_family_stem(stem: str) -> str:
normalized = stem.lower().strip()
normalized = re.sub(r"[a-f0-9]{8,}", "{hash}", normalized)
normalized = re.sub(
r"(job|run|seed|step|iter|trial|chunk|batch)[-_]?\d+",
r"\1_{num}",
normalized,
)
normalized = re.sub(r"\d+", "{num}", normalized)
normalized = re.sub(r"[_-]{2,}", "_", normalized)
normalized = normalized.strip("_-")
return normalized or "file"
def _build_compact_data_manifest(
    *,
    full_manifest: dict[str, object],
    filter_settings: dict[str, object],
) -> dict[str, object]:
    """Derive a prompt-sized "compact" manifest from a full data manifest.

    Noise files are excluded per *filter_settings*, survivors are collapsed
    into families of similarly named files (one representative each), and
    summary stats plus a stable content fingerprint are attached.
    """
    full_files = _manifest_file_items(full_manifest)
    # Tally exclusions per rule name so the stats can explain what was dropped.
    excluded_count_by_rule: Counter[str] = Counter()
    retained_files: list[dict[str, object]] = []
    for item in full_files:
        path = str(item.get("path") or "").strip().replace("\\", "/")
        if not path:
            excluded_count_by_rule["invalid_path"] += 1
            continue
        rule = _noise_exclusion_rule_for_path(path, filter_settings=filter_settings)
        if rule:
            excluded_count_by_rule[rule] += 1
            continue
        # Copy so later family annotations never mutate the full manifest.
        retained_files.append(dict(item))
    # Group retained files into families keyed by (parent dir, normalized
    # stem, lowercase suffix) so e.g. run_001/run_002 outputs collapse.
    grouped: dict[tuple[str, str, str], list[dict[str, object]]] = defaultdict(list)
    for item in retained_files:
        path = str(item.get("path") or "").strip().replace("\\", "/")
        if not path:
            continue
        parsed = Path(path)
        parent_dir = str(parsed.parent).replace("\\", "/")
        if parent_dir == ".":
            parent_dir = ""
        key = (parent_dir, _normalize_family_stem(parsed.stem), parsed.suffix.lower())
        grouped[key].append(item)
    family_settings = (
        filter_settings.get("family_collapse")
        if isinstance(filter_settings.get("family_collapse"), dict)
        else {}
    )
    max_examples = int(family_settings.get("max_examples_per_family") or 3)
    compact_files: list[dict[str, object]] = []
    collapsed_file_count = 0
    # Iterate families in sorted key order for deterministic output.
    for family_key in sorted(grouped.keys()):
        family_items = grouped[family_key]
        # Rank by usefulness (descending), tie-broken by path.
        ranked_family = sorted(
            family_items,
            key=lambda item: (
                -_data_file_usefulness_score(item),
                str(item.get("path") or ""),
            ),
        )
        representative = dict(ranked_family[0])
        representative_path = str(representative.get("path") or "").strip()
        family_paths = sorted(
            str(item.get("path") or "").strip()
            for item in ranked_family
            if str(item.get("path") or "").strip()
        )
        # Keep a few sibling paths as examples, excluding the representative.
        family_examples = [
            path for path in family_paths if path and path != representative_path
        ][:max_examples]
        family_count = len(family_paths)
        collapsed_file_count += max(0, family_count - 1)
        representative["family_count"] = family_count
        representative["family_examples"] = family_examples
        representative["representative_score"] = round(
            _data_file_usefulness_score(representative), 4
        )
        compact_files.append(representative)
    compact_files.sort(
        key=lambda item: (
            -_data_file_usefulness_score(item),
            str(item.get("path") or ""),
        )
    )
    excluded_by_rule_sorted = {
        key: int(excluded_count_by_rule[key]) for key in sorted(excluded_count_by_rule)
    }
    # Rough prompt-size estimate: path length plus ~96 chars of metadata each.
    prompt_char_estimate = sum(
        len(str(item.get("path") or "")) + 96 for item in compact_files
    )
    full_stats = full_manifest.get("stats")
    if not isinstance(full_stats, dict):
        full_stats = {}
    compact_stats = {
        "indexed_files": len(compact_files),
        "indexed_bytes": sum(int(item.get("size") or 0) for item in compact_files),
        "skipped_files": len(full_manifest.get("skipped") or []),
        "truncated": bool(full_stats.get("truncated")),
        "truncated_reason": str(full_stats.get("truncated_reason") or ""),
        "full_indexed_files": int(full_stats.get("indexed_files") or len(full_files)),
        "full_indexed_bytes": int(full_stats.get("indexed_bytes") or 0),
        "excluded_files": int(sum(excluded_count_by_rule.values())),
        "excluded_count_by_rule": excluded_by_rule_sorted,
        "family_collapsed_files": collapsed_file_count,
        "family_group_count": len(grouped),
        "prompt_char_estimate": prompt_char_estimate,
    }
    compact_payload: dict[str, object] = {
        "manifest_kind": "compact",
        "schema_version": DATA_MANIFEST_SCHEMA_VERSION,
        "filter_rules_version": DATA_FILTER_RULES_VERSION,
        "generated_at_utc": _utc_now_z(),
        "data_dir": str(full_manifest.get("data_dir") or ""),
        "scan_limits": (
            full_manifest.get("scan_limits")
            if isinstance(full_manifest.get("scan_limits"), dict)
            else {}
        ),
        "filter_settings": filter_settings,
        "full_manifest_fingerprint": str(full_manifest.get("fingerprint") or ""),
        "files": compact_files,
        "skipped": (
            full_manifest.get("skipped")
            if isinstance(full_manifest.get("skipped"), list)
            else []
        ),
        "stats": compact_stats,
    }
    # Fingerprint only content-bearing fields; generated_at_utc is excluded
    # so regenerating the same content yields the same fingerprint.
    compact_payload["fingerprint"] = _stable_payload_fingerprint(
        {
            "manifest_kind": "compact",
            "schema_version": DATA_MANIFEST_SCHEMA_VERSION,
            "filter_rules_version": DATA_FILTER_RULES_VERSION,
            "full_manifest_fingerprint": compact_payload["full_manifest_fingerprint"],
            "filter_settings": filter_settings,
            "files": compact_files,
            "stats": compact_stats,
        }
    )
    return compact_payload
def _is_manifest_payload_compatible(
    payload: dict[str, object],
    *,
    manifest_kind: str,
    source_data_dir: Path,
    limits: dict[str, object],
    fingerprint: str | None = None,
    filter_settings: dict[str, object] | None = None,
) -> bool:
    """Decide whether a previously written manifest payload can be reused.

    Compatibility requires matching kind, schema/filter-rule versions, data
    directory, and scan limits; *fingerprint* and *filter_settings* are
    checked only when provided. Returns False — never raises — for malformed
    payloads, so callers simply regenerate the manifest.
    """

    def _version_of(raw: object) -> int:
        # Cached manifests come from JSON files on disk and may be corrupted
        # or hand-edited; a non-numeric version must read as "incompatible"
        # instead of letting ValueError/TypeError escape the cache check.
        try:
            return int(raw or 0)
        except (TypeError, ValueError):
            return -1

    if str(payload.get("manifest_kind") or "") != manifest_kind:
        return False
    if _version_of(payload.get("schema_version")) != DATA_MANIFEST_SCHEMA_VERSION:
        return False
    if _version_of(payload.get("filter_rules_version")) != DATA_FILTER_RULES_VERSION:
        return False
    if str(payload.get("data_dir") or "") != str(source_data_dir):
        return False
    payload_limits = payload.get("scan_limits")
    if not isinstance(payload_limits, dict) or payload_limits != limits:
        return False
    if isinstance(filter_settings, dict):
        payload_filters = payload.get("filter_settings")
        if not isinstance(payload_filters, dict) or payload_filters != filter_settings:
            return False
    if isinstance(fingerprint, str) and fingerprint:
        if str(payload.get("fingerprint") or "") != fingerprint:
            return False
    return True
def _load_json_if_exists(path: Path) -> dict[str, object] | None:
if not path.is_file():
return None
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
return payload if isinstance(payload, dict) else None
def _write_text_file(path: Path, content: str) -> None:
    """Write *content* to *path* as UTF-8, creating parent directories.

    Raises the CLI's PackageError when the filesystem write fails.
    """
    package_error = _cli().PackageError
    path.parent.mkdir(parents=True, exist_ok=True)
    try:
        path.write_text(content, encoding="utf-8")
    except OSError as exc:
        raise package_error(f"Failed to write file: {path}: {exc}") from exc
def _render_data_summary_markdown(manifest: dict[str, object]) -> str:
    """Render a human-readable markdown summary of a data manifest.

    Sections: scope stats, per-type counts, the highest-scoring files,
    unknown-type files, skipped entries, and per-rule exclusion counts.
    Missing/invalid manifest fields degrade gracefully to empty defaults.
    """
    files = manifest.get("files")
    if not isinstance(files, list):
        files = []
    skipped = manifest.get("skipped")
    if not isinstance(skipped, list):
        skipped = []
    stats = manifest.get("stats")
    if not isinstance(stats, dict):
        stats = {}
    # Count files per detected type ("unknown" when untyped).
    type_counts: Counter[str] = Counter()
    for item in files:
        if not isinstance(item, dict):
            continue
        type_counts[str(item.get("type") or "unknown")] += 1
    sorted_files = [
        item for item in files if isinstance(item, dict) and str(item.get("path") or "")
    ]
    # Highest usefulness first; path is the deterministic tie-breaker.
    sorted_files.sort(
        key=lambda item: (
            -_data_file_usefulness_score(item),
            str(item.get("path") or ""),
        )
    )
    likely_useful = sorted_files[:12]
    unknown_files = [
        item for item in sorted_files if str(item.get("type") or "") == "unknown"
    ][:12]
    manifest_kind = str(manifest.get("manifest_kind") or "legacy")
    indexed_files = int(stats.get("indexed_files") or len(files))
    indexed_bytes = int(stats.get("indexed_bytes") or 0)
    truncated = bool(stats.get("truncated"))
    truncated_reason = str(stats.get("truncated_reason") or "").strip()
    # Full-manifest counts fall back to the compact counts when absent.
    full_indexed_files = int(stats.get("full_indexed_files") or indexed_files)
    full_indexed_bytes = int(stats.get("full_indexed_bytes") or indexed_bytes)
    excluded_files = int(stats.get("excluded_files") or 0)
    family_collapsed_files = int(stats.get("family_collapsed_files") or 0)
    prompt_char_estimate = int(stats.get("prompt_char_estimate") or 0)
    # Fraction of the full inventory that the compact view retains.
    compact_ratio = (
        (indexed_files / full_indexed_files) if full_indexed_files > 0 else 1.0
    )
    excluded_count_by_rule = stats.get("excluded_count_by_rule")
    if not isinstance(excluded_count_by_rule, dict):
        excluded_count_by_rule = {}
    lines = [
        "# Data Summary",
        "",
        "## Scope",
        f"- source_data_dir: {manifest.get('data_dir')}",
        f"- manifest_kind: {manifest_kind}",
        f"- schema_version: {manifest.get('schema_version')}",
        f"- filter_rules_version: {manifest.get('filter_rules_version')}",
        f"- indexed_files: {indexed_files}",
        f"- indexed_bytes: {indexed_bytes}",
        f"- full_indexed_files: {full_indexed_files}",
        f"- full_indexed_bytes: {full_indexed_bytes}",
        f"- compact_ratio: {compact_ratio:.4f}",
        f"- excluded_files: {excluded_files}",
        f"- family_collapsed_files: {family_collapsed_files}",
        f"- prompt_char_estimate: {prompt_char_estimate}",
        f"- skipped_files: {len(skipped)}",
        f"- inventory_truncated: {truncated}",
    ]
    if truncated_reason:
        lines.append(f"- truncated_reason: {truncated_reason}")
    lines.extend(["", "## Type Breakdown"])
    if type_counts:
        for type_name, count in sorted(type_counts.items(), key=lambda item: item[0]):
            lines.append(f"- {type_name}: {count}")
    else:
        lines.append("- no indexed files")
    lines.extend(["", "## Likely Useful Subsets"])
    if likely_useful:
        for item in likely_useful:
            lines.append(
                f"- {item.get('path')} ({item.get('type')}, {item.get('size')} bytes)"
            )
    else:
        lines.append("- none")
    lines.extend(["", "## Unknown/Low-Confidence Areas"])
    if unknown_files:
        for item in unknown_files:
            lines.append(f"- {item.get('path')} (type unknown)")
    else:
        lines.append("- none detected")
    if skipped:
        lines.extend(["", "## Skipped Files"])
        # Cap the listing at 12 entries; note how many more were omitted.
        for item in skipped[:12]:
            if isinstance(item, dict):
                path = str(item.get("path") or "")
                reason = str(item.get("reason") or "skipped")
                size = item.get("size")
                if isinstance(size, int):
                    lines.append(f"- {path} ({reason}, {size} bytes)")
                else:
                    lines.append(f"- {path} ({reason})")
        if len(skipped) > 12:
            lines.append(f"- ... {len(skipped) - 12} more skipped entries")
    if excluded_count_by_rule:
        lines.extend(["", "## Excluded By Rule"])
        for rule, count in sorted(
            excluded_count_by_rule.items(), key=lambda item: item[0]
        ):
            lines.append(f"- {rule}: {int(count)}")
    return "\n".join(lines).strip() + "\n"
def _prepare_workflow_data_artifacts(
    *,
    repo_dir: Path,
    run_dir: Path,
    data_context: dict[str, object],
) -> dict[str, object]:
    """Scan the configured data directory and materialize manifest artifacts.

    Writes (or reuses, when fingerprints still match) the full manifest, the
    compact manifest, and a markdown summary under the run's data directory,
    then records artifact paths, fingerprints, and stats on *data_context*.

    Raises cli.PackageError when the context lacks source_path or scan
    limits. Returns the mutated *data_context*.
    """
    cli = _cli()
    # No-op when the workflow's data-context feature is disabled.
    if not _is_data_context_enabled(data_context):
        return data_context
    source_path = str(data_context.get("source_path") or "").strip()
    if not source_path:
        raise cli.PackageError("Data context is missing source_path.")
    source_data_dir = Path(source_path)
    limits = data_context.get("limits")
    if not isinstance(limits, dict):
        raise cli.PackageError("Data context is missing scan limits.")
    filter_settings = data_context.get("filter_settings")
    if not isinstance(filter_settings, dict):
        filter_settings = _default_data_filter_settings()
    mapping_thresholds = data_context.get("mapping_thresholds")
    if not isinstance(mapping_thresholds, dict):
        mapping_thresholds = _default_data_mapping_thresholds()
    max_files = int(limits.get("max_files") or DEFAULT_DATA_MAX_FILES)
    max_total_bytes = int(limits.get("max_total_bytes") or DEFAULT_DATA_MAX_TOTAL_BYTES)
    max_file_bytes = int(limits.get("max_file_bytes") or DEFAULT_DATA_MAX_FILE_BYTES)
    hash_max_bytes = int(limits.get("hash_max_bytes") or DEFAULT_DATA_HASH_MAX_BYTES)
    data_root = run_dir / WORKFLOW_DATA_DIRNAME
    data_root.mkdir(parents=True, exist_ok=True)
    full_manifest_path = data_root / WORKFLOW_DATA_MANIFEST_FULL_FILENAME
    compact_manifest_path = data_root / WORKFLOW_DATA_MANIFEST_FILENAME
    summary_path = data_root / WORKFLOW_DATA_SUMMARY_FILENAME
    # Cheap hash-free scan used only to fingerprint current directory state.
    fast_scan = _scan_data_dir_inventory(
        data_dir=source_data_dir,
        max_files=max_files,
        max_total_bytes=max_total_bytes,
        max_file_bytes=max_file_bytes,
        hash_max_bytes=hash_max_bytes,
        include_hash=False,
    )
    fast_fingerprint = str(fast_scan.get("fingerprint") or "")
    existing_full_manifest = _load_json_if_exists(full_manifest_path)
    existing_compact_manifest = _load_json_if_exists(compact_manifest_path)
    use_cached_manifest = False
    full_manifest_payload: dict[str, object] = {}
    compact_manifest_payload: dict[str, object] = {}
    if isinstance(existing_full_manifest, dict) and isinstance(
        existing_compact_manifest, dict
    ):
        # Reuse cached manifests only when both are individually compatible
        # AND the compact one was derived from exactly this full manifest.
        full_ok = _is_manifest_payload_compatible(
            existing_full_manifest,
            manifest_kind="full",
            source_data_dir=source_data_dir,
            limits=limits,
            fingerprint=fast_fingerprint,
        )
        compact_ok = _is_manifest_payload_compatible(
            existing_compact_manifest,
            manifest_kind="compact",
            source_data_dir=source_data_dir,
            limits=limits,
            filter_settings=filter_settings,
        )
        linked = str(existing_compact_manifest.get("full_manifest_fingerprint") or "")
        full_fingerprint = str(existing_full_manifest.get("fingerprint") or "")
        if full_ok and compact_ok and linked == full_fingerprint:
            full_manifest_payload = existing_full_manifest
            compact_manifest_payload = existing_compact_manifest
            use_cached_manifest = True
    if not use_cached_manifest:
        # Full rescan with content hashing, then derive the compact view.
        full_scan = _scan_data_dir_inventory(
            data_dir=source_data_dir,
            max_files=max_files,
            max_total_bytes=max_total_bytes,
            max_file_bytes=max_file_bytes,
            hash_max_bytes=hash_max_bytes,
            include_hash=True,
        )
        full_manifest_payload = {
            "manifest_kind": "full",
            "schema_version": DATA_MANIFEST_SCHEMA_VERSION,
            "filter_rules_version": DATA_FILTER_RULES_VERSION,
            "generated_at_utc": _utc_now_z(),
            "data_dir": str(source_data_dir),
            "scan_limits": limits,
            "filter_settings": filter_settings,
            "fingerprint": str(full_scan.get("fingerprint") or ""),
            "files": (
                full_scan.get("files")
                if isinstance(full_scan.get("files"), list)
                else []
            ),
            "skipped": (
                full_scan.get("skipped")
                if isinstance(full_scan.get("skipped"), list)
                else []
            ),
            "stats": {
                "indexed_files": len(full_scan.get("files") or []),
                "indexed_bytes": int(full_scan.get("indexed_bytes") or 0),
                "skipped_files": len(full_scan.get("skipped") or []),
                "truncated": bool(full_scan.get("truncated")),
                "truncated_reason": str(full_scan.get("truncated_reason") or ""),
            },
        }
        compact_manifest_payload = _build_compact_data_manifest(
            full_manifest=full_manifest_payload,
            filter_settings=filter_settings,
        )
        _write_json_atomic(full_manifest_path, full_manifest_payload)
        _write_json_atomic(compact_manifest_path, compact_manifest_payload)
    # The summary is always re-rendered, even when manifests were cached.
    summary_text = _render_data_summary_markdown(compact_manifest_payload)
    _write_text_file(summary_path, summary_text)
    stats = compact_manifest_payload.get("stats")
    # Record repo-relative artifact locations; "manifest" aliases the
    # compact manifest for consumers of the older single-manifest layout.
    data_context["artifacts"] = {
        "root": _repo_relative_path(repo_dir, data_root),
        "manifest_full": _repo_relative_path(repo_dir, full_manifest_path),
        "manifest_compact": _repo_relative_path(repo_dir, compact_manifest_path),
        "manifest": _repo_relative_path(repo_dir, compact_manifest_path),
        "summary": _repo_relative_path(repo_dir, summary_path),
        "task_map": _repo_relative_path(
            repo_dir, data_root / WORKFLOW_TASK_DATA_MAP_FILENAME
        ),
    }
    data_context["filter_settings"] = filter_settings
    data_context["mapping_thresholds"] = mapping_thresholds
    data_context["manifest_full_fingerprint"] = str(
        full_manifest_payload.get("fingerprint") or ""
    )
    data_context["manifest_compact_fingerprint"] = str(
        compact_manifest_payload.get("fingerprint") or ""
    )
    # Legacy alias for the compact fingerprint.
    data_context["manifest_fingerprint"] = str(
        compact_manifest_payload.get("fingerprint") or ""
    )
    data_context["guard_fingerprint"] = fast_fingerprint
    if isinstance(stats, dict):
        data_context["manifest_stats"] = stats
    return data_context
def _normalize_string_list(raw: object) -> list[str]:
if isinstance(raw, list):
values: list[str] = []
for item in raw:
text = str(item).strip()
if text:
values.append(text)
return values
if isinstance(raw, str):
text = raw.strip()
return [text] if text else []
return []
def _sanitize_task_id(raw_id: object, index: int, used: set[str]) -> str:
candidate = str(raw_id).strip().lower() if raw_id is not None else ""
if not candidate:
candidate = f"task_{index:03d}"
candidate = re.sub(r"[^a-z0-9_-]+", "_", candidate).strip("_")
if not candidate:
candidate = f"task_{index:03d}"
if not candidate.startswith("task_"):
candidate = f"task_{candidate}"
deduped = candidate
suffix = 2
while deduped in used:
deduped = f"{candidate}_{suffix}"
suffix += 1
used.add(deduped)
return deduped
def _render_reproduce_task_prompt(
    task: dict[str, object],
    *,
    hpc_context: dict[str, object] | None = None,
) -> str:
    """Render the markdown prompt handed to the executor for one reproduce task.

    Optional bullet-list sections are included only when the corresponding
    task field is non-empty; execution-target and execution-notes sections
    are always appended.
    """
    task_id = str(task.get("id") or "task")
    title = str(task.get("title") or "Reproduce task").strip()
    objective = str(task.get("objective") or "").strip()
    lines: list[str] = [
        f"# Reproduce Task {task_id}: {title}",
        "",
        "## Objective",
        objective or "Reproduce the requested scientific result for this task.",
    ]
    optional_sections = (
        ("## Figure targets", _normalize_string_list(task.get("figure_targets"))),
        (
            "## Simulation requirements",
            _normalize_string_list(task.get("simulation_requirements")),
        ),
        (
            "## Parameter constraints",
            _normalize_string_list(task.get("parameter_constraints")),
        ),
        ("## Plot requirements", _normalize_string_list(task.get("plot_requirements"))),
        ("## Acceptance checks", _normalize_string_list(task.get("acceptance_checks"))),
    )
    for heading, items in optional_sections:
        if items:
            lines.extend(["", heading, *[f"- {item}" for item in items]])
    lines.extend(["", "## Execution target", *_build_hpc_prompt_lines(hpc_context)])
    lines.extend(
        [
            "",
            "## Execution notes",
            "- Keep scripts, data, and plots reproducible.",
            "- Save run details and blockers in projects/memory.md.",
            "- Execute simulation work required by the task plan.",
        ]
    )
    return "\n".join(lines).strip() + "\n"
def _extract_tagged_json_payload(
assistant_text: str, *, token_re: re.Pattern[str]
) -> dict[str, object] | None:
if not isinstance(assistant_text, str) or not assistant_text.strip():
return None
matches = token_re.findall(assistant_text)
if not matches:
return None
raw_payload = matches[-1].strip()
if not raw_payload:
return None
try:
parsed = json.loads(raw_payload)
except json.JSONDecodeError:
return None
if not isinstance(parsed, dict):
return None
return parsed
def _extract_reproduce_plan_payload(assistant_text: str) -> dict[str, object] | None:
    """Extract the reproduce-plan JSON object tagged in an assistant reply."""
    return _extract_tagged_json_payload(assistant_text, token_re=REPRODUCE_PLAN_TOKEN_RE)
def _extract_research_plan_payload(assistant_text: str) -> dict[str, object] | None:
    """Extract the research-plan JSON object tagged in an assistant reply."""
    return _extract_tagged_json_payload(
        assistant_text, token_re=RESEARCH_PLAN_TOKEN_RE
    )
def _extract_task_data_map_payload(assistant_text: str) -> dict[str, object] | None:
    """Extract the task-data-map JSON object tagged in an assistant reply."""
    return _extract_tagged_json_payload(
        assistant_text, token_re=WORKFLOW_TASK_DATA_MAP_TOKEN_RE
    )
def _coerce_confidence(raw: object, *, default: float = 0.5) -> float:
try:
value = float(raw)
except (TypeError, ValueError):
return default
if value < 0.0:
return 0.0
if value > 1.0:
return 1.0
return value
def _manifest_paths_set(manifest_payload: dict[str, object]) -> set[str]:
files = manifest_payload.get("files")
if not isinstance(files, list):
return set()
paths: set[str] = set()
for item in files:
if not isinstance(item, dict):
continue
path = str(item.get("path") or "").strip().replace("\\", "/")
if path:
paths.add(path)
return paths
def _manifest_file_items(
manifest_payload: dict[str, object],
) -> list[dict[str, object]]:
files = manifest_payload.get("files")
if not isinstance(files, list):
return []
return [item for item in files if isinstance(item, dict)]
def _tokenize_task_text(raw_text: str) -> set[str]:
return {
token
for token in re.findall(r"[a-z0-9]{3,}", raw_text.lower())
if token not in {"task", "with", "from", "that", "this", "figure"}
}
def _task_blob(task: dict[str, object]) -> str:
    """Flatten a task's descriptive fields into one whitespace-joined string."""
    scalar_parts = [
        str(task.get("id") or ""),
        str(task.get("title") or ""),
        str(task.get("objective") or ""),
    ]
    list_fields = (
        "figure_targets",
        "simulation_requirements",
        "parameter_constraints",
        "plot_requirements",
        "acceptance_checks",
    )
    list_parts = [
        " ".join(_normalize_string_list(task.get(field))) for field in list_fields
    ]
    return " ".join(scalar_parts + list_parts)
def _manifest_entry_category(item: dict[str, object]) -> str:
path = str(item.get("path") or "").lower()
suffix = Path(path).suffix.lower()
if any(token in path for token in ("input", "inputs", "dataset", "raw")):
return "input"
if any(token in path for token in ("config", "configs", "param", "setting")):
return "config"
if any(token in path for token in ("postprocess", "post_process", "analysis")):
return "postprocess"
if any(token in path for token in ("plot", "plots", "figure", "fig")):
return "plot"
if any(token in path for token in ("script", "scripts", "code")) or suffix in {
".py",
".ipynb",
}:
return "script"
return "other"
def _task_manifest_relevance_score(
    *,
    task_tokens: set[str],
    file_item: dict[str, object],
) -> float:
    """Score a manifest file's relevance to a task via path-token overlap.

    Base score is the file's intrinsic usefulness; each shared token adds
    1.4, plus a flat 1.5 bonus when there is any overlap at all.
    """
    spaced_path = str(file_item.get("path") or "").replace("/", " ")
    shared = task_tokens & _tokenize_task_text(spaced_path)
    bonus = 1.5 if shared else 0.0
    return _data_file_usefulness_score(file_item) + len(shared) * 1.4 + bonus
def _select_task_manifest_slice(
    *,
    task: dict[str, object],
    manifest_payload: dict[str, object],
    mapping_thresholds: dict[str, object],
) -> list[dict[str, object]]:
    """Choose a bounded, task-relevant subset of manifest files for a prompt.

    Files are ranked by token-overlap relevance, then selection seeds one
    file per coverage category and fills the remaining budget by rank,
    honoring both a max-file count and an estimated max-character budget.
    """
    manifest_files = _manifest_file_items(manifest_payload)
    if not manifest_files:
        return []
    task_tokens = _tokenize_task_text(_task_blob(task))
    # Descending relevance; path is the deterministic tie-breaker.
    scored = sorted(
        (
            (
                _task_manifest_relevance_score(
                    task_tokens=task_tokens,
                    file_item=item,
                ),
                item,
            )
            for item in manifest_files
        ),
        key=lambda pair: (-pair[0], str(pair[1].get("path") or "")),
    )
    max_files = int(
        mapping_thresholds.get("task_slice_max_files")
        or DEFAULT_DATA_TASK_SLICE_MAX_FILES
    )
    max_chars = int(
        mapping_thresholds.get("task_slice_max_chars")
        or DEFAULT_DATA_TASK_SLICE_MAX_CHARS
    )
    selected: list[dict[str, object]] = []
    selected_paths: set[str] = set()
    selected_chars = 0
    def _try_add(item: dict[str, object]) -> bool:
        # Add *item* when it is new and fits both budgets. The very first
        # item is always admitted regardless of the character budget.
        nonlocal selected_chars
        path = str(item.get("path") or "").strip()
        if not path or path in selected_paths:
            return False
        # ~128 chars of per-file metadata on top of the path itself.
        estimated_chars = len(path) + 128
        if len(selected) >= max_files:
            return False
        if selected and selected_chars + estimated_chars > max_chars:
            return False
        selected.append(item)
        selected_paths.add(path)
        selected_chars += estimated_chars
        return True
    # Pass 1: guarantee coverage with the best file from each category.
    for category in sorted(TASK_SLICE_COVERAGE_CATEGORIES):
        for _, item in scored:
            if _manifest_entry_category(item) != category:
                continue
            if _try_add(item):
                break
    # Pass 2: fill the remaining budget in global relevance order.
    for _, item in scored:
        _try_add(item)
        if len(selected) >= max_files or selected_chars >= max_chars:
            break
    # Safety net: if budgets rejected everything, force in the top-ranked files.
    if not selected:
        for _, item in scored[:max_files]:
            if not _try_add(item):
                break
    return selected
def _estimate_mapping_prompt_chars(
*,
manifest_payload: dict[str, object],
planner_plan: dict[str, object],
summary_text: str,
) -> int:
stats = manifest_payload.get("stats")
prompt_char_estimate = 0
if isinstance(stats, dict):
prompt_char_estimate = int(stats.get("prompt_char_estimate") or 0)
if prompt_char_estimate <= 0:
prompt_char_estimate = sum(
len(str(item.get("path") or "")) + 96
for item in _manifest_file_items(manifest_payload)
)
plan_chars = len(json.dumps(planner_plan, separators=(",", ":"), sort_keys=True))
return prompt_char_estimate + len(summary_text) + plan_chars + 2_000
def _select_data_mapping_mode(
    *,
    manifest_payload: dict[str, object],
    planner_plan: dict[str, object],
    summary_text: str,
    mapping_thresholds: dict[str, object],
) -> str:
    """Pick the global vs per-task data mapping mode based on manifest size.

    Falls back to the per-task ("large") mode when either the indexed file
    count or the estimated prompt size exceeds its configured threshold.
    """
    indexed_count = len(_manifest_file_items(manifest_payload))
    prompt_chars = _estimate_mapping_prompt_chars(
        manifest_payload=manifest_payload,
        planner_plan=planner_plan,
        summary_text=summary_text,
    )
    file_limit = int(
        mapping_thresholds.get("large_threshold_files")
        or DEFAULT_DATA_LARGE_THRESHOLD_FILES
    )
    char_limit = int(
        mapping_thresholds.get("large_threshold_chars")
        or DEFAULT_DATA_LARGE_THRESHOLD_CHARS
    )
    is_large = indexed_count > file_limit or prompt_chars > char_limit
    return DATA_MAPPING_MODE_PER_TASK_LARGE if is_large else DATA_MAPPING_MODE_GLOBAL
def _normalize_task_data_map(
    raw_payload: object,
    *,
    plan_tasks: list[dict[str, object]],
    manifest_payload: dict[str, object],
    source_stage: str,
    allowed_paths: set[str] | None = None,
) -> dict[str, object]:
    """Validate and normalize a model-produced task -> data-file mapping.

    Every plan task gets an output entry (tasks missing from the raw
    payload map to empty file lists). Referenced paths must appear in
    *allowed_paths* (defaults to the manifest's path set) or
    cli.PackageError is raised. Duplicate/empty paths are dropped and
    confidences are clamped via _coerce_confidence.
    """
    cli = _cli()
    if not isinstance(raw_payload, dict):
        raise cli.PackageError("Task data map must be a JSON object.")
    manifest_paths = (
        allowed_paths
        if isinstance(allowed_paths, set)
        else _manifest_paths_set(manifest_payload)
    )
    raw_tasks = raw_payload.get("tasks")
    if not isinstance(raw_tasks, list):
        raise cli.PackageError("Task data map must include `tasks` list.")
    # Index raw entries by id; the first occurrence wins on duplicates.
    raw_by_id: dict[str, dict[str, object]] = {}
    for item in raw_tasks:
        if not isinstance(item, dict):
            continue
        task_id = str(item.get("id") or "").strip()
        if task_id and task_id not in raw_by_id:
            raw_by_id[task_id] = item
    # Output is driven by the plan's task list, not by the raw payload.
    normalized_tasks: list[dict[str, object]] = []
    for index, task in enumerate(plan_tasks, start=1):
        if not isinstance(task, dict):
            continue
        task_id = str(task.get("id") or f"task_{index:03d}").strip()
        raw_task = raw_by_id.get(task_id, {})
        files_payload = raw_task.get("files")
        normalized_files: list[dict[str, object]] = []
        seen_paths: set[str] = set()
        if isinstance(files_payload, list):
            for file_item in files_payload:
                path = ""
                rationale = ""
                confidence = 0.5
                # Entries may be either {"path", "rationale", "confidence"}
                # dicts or bare path strings.
                if isinstance(file_item, dict):
                    path = str(file_item.get("path") or "").strip().replace("\\", "/")
                    rationale = str(file_item.get("rationale") or "").strip()
                    confidence = _coerce_confidence(file_item.get("confidence"))
                elif isinstance(file_item, str):
                    path = file_item.strip().replace("\\", "/")
                if not path or path in seen_paths:
                    continue
                # Hallucinated paths are a hard error so callers can retry.
                if path not in manifest_paths:
                    raise cli.PackageError(
                        f"Task {task_id} references unknown manifest path: {path}"
                    )
                seen_paths.add(path)
                normalized_files.append(
                    {
                        "path": path,
                        "rationale": rationale or "Mapped by workflow data auditor.",
                        "confidence": confidence,
                    }
                )
        normalized_task = {
            "id": task_id,
            "files": normalized_files,
            "unknowns": _normalize_string_list(raw_task.get("unknowns")),
            "notes": _normalize_string_list(raw_task.get("notes")),
        }
        normalized_tasks.append(normalized_task)
    normalized_payload = {
        "version": 1,
        "source_stage": source_stage,
        "generated_at_utc": _utc_now_z(),
        "tasks": normalized_tasks,
        "global_unknowns": _normalize_string_list(raw_payload.get("global_unknowns")),
    }
    # Preserve the mapping mode only when the model reported one.
    mapping_mode = str(raw_payload.get("mapping_mode") or "").strip()
    if mapping_mode:
        normalized_payload["mapping_mode"] = mapping_mode
    return normalized_payload
def _build_fallback_task_data_map(
    *,
    plan_tasks: list[dict[str, object]],
    manifest_payload: dict[str, object],
    reason: str,
) -> dict[str, object]:
    """Build a heuristic task -> data-file map when the LLM mapping failed.

    Each task gets the top-scoring manifest files by token overlap (falling
    back to the globally most useful files); entries are marked with low
    confidence and carry *reason* so downstream consumers know this was a
    fallback.
    """
    manifest_files = _manifest_file_items(manifest_payload)
    # Global usefulness ranking; path is the deterministic tie-breaker.
    ranked_manifest = sorted(
        manifest_files,
        key=lambda item: (
            -_data_file_usefulness_score(item),
            str(item.get("path") or ""),
        ),
    )
    # Used when a task has no token overlap with any file at all.
    default_candidates = ranked_manifest[:12]
    normalized_tasks: list[dict[str, object]] = []
    for index, task in enumerate(plan_tasks, start=1):
        if not isinstance(task, dict):
            continue
        task_id = str(task.get("id") or f"task_{index:03d}").strip()
        task_blob = " ".join(
            [
                str(task.get("title") or ""),
                str(task.get("objective") or ""),
                " ".join(_normalize_string_list(task.get("figure_targets"))),
                " ".join(_normalize_string_list(task.get("simulation_requirements"))),
                " ".join(_normalize_string_list(task.get("parameter_constraints"))),
            ]
        )
        task_tokens = _tokenize_task_text(task_blob)
        # Per-task re-scoring: usefulness plus path-token overlap bonus.
        scored: list[tuple[float, dict[str, object]]] = []
        for file_item in ranked_manifest:
            path = str(file_item.get("path") or "")
            path_tokens = _tokenize_task_text(path.replace("/", " "))
            overlap = len(task_tokens.intersection(path_tokens))
            score = _data_file_usefulness_score(file_item) + overlap * 1.2
            if overlap > 0:
                score += 1.0
            scored.append((score, file_item))
        scored.sort(key=lambda item: (-item[0], str(item[1].get("path") or "")))
        selected = [item for _, item in scored[:DEFAULT_DATA_TASK_FALLBACK_FILES]]
        if not selected:
            selected = default_candidates[:DEFAULT_DATA_TASK_FALLBACK_FILES]
        files_payload: list[dict[str, object]] = []
        for file_item in selected:
            path = str(file_item.get("path") or "").strip()
            if not path:
                continue
            files_payload.append(
                {
                    "path": path,
                    "rationale": "Heuristic fallback selection from manifest metadata.",
                    # Deliberately low confidence: this is not an LLM mapping.
                    "confidence": 0.25,
                }
            )
        normalized_tasks.append(
            {
                "id": task_id,
                "files": files_payload,
                "unknowns": [
                    "LLM task-data mapping fallback was used; verify file relevance manually."
                ],
                "notes": [reason],
            }
        )
    return {
        "version": 1,
        "source_stage": "heuristic_fallback",
        "generated_at_utc": _utc_now_z(),
        "tasks": normalized_tasks,
        "global_unknowns": [reason],
    }
def _build_manifest_prompt_excerpt(
    manifest_payload: dict[str, object],
    *,
    file_items: list[dict[str, object]] | None = None,
    max_files: int = DEFAULT_DATA_PROMPT_MAX_FILES,
    max_chars: int | None = None,
) -> str:
    """Render a bounded, one-line-per-file manifest excerpt for a prompt.

    Uses *file_items* when given, otherwise the manifest's own file list;
    files are ranked by usefulness and emitted until *max_files* or the
    optional *max_chars* budget is exhausted. Skipped entries are appended
    only when rendering the whole manifest (file_items is None).
    """
    files = (
        [item for item in file_items if isinstance(item, dict)]
        if isinstance(file_items, list)
        else _manifest_file_items(manifest_payload)
    )
    # Most useful first; path is the deterministic tie-breaker.
    files = sorted(
        files,
        key=lambda item: (
            -_data_file_usefulness_score(item),
            str(item.get("path") or ""),
        ),
    )
    lines: list[str] = []
    char_budget = max_chars if isinstance(max_chars, int) and max_chars > 0 else None
    used_chars = 0
    used_files = 0
    for item in files:
        if used_files >= max_files:
            break
        path = str(item.get("path") or "").strip()
        if not path:
            continue
        family_count = int(item.get("family_count") or 1)
        examples = _normalize_string_list(item.get("family_examples"))
        line = (
            f"- {path} | type={item.get('type')} | size={item.get('size')} | "
            f"mtime_ns={item.get('mtime_ns')} | family_count={family_count}"
        )
        if examples:
            line += f" | more_like_this={', '.join(examples)}"
        # +1 accounts for the newline added by the final join.
        projected = used_chars + len(line) + 1
        # The character budget never rejects the first emitted line.
        if char_budget is not None and used_files > 0 and projected > char_budget:
            break
        lines.append(line)
        used_chars = projected
        used_files += 1
    if len(files) > used_files:
        lines.append(f"- ... {len(files) - used_files} more indexed files")
    skipped = manifest_payload.get("skipped")
    # Skipped entries only appear in whole-manifest excerpts, capped at 24.
    if file_items is None and isinstance(skipped, list) and skipped:
        lines.append("")
        lines.append("Skipped entries:")
        for item in skipped[:24]:
            if not isinstance(item, dict):
                continue
            lines.append(
                f"- {item.get('path')} | reason={item.get('reason')} | size={item.get('size')}"
            )
        if len(skipped) > 24:
            lines.append(f"- ... {len(skipped) - 24} more skipped entries")
    return "\n".join(lines).strip()
def _generate_task_data_map(
    *,
    repo_dir: Path,
    run_dir: Path | None,
    source_description: str,
    planner_plan: dict[str, object],
    compact_manifest_payload: dict[str, object],
    full_manifest_payload: dict[str, object] | None,
    summary_text: str,
    requested_package_id: str | None,
    sandbox_override: str | None,
    provider_bin_override: str,
    max_tries: int,
    log_tag: str,
    data_context: dict[str, object],
) -> dict[str, object]:
    """Build a task -> data-file mapping for every task in the planner plan.

    Selects a mapping mode via `_select_data_mapping_mode` (recorded into
    `data_context`), then runs LLM "data auditor" turns to produce the map.
    In global mode one prompt covers the whole plan; otherwise one prompt is
    issued per task with a task-specific manifest slice. Invalid or failed
    auditor output degrades to `_build_fallback_task_data_map` instead of
    aborting.

    Returns a task-data-map payload dict (`version`, `source_stage`,
    `generated_at_utc`, `mapping_mode`, `tasks`, and for per-task mode
    `global_unknowns`). Raises `cli.PackageError` when the planner plan has
    no `tasks` list.
    """
    cli = _cli()
    raw_plan_tasks = planner_plan.get("tasks")
    if not isinstance(raw_plan_tasks, list):
        raise cli.PackageError("Planner plan is missing task list for data mapping.")
    # Only dict-shaped tasks participate in mapping; other entries are dropped.
    plan_tasks = [task for task in raw_plan_tasks if isinstance(task, dict)]
    mapping_thresholds = (
        data_context.get("mapping_thresholds")
        if isinstance(data_context.get("mapping_thresholds"), dict)
        else _default_data_mapping_thresholds()
    )
    mapping_mode = _select_data_mapping_mode(
        manifest_payload=compact_manifest_payload,
        planner_plan=planner_plan,
        summary_text=summary_text,
        mapping_thresholds=mapping_thresholds,
    )
    # Record the chosen mode (and when it was chosen) on the shared context.
    data_context["mapping_mode"] = mapping_mode
    data_context["mapping_mode_selected_at_utc"] = _utc_now_z()
    cli._print_tagged(log_tag, f"data mapping mode: {mapping_mode}")
    # Paths from both manifests are acceptable in auditor responses.
    allowed_paths = _manifest_paths_set(compact_manifest_payload)
    if isinstance(full_manifest_payload, dict):
        allowed_paths = allowed_paths.union(_manifest_paths_set(full_manifest_payload))

    def _run_and_normalize_map(
        *,
        prompt: str,
        target_tasks: list[dict[str, object]],
        source_stage: str,
        attempt_label: str,
    ) -> dict[str, object] | None:
        # Run the auditor prompt up to `max_tries` times; return the first
        # response that parses and normalizes cleanly, else None.
        for attempt in range(1, max_tries + 1):
            cli._print_tagged(
                log_tag,
                f"{attempt_label} attempt {attempt}/{max_tries}",
            )
            run_result = _run_reproduce_exec_turn(
                repo_dir=repo_dir,
                prompt=prompt,
                requested_package_id=requested_package_id,
                sandbox_override=sandbox_override,
                provider_bin_override=provider_bin_override,
                data_context=data_context,
            )
            return_code = int(run_result.get("return_code") or 0)
            if return_code != 0:
                # Non-zero exit: log and retry rather than failing the workflow.
                cli._print_tagged(
                    log_tag,
                    f"{attempt_label} run exited with code {return_code}.",
                    stderr=True,
                )
                continue
            assistant_text = str(run_result.get("assistant_text") or "")
            raw_payload = _extract_task_data_map_payload(assistant_text)
            if raw_payload is None:
                cli._print_tagged(
                    log_tag,
                    f"{attempt_label} response missing <{WORKFLOW_TASK_DATA_MAP_TAG}> block.",
                    stderr=True,
                )
                continue
            try:
                return _normalize_task_data_map(
                    raw_payload,
                    plan_tasks=target_tasks,
                    manifest_payload=compact_manifest_payload,
                    source_stage=source_stage,
                    allowed_paths=allowed_paths,
                )
            except cli.PackageError as exc:
                cli._print_tagged(
                    log_tag,
                    f"{attempt_label} response invalid: {exc}",
                    stderr=True,
                )
                continue
        return None

    if mapping_mode == DATA_MAPPING_MODE_GLOBAL:
        # Global mode: one auditor prompt maps the entire plan at once.
        prompt = (
            f"{WORKFLOW_DATA_AUDITOR_PROMPT_PREFIX}\n\n"
            f"Workflow: {log_tag}\n"
            f"Source description: {source_description}\n"
            f"Data directory: {data_context.get('source_path')}\n"
            "Mapping mode: global\n\n"
            "Draft planner task JSON:\n"
            f"{json.dumps(planner_plan, indent=2)}\n\n"
            "Data summary markdown:\n"
            f"{summary_text.strip()}\n\n"
            "Compact data manifest excerpt:\n"
            f"{_build_manifest_prompt_excerpt(compact_manifest_payload)}\n"
        )
        normalized_payload = _run_and_normalize_map(
            prompt=prompt,
            target_tasks=plan_tasks,
            source_stage="data_auditor_global",
            attempt_label="data auditor global",
        )
        if normalized_payload is None:
            # All auditor attempts failed: fall back to a deterministic map.
            normalized_payload = _build_fallback_task_data_map(
                plan_tasks=plan_tasks,
                manifest_payload=compact_manifest_payload,
                reason="data auditor output invalid; used deterministic fallback mapping",
            )
            cli._print_tagged(
                log_tag,
                "data auditor fallback activated (deterministic heuristic map).",
                stderr=True,
            )
        normalized_payload["mapping_mode"] = DATA_MAPPING_MODE_GLOBAL
        return normalized_payload
    # Per-task mode: map each task individually with a sliced manifest excerpt.
    task_entries: list[dict[str, object]] = []
    global_unknowns: list[str] = []
    max_slice_files = int(
        mapping_thresholds.get("task_slice_max_files")
        or DEFAULT_DATA_TASK_SLICE_MAX_FILES
    )
    max_slice_chars = int(
        mapping_thresholds.get("task_slice_max_chars")
        or DEFAULT_DATA_TASK_SLICE_MAX_CHARS
    )
    for index, task in enumerate(plan_tasks, start=1):
        task_id = str(task.get("id") or f"task_{index:03d}").strip()
        task_slice = _select_task_manifest_slice(
            task=task,
            manifest_payload=compact_manifest_payload,
            mapping_thresholds=mapping_thresholds,
        )
        slice_excerpt = _build_manifest_prompt_excerpt(
            compact_manifest_payload,
            file_items=task_slice,
            max_files=max_slice_files,
            max_chars=max_slice_chars,
        )
        prompt = (
            f"{WORKFLOW_DATA_AUDITOR_PROMPT_PREFIX}\n\n"
            f"Workflow: {log_tag}\n"
            f"Source description: {source_description}\n"
            f"Data directory: {data_context.get('source_path')}\n"
            "Mapping mode: per_task_large\n\n"
            "Task JSON:\n"
            f"{json.dumps(task, indent=2)}\n\n"
            "Data summary markdown:\n"
            f"{summary_text.strip()}\n\n"
            "Task-specific compact manifest slice:\n"
            f"{slice_excerpt}\n"
        )
        normalized_payload = _run_and_normalize_map(
            prompt=prompt,
            target_tasks=[task],
            source_stage="data_auditor_per_task",
            attempt_label=f"data auditor task {task_id}",
        )
        if normalized_payload is None:
            # Per-task fallback; the reason is also surfaced in global_unknowns.
            fallback_reason = f"data auditor output invalid for {task_id}; used deterministic fallback mapping"
            normalized_payload = _build_fallback_task_data_map(
                plan_tasks=[task],
                manifest_payload=compact_manifest_payload,
                reason=fallback_reason,
            )
            global_unknowns.append(fallback_reason)
            cli._print_tagged(
                log_tag,
                f"data auditor fallback activated for {task_id}.",
                stderr=True,
            )
        mapped_tasks = normalized_payload.get("tasks")
        if (
            isinstance(mapped_tasks, list)
            and mapped_tasks
            and isinstance(mapped_tasks[0], dict)
        ):
            task_entries.append(mapped_tasks[0])
        else:
            # No parseable per-task entry: record an explicit empty placeholder.
            task_entries.append(
                {
                    "id": task_id,
                    "files": [],
                    "unknowns": [
                        "LLM task-data mapping returned no parseable task entry."
                    ],
                    "notes": ["empty per-task mapping payload"],
                }
            )
        # Persist a partial map after each task when a run dir is available.
        if run_dir is not None:
            partial_payload = {
                "version": 1,
                "source_stage": "data_auditor_per_task_large",
                "generated_at_utc": _utc_now_z(),
                "mapping_mode": DATA_MAPPING_MODE_PER_TASK_LARGE,
                "tasks": [dict(item) for item in task_entries],
                "global_unknowns": list(global_unknowns),
            }
            _materialize_task_data_artifacts(
                repo_dir=repo_dir,
                run_dir=run_dir,
                plan=planner_plan,
                task_data_map=partial_payload,
            )
    return {
        "version": 1,
        "source_stage": "data_auditor_per_task_large",
        "generated_at_utc": _utc_now_z(),
        "mapping_mode": DATA_MAPPING_MODE_PER_TASK_LARGE,
        "tasks": task_entries,
        "global_unknowns": global_unknowns,
    }
def _render_task_data_context_markdown(
    *,
    task_id: str,
    task_title: str,
    task_entry: dict[str, object],
    task_map_relpath: str,
) -> str:
    """Render the per-task data-context markdown document.

    Lists the task's mapped files (with confidence and rationale), recorded
    unknowns, and notes, preceded by the scope rule that restricts the task
    to the mapped files. Returns markdown ending in a single newline.
    """
    mapped_files = task_entry.get("files")
    if not isinstance(mapped_files, list):
        mapped_files = []
    unknown_items = _normalize_string_list(task_entry.get("unknowns"))
    note_items = _normalize_string_list(task_entry.get("notes"))
    out: list[str] = [
        f"# Data Context: {task_id}",
        "",
        f"- task_title: {task_title or task_id}",
        f"- task_map_source: `{task_map_relpath}`",
        "",
        "## Scope Rule",
        "- Only use files listed below unless strong evidence supports expansion.",
        "- If expansion is required, update task_data_map.json and this file with "
        "new file paths, rationale, and confidence before continuing.",
        "",
        "## Allowed Files",
    ]
    if mapped_files:
        for entry in mapped_files:
            if not isinstance(entry, dict):
                continue
            file_path = str(entry.get("path") or "").strip()
            reason = str(entry.get("rationale") or "").strip()
            score = _coerce_confidence(entry.get("confidence"))
            if not file_path:
                continue
            suffix = f" - {reason}" if reason else ""
            out.append(f"- `{file_path}` (confidence: {score:.2f})" + suffix)
    else:
        out.append(
            "- No mapped files yet. Review data_summary.md and task_data_map.json."
        )
    out += ["", "## Unknowns"]
    out += [f"- {text}" for text in unknown_items] or ["- none recorded"]
    out += ["", "## Notes"]
    out += [f"- {text}" for text in note_items] or ["- none"]
    return "\n".join(out).strip() + "\n"
def _materialize_task_data_artifacts(
    *,
    repo_dir: Path,
    run_dir: Path,
    plan: dict[str, object],
    task_data_map: dict[str, object],
) -> dict[str, object]:
    """Write per-task data-context markdown files plus the task-data-map JSON.

    Mutates both arguments in place:
    - each dict-shaped plan task gains a `data_context_file` relative path;
    - `task_data_map["tasks"]` is rewritten in plan-task order (tasks with
      no mapped entry get an empty placeholder) and `updated_at_utc` is
      stamped before the JSON is written.

    Returns the updated `task_data_map`.
    """
    data_root = run_dir / WORKFLOW_DATA_DIRNAME
    data_root.mkdir(parents=True, exist_ok=True)
    task_map_path = data_root / WORKFLOW_TASK_DATA_MAP_FILENAME
    task_map_rel = _repo_relative_path(repo_dir, task_map_path)
    tasks = plan.get("tasks")
    if not isinstance(tasks, list):
        tasks = []
    plan["tasks"] = tasks
    map_tasks = task_data_map.get("tasks")
    # Index mapped entries by task id; first occurrence wins on duplicates.
    map_by_id: dict[str, dict[str, object]] = {}
    if isinstance(map_tasks, list):
        for item in map_tasks:
            if not isinstance(item, dict):
                continue
            task_id = str(item.get("id") or "").strip()
            if task_id and task_id not in map_by_id:
                map_by_id[task_id] = item
    ordered_tasks: list[dict[str, object]] = []
    for index, task in enumerate(tasks, start=1):
        if not isinstance(task, dict):
            continue
        task_id = str(task.get("id") or f"task_{index:03d}").strip()
        task_title = str(task.get("title") or task_id).strip()
        # Fall back to an empty placeholder entry when no mapping exists yet.
        task_entry = map_by_id.get(
            task_id,
            {
                "id": task_id,
                "files": [],
                "unknowns": ["No task-data mapping available yet."],
                "notes": [],
            },
        )
        context_rel = f"{WORKFLOW_DATA_DIRNAME}/{task_id}.md"
        context_path = run_dir / context_rel
        context_markdown = _render_task_data_context_markdown(
            task_id=task_id,
            task_title=task_title,
            task_entry=task_entry,
            task_map_relpath=task_map_rel,
        )
        _write_text_file(context_path, context_markdown)
        # Copy before annotating so the original map entry is not mutated.
        task_entry = dict(task_entry)
        task_entry["id"] = task_id
        task_entry["context_file"] = context_rel
        ordered_tasks.append(task_entry)
        task["data_context_file"] = context_rel
    task_data_map["tasks"] = ordered_tasks
    task_data_map["updated_at_utc"] = _utc_now_z()
    _write_json_atomic(task_map_path, task_data_map)
    return task_data_map
def _synchronize_task_data_artifacts_with_plan(
    *,
    repo_dir: Path,
    run_dir: Path,
    plan: dict[str, object],
    manifest_payload: dict[str, object],
    existing_task_map: dict[str, object] | None,
) -> dict[str, object]:
    """Re-align task-data artifacts with the (possibly revised) plan.

    Re-validates `existing_task_map` against the plan's current task list;
    when it is absent or fails normalization, a deterministic fallback map
    is built instead. The result is materialized to disk and returned.
    """
    raw_tasks = plan.get("tasks")
    if not isinstance(raw_tasks, list):
        raw_tasks = []
    plan["tasks"] = raw_tasks
    dict_tasks = [entry for entry in raw_tasks if isinstance(entry, dict)]
    if not isinstance(existing_task_map, dict):
        synced_map = _build_fallback_task_data_map(
            plan_tasks=dict_tasks,
            manifest_payload=manifest_payload,
            reason="task_data_map missing during plan sync",
        )
    else:
        try:
            synced_map = _normalize_task_data_map(
                existing_task_map,
                plan_tasks=dict_tasks,
                manifest_payload=manifest_payload,
                source_stage=str(existing_task_map.get("source_stage") or "resync"),
            )
        except Exception:
            # Any normalization failure degrades to the deterministic fallback.
            synced_map = _build_fallback_task_data_map(
                plan_tasks=dict_tasks,
                manifest_payload=manifest_payload,
                reason="existing task_data_map invalid during plan sync",
            )
    return _materialize_task_data_artifacts(
        repo_dir=repo_dir,
        run_dir=run_dir,
        plan=plan,
        task_data_map=synced_map,
    )
def _scan_diff_preview(
before_scan: dict[str, object], after_scan: dict[str, object], *, max_items: int = 6
) -> list[str]:
before_files = before_scan.get("files")
after_files = after_scan.get("files")
before_map: dict[str, tuple[int, int]] = {}
after_map: dict[str, tuple[int, int]] = {}
if isinstance(before_files, list):
for item in before_files:
if not isinstance(item, dict):
continue
path = str(item.get("path") or "")
size = int(item.get("size") or 0)
mtime_ns = int(item.get("mtime_ns") or 0)
if path:
before_map[path] = (size, mtime_ns)
if isinstance(after_files, list):
for item in after_files:
if not isinstance(item, dict):
continue
path = str(item.get("path") or "")
size = int(item.get("size") or 0)
mtime_ns = int(item.get("mtime_ns") or 0)
if path:
after_map[path] = (size, mtime_ns)
added = sorted(path for path in after_map if path not in before_map)
removed = sorted(path for path in before_map if path not in after_map)
modified = sorted(
path
for path in after_map
if path in before_map and after_map[path] != before_map[path]
)
lines: list[str] = []
for path in added[:max_items]:
lines.append(f"+ {path}")
for path in removed[:max_items]:
lines.append(f"- {path}")
for path in modified[:max_items]:
lines.append(f"~ {path}")
if not lines:
lines.append("(no per-file diff within indexed scope)")
return lines
def _normalize_automation_plan(
    raw_plan: object,
    *,
    source_description: str,
    hpc_context: dict[str, object] | None = None,
) -> dict[str, object]:
    """Validate and canonicalize a planner/auditor plan payload.

    Every task is normalized to a fixed key set; missing `prompt_markdown`
    is rendered from the normalized task. Raises `cli.PackageError` when the
    payload is not a dict, lacks tasks, or contains a non-dict task.
    """
    cli = _cli()
    if not isinstance(raw_plan, dict):
        raise cli.PackageError("Reproduce plan must be a JSON object.")
    tasks_payload = raw_plan.get("tasks")
    if not isinstance(tasks_payload, list) or not tasks_payload:
        raise cli.PackageError("Reproduce plan must include a non-empty `tasks` list.")
    seen_ids: set[str] = set()
    tasks_out: list[dict[str, object]] = []
    for position, entry in enumerate(tasks_payload, start=1):
        if not isinstance(entry, dict):
            raise cli.PackageError(f"Task {position} in reproduce plan is not an object.")
        canonical: dict[str, object] = {
            "id": _sanitize_task_id(entry.get("id"), position, seen_ids),
            "title": str(entry.get("title") or f"Task {position}").strip(),
            "figure_targets": _normalize_string_list(entry.get("figure_targets")),
            "objective": str(entry.get("objective") or "").strip(),
            "simulation_requirements": _normalize_string_list(
                entry.get("simulation_requirements")
            ),
            "parameter_constraints": _normalize_string_list(
                entry.get("parameter_constraints")
            ),
            "plot_requirements": _normalize_string_list(entry.get("plot_requirements")),
            "acceptance_checks": _normalize_string_list(entry.get("acceptance_checks")),
        }
        context_rel = str(entry.get("data_context_file") or "").strip()
        if context_rel:
            canonical["data_context_file"] = context_rel
        rendered_prompt = str(entry.get("prompt_markdown") or "").strip()
        if not rendered_prompt:
            # Render a default prompt from the normalized task fields.
            rendered_prompt = _render_reproduce_task_prompt(
                canonical, hpc_context=hpc_context
            ).strip()
        canonical["prompt_markdown"] = rendered_prompt
        tasks_out.append(canonical)
    return {
        "version": 1,
        "paper_source": str(raw_plan.get("paper_source") or source_description).strip()
        or source_description,
        "assumptions": _normalize_string_list(raw_plan.get("assumptions")),
        "tasks": tasks_out,
    }
def _normalize_reproduce_plan(
    raw_plan: object,
    *,
    source_description: str,
    hpc_context: dict[str, object] | None = None,
) -> dict[str, object]:
    """Normalize a `reproduce` plan payload (delegates to the shared normalizer)."""
    return _normalize_automation_plan(
        raw_plan,
        source_description=source_description,
        hpc_context=hpc_context,
    )
def _normalize_research_plan(
    raw_plan: object,
    *,
    source_description: str,
    hpc_context: dict[str, object] | None = None,
) -> dict[str, object]:
    """Normalize a `research` plan payload (delegates to the shared normalizer)."""
    return _normalize_automation_plan(
        raw_plan,
        source_description=source_description,
        hpc_context=hpc_context,
    )
def _run_reproduce_exec_turn(
    *,
    repo_dir: Path,
    prompt: str,
    requested_package_id: str | None,
    sandbox_override: str | None,
    provider_bin_override: str,
    data_context: dict[str, object] | None = None,
) -> dict[str, object]:
    """Execute one planning/reporting turn used by `reproduce` and `research`.

    Dependency note:
    - Shared by planner, auditor, and report-generation stages.
    - Uses the same package-routing/overlay execution path as `exec` and `loop`.

    When `data_context` is enabled and marked read-only, the --data-dir
    inventory is scanned before and after the turn; any fingerprint change
    raises `cli.PackageError` (read-only data guard).

    Returns the run-result dict produced by `cli._run_exec_chat_turn`.
    """
    cli = _cli()
    scipkg_root = cli.resolve_scipkg_root()
    runtime_policy = cli.resolve_agent_runtime_policy()
    provider = runtime_policy.provider
    sandbox_policy = runtime_policy.sandbox_policy
    sandbox_mode = runtime_policy.sandbox_mode
    model = runtime_policy.model
    reasoning_effort = runtime_policy.reasoning_effort
    # A non-empty sandbox override forces enforce mode with that sandbox.
    if isinstance(sandbox_override, str) and sandbox_override.strip():
        sandbox_policy = "enforce"
        sandbox_mode = sandbox_override.strip()
    data_guard_before: dict[str, object] | None = None
    data_guard_source: Path | None = None
    data_guard_limits: dict[str, int] | None = None
    # Snapshot the data dir up front when the context is read-only
    # (`read_only` defaults to True) so post-turn mutation can be detected.
    if _is_data_context_enabled(data_context) and bool(
        data_context.get("read_only", True)
    ):
        source_path = str(data_context.get("source_path") or "").strip()
        limits = data_context.get("limits")
        if source_path and isinstance(limits, dict):
            data_guard_source = Path(source_path)
            data_guard_limits = {
                "max_files": int(limits.get("max_files") or DEFAULT_DATA_MAX_FILES),
                "max_total_bytes": int(
                    limits.get("max_total_bytes") or DEFAULT_DATA_MAX_TOTAL_BYTES
                ),
                "max_file_bytes": int(
                    limits.get("max_file_bytes") or DEFAULT_DATA_MAX_FILE_BYTES
                ),
                "hash_max_bytes": int(
                    limits.get("hash_max_bytes") or DEFAULT_DATA_HASH_MAX_BYTES
                ),
            }
            data_guard_before = _scan_data_dir_inventory(
                data_dir=data_guard_source,
                max_files=data_guard_limits["max_files"],
                max_total_bytes=data_guard_limits["max_total_bytes"],
                max_file_bytes=data_guard_limits["max_file_bytes"],
                hash_max_bytes=data_guard_limits["hash_max_bytes"],
                include_hash=False,
            )
    provider_bin = cli.resolve_provider_binary_override(
        provider,
        raw_override=provider_bin_override,
    )
    # Route the prompt to a package (manual pin, router match, or default).
    selection = cli._resolve_exec_package_selection(
        user_prompt=prompt,
        scipkg_root=scipkg_root,
        repo_dir=repo_dir,
        requested_package_id=requested_package_id,
        provider=provider,
        provider_bin=provider_bin,
        sandbox_policy=sandbox_policy,
        model=model,
        reasoning_effort=reasoning_effort,
    )
    package_id = selection.get("package_id")
    if not isinstance(package_id, str) or not package_id:
        raise cli.PackageError("No package selected for reproduce execution.")
    source = str(selection.get("source") or "default")
    note = str(selection.get("note") or "").strip()
    cli._print_tagged("package", f"Using {package_id} (selection: {source})")
    # Only surface router notes that carry extra information.
    if note and note not in {"manual_pin", "default_fallback", "matched"}:
        cli._print_tagged("router", note)
    sandbox_text = (
        f"enforce({sandbox_mode})" if sandbox_policy == "enforce" else "bypass"
    )
    cli._print_tagged("agent", f"provider: {provider}, sandbox: {sandbox_text}")
    overlay = cli._overlay_exec_package(
        repo_dir=repo_dir,
        scipkg_root=scipkg_root,
        package_id=package_id,
    )
    linked = int(overlay.get("linked_count", 0)) if isinstance(overlay, dict) else 0
    collisions = (
        int(overlay.get("collision_count", 0)) if isinstance(overlay, dict) else 0
    )
    linked_deps = (
        int(overlay.get("linked_dependency_count", 0))
        if isinstance(overlay, dict)
        else 0
    )
    cli._print_tagged(
        "overlay",
        (
            "linked entries: "
            f"{linked}, linked dependencies: {linked_deps}, collisions: {collisions}"
        ),
    )
    try:
        run_result = cli._run_exec_chat_turn(
            repo_dir=repo_dir,
            prompt=prompt,
            sandbox=sandbox_mode if sandbox_policy == "enforce" else None,
            provider_bin_override=provider_bin,
            provider=provider,
            sandbox_policy=sandbox_policy,
            model=model,
            reasoning_effort=reasoning_effort,
        )
    finally:
        # Overlay symlinks are always removed, even when the turn fails.
        cli._cleanup_exec_overlay_symlinks(repo_dir=repo_dir, workspace_root=repo_dir)
    # Re-scan and compare fingerprints to enforce the read-only data guard.
    if (
        data_guard_before is not None
        and data_guard_source is not None
        and isinstance(data_guard_limits, dict)
    ):
        data_guard_after = _scan_data_dir_inventory(
            data_dir=data_guard_source,
            max_files=int(data_guard_limits["max_files"]),
            max_total_bytes=int(data_guard_limits["max_total_bytes"]),
            max_file_bytes=int(data_guard_limits["max_file_bytes"]),
            hash_max_bytes=int(data_guard_limits["hash_max_bytes"]),
            include_hash=False,
        )
        before_fingerprint = str(data_guard_before.get("fingerprint") or "")
        after_fingerprint = str(data_guard_after.get("fingerprint") or "")
        if before_fingerprint != after_fingerprint:
            diff_lines = _scan_diff_preview(data_guard_before, data_guard_after)
            raise cli.PackageError(
                "Read-only data guard violation: --data-dir was modified during "
                "planning/auditing/reporting turn.\n"
                f"data_dir: {data_guard_source}\n"
                "indexed diff preview:\n" + "\n".join(diff_lines)
            )
    return run_result
def _generate_mode_plan(
    *,
    repo_dir: Path,
    run_dir: Path | None,
    source_text: str,
    source_description: str,
    requested_package_id: str | None,
    sandbox_override: str | None,
    provider_bin_override: str,
    planner_max_tries: int,
    auditor_max_tries: int,
    planner_prompt_prefix: str,
    auditor_prompt_prefix: str,
    plan_tag: str,
    extract_payload,
    normalize_plan,
    log_tag: str,
    data_context: dict[str, object] | None = None,
    hpc_context: dict[str, object] | None = None,
    workflow_status_hook: WorkflowStatusHook | None = None,
) -> dict[str, object]:
    """Generate + audit a workflow plan used by both `reproduce` and `research`.

    Stages:
      1. Planner: up to `planner_max_tries` agent turns to obtain a
         `<plan_tag>` JSON payload, normalized via `normalize_plan`.
      2. Data mapping (only when a data context is enabled): build and
         materialize a draft task-data map for the planner plan.
      3. Auditor: up to `auditor_max_tries` turns reviewing the candidate
         plan (plus any data-map context) to produce the final plan.
      4. Re-sync (data context only): re-align task-data artifacts with the
         audited plan's task list.

    Raises `cli.PackageError` when a stage cannot produce a valid result.
    """
    cli = _cli()
    planner_prompt_parts = [
        f"{planner_prompt_prefix}\n\n"
        f"{WORKFLOW_UNIFIED_MEMORY_STAGE_INSTRUCTIONS}\n"
        "\n"
        f"Paper source: {source_description}\n\n"
        "Paper content / request:\n"
        f"{source_text.strip()}\n"
    ]
    planner_prompt_parts.append(
        "\nExecution target constraints:\n"
        + "\n".join(_build_hpc_prompt_lines(hpc_context))
        + "\n"
    )
    planner_prompt = "".join(planner_prompt_parts)
    planner_plan: dict[str, object] | None = None
    _emit_workflow_status(workflow_status_hook, f"{log_tag} plan")
    # Stage 1: planner retry loop. A nonzero exit code aborts; a missing or
    # invalid payload is retried.
    for attempt in range(1, planner_max_tries + 1):
        cli._print_tagged(log_tag, f"planner attempt {attempt}/{planner_max_tries}")
        run_result = _run_reproduce_exec_turn(
            repo_dir=repo_dir,
            prompt=planner_prompt,
            requested_package_id=requested_package_id,
            sandbox_override=sandbox_override,
            provider_bin_override=provider_bin_override,
            data_context=data_context,
        )
        return_code = int(run_result.get("return_code") or 0)
        if return_code != 0:
            raise cli.PackageError(
                f"{log_tag.title()} planner agent run failed with exit code {return_code}."
            )
        assistant_text = str(run_result.get("assistant_text") or "")
        raw_payload = extract_payload(assistant_text)
        if raw_payload is None:
            cli._print_tagged(
                log_tag,
                f"planner response missing <{plan_tag}> JSON block.",
                stderr=True,
            )
            continue
        try:
            planner_plan = normalize_plan(
                raw_payload,
                source_description=source_description,
                hpc_context=hpc_context,
            )
        except cli.PackageError as exc:
            cli._print_tagged(log_tag, f"planner response invalid: {exc}", stderr=True)
            continue
        break
    if planner_plan is None:
        raise cli.PackageError(
            f"Unable to generate a valid {log_tag} plan from planner response."
        )
    # Stage 2: draft task-data mapping for the planner plan (data mode only).
    draft_task_data_map: dict[str, object] | None = None
    if _is_data_context_enabled(data_context):
        if run_dir is None:
            raise cli.PackageError("Internal error: run_dir required for data mapping.")
        artifacts = data_context.get("artifacts")
        if not isinstance(artifacts, dict):
            raise cli.PackageError("Data context artifacts are missing.")
        manifest_rel = str(
            artifacts.get("manifest_compact") or artifacts.get("manifest") or ""
        ).strip()
        manifest_full_rel = str(artifacts.get("manifest_full") or "").strip()
        summary_rel = str(artifacts.get("summary") or "").strip()
        if not manifest_rel or not summary_rel:
            raise cli.PackageError("Data context artifacts are incomplete.")
        manifest_path = repo_dir / manifest_rel
        manifest_full_path = repo_dir / manifest_full_rel if manifest_full_rel else None
        summary_path = repo_dir / summary_rel
        compact_manifest_payload = _load_json_if_exists(manifest_path)
        if not isinstance(compact_manifest_payload, dict):
            raise cli.PackageError(f"Missing data manifest: {manifest_path}")
        # The full manifest is optional; only the compact one is required.
        full_manifest_payload = (
            _load_json_if_exists(manifest_full_path)
            if isinstance(manifest_full_path, Path)
            else None
        )
        try:
            summary_text = summary_path.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            raise cli.PackageError(
                f"Failed to read data summary file: {summary_path}: {exc}"
            ) from exc
        draft_task_data_map = _generate_task_data_map(
            repo_dir=repo_dir,
            run_dir=run_dir,
            source_description=source_description,
            planner_plan=planner_plan,
            compact_manifest_payload=compact_manifest_payload,
            full_manifest_payload=full_manifest_payload,
            summary_text=summary_text,
            requested_package_id=requested_package_id,
            sandbox_override=sandbox_override,
            provider_bin_override=provider_bin_override,
            max_tries=auditor_max_tries,
            log_tag=log_tag,
            data_context=data_context,
        )
        draft_task_data_map = _materialize_task_data_artifacts(
            repo_dir=repo_dir,
            run_dir=run_dir,
            plan=planner_plan,
            task_data_map=draft_task_data_map,
        )
    # Stage 3: auditor prompt assembly (candidate plan + data-map context).
    audited_plan: dict[str, object] | None = None
    auditor_prompt_parts = [
        f"{auditor_prompt_prefix}\n\n"
        f"{WORKFLOW_UNIFIED_MEMORY_STAGE_INSTRUCTIONS}\n"
        "\n"
        f"Paper source: {source_description}\n\n"
        "Original paper content / request:\n"
        f"{source_text.strip()}\n\n"
        "Candidate plan JSON:\n"
        f"{json.dumps(planner_plan, indent=2)}\n"
    ]
    if _is_data_context_enabled(data_context):
        artifacts = data_context.get("artifacts")
        if isinstance(artifacts, dict):
            summary_rel = str(artifacts.get("summary") or "").strip()
            manifest_rel = str(
                artifacts.get("manifest_compact") or artifacts.get("manifest") or ""
            ).strip()
            manifest_full_rel = str(artifacts.get("manifest_full") or "").strip()
            task_map_rel = str(artifacts.get("task_map") or "").strip()
            if summary_rel:
                auditor_prompt_parts.append(f"\nData summary file: {summary_rel}\n")
            if manifest_rel:
                auditor_prompt_parts.append(
                    f"Compact data manifest file: {manifest_rel}\n"
                )
            if manifest_full_rel:
                auditor_prompt_parts.append(
                    f"Full data manifest file: {manifest_full_rel}\n"
                )
            if task_map_rel:
                auditor_prompt_parts.append(
                    f"Draft task-data map file: {task_map_rel}\n"
                )
        if isinstance(draft_task_data_map, dict):
            auditor_prompt_parts.append(
                "\nDraft task-data map JSON:\n"
                f"{json.dumps(draft_task_data_map, indent=2)}\n"
            )
        auditor_prompt_parts.append(
            "\nData-scope rule for final prompts:\n"
            "- For each task, include direction to use only mapped files by default.\n"
            "- Require map updates when strong evidence suggests expansion.\n"
        )
    auditor_prompt_parts.append(
        "\nExecution target constraints:\n"
        + "\n".join(_build_hpc_prompt_lines(hpc_context))
        + "\n"
    )
    auditor_prompt = "".join(auditor_prompt_parts)
    _emit_workflow_status(workflow_status_hook, f"{log_tag} audit")
    # Auditor retry loop mirrors the planner loop above.
    for attempt in range(1, auditor_max_tries + 1):
        cli._print_tagged(log_tag, f"auditor attempt {attempt}/{auditor_max_tries}")
        run_result = _run_reproduce_exec_turn(
            repo_dir=repo_dir,
            prompt=auditor_prompt,
            requested_package_id=requested_package_id,
            sandbox_override=sandbox_override,
            provider_bin_override=provider_bin_override,
            data_context=data_context,
        )
        return_code = int(run_result.get("return_code") or 0)
        if return_code != 0:
            raise cli.PackageError(
                f"{log_tag.title()} auditor agent run failed with exit code {return_code}."
            )
        assistant_text = str(run_result.get("assistant_text") or "")
        raw_payload = extract_payload(assistant_text)
        if raw_payload is None:
            cli._print_tagged(
                log_tag,
                f"auditor response missing <{plan_tag}> JSON block.",
                stderr=True,
            )
            continue
        try:
            audited_plan = normalize_plan(
                raw_payload,
                source_description=source_description,
                hpc_context=hpc_context,
            )
        except cli.PackageError as exc:
            cli._print_tagged(log_tag, f"auditor response invalid: {exc}", stderr=True)
            continue
        break
    if audited_plan is None:
        raise cli.PackageError(
            f"Unable to generate a valid {log_tag} plan from auditor response."
        )
    # Stage 4: re-sync data artifacts against the audited plan's task list.
    if _is_data_context_enabled(data_context):
        if run_dir is None:
            raise cli.PackageError("Internal error: run_dir required for data mapping.")
        artifacts = data_context.get("artifacts")
        if not isinstance(artifacts, dict):
            raise cli.PackageError("Data context artifacts are missing.")
        manifest_rel = str(
            artifacts.get("manifest_compact") or artifacts.get("manifest") or ""
        ).strip()
        if not manifest_rel:
            raise cli.PackageError("Data context manifest artifact is missing.")
        manifest_payload = _load_json_if_exists(repo_dir / manifest_rel)
        if not isinstance(manifest_payload, dict):
            raise cli.PackageError(
                f"Missing data manifest for task mapping: {repo_dir / manifest_rel}"
            )
        _synchronize_task_data_artifacts_with_plan(
            repo_dir=repo_dir,
            run_dir=run_dir,
            plan=audited_plan,
            manifest_payload=manifest_payload,
            existing_task_map=draft_task_data_map,
        )
    return audited_plan
def _generate_reproduce_plan(
    *,
    repo_dir: Path,
    source_text: str,
    source_description: str,
    requested_package_id: str | None,
    sandbox_override: str | None,
    provider_bin_override: str,
    planner_max_tries: int,
    auditor_max_tries: int,
    run_dir: Path | None = None,
    data_context: dict[str, object] | None = None,
    hpc_context: dict[str, object] | None = None,
    workflow_status_hook: WorkflowStatusHook | None = None,
) -> dict[str, object]:
    """Generate + audit a `reproduce` plan via the shared mode-plan pipeline."""
    return _generate_mode_plan(
        repo_dir=repo_dir,
        run_dir=run_dir,
        source_text=source_text,
        source_description=source_description,
        requested_package_id=requested_package_id,
        sandbox_override=sandbox_override,
        provider_bin_override=provider_bin_override,
        planner_max_tries=planner_max_tries,
        auditor_max_tries=auditor_max_tries,
        planner_prompt_prefix=REPRODUCE_PLANNER_PROMPT_PREFIX,
        auditor_prompt_prefix=REPRODUCE_AUDITOR_PROMPT_PREFIX,
        plan_tag=REPRODUCE_PLAN_TAG,
        extract_payload=_extract_reproduce_plan_payload,
        normalize_plan=_normalize_reproduce_plan,
        log_tag="reproduce",
        data_context=data_context,
        hpc_context=hpc_context,
        workflow_status_hook=workflow_status_hook,
    )
def _generate_research_plan(
    *,
    repo_dir: Path,
    source_text: str,
    source_description: str,
    requested_package_id: str | None,
    sandbox_override: str | None,
    provider_bin_override: str,
    planner_max_tries: int,
    auditor_max_tries: int,
    run_dir: Path | None = None,
    data_context: dict[str, object] | None = None,
    hpc_context: dict[str, object] | None = None,
    workflow_status_hook: WorkflowStatusHook | None = None,
) -> dict[str, object]:
    """Generate + audit a `research` plan via the shared mode-plan pipeline."""
    return _generate_mode_plan(
        repo_dir=repo_dir,
        run_dir=run_dir,
        source_text=source_text,
        source_description=source_description,
        requested_package_id=requested_package_id,
        sandbox_override=sandbox_override,
        provider_bin_override=provider_bin_override,
        planner_max_tries=planner_max_tries,
        auditor_max_tries=auditor_max_tries,
        planner_prompt_prefix=RESEARCH_PLANNER_PROMPT_PREFIX,
        auditor_prompt_prefix=RESEARCH_AUDITOR_PROMPT_PREFIX,
        plan_tag=RESEARCH_PLAN_TAG,
        extract_payload=_extract_research_plan_payload,
        normalize_plan=_normalize_research_plan,
        log_tag="research",
        data_context=data_context,
        hpc_context=hpc_context,
        workflow_status_hook=workflow_status_hook,
    )
def _archive_loop_memory(
    *, repo_dir: Path, archive_dir: Path, task_id: str, run_count: int
) -> None:
    """Copy the current loop-memory file into the archive directory.

    No-op when no memory file exists yet; raises `cli.PackageError` when the
    copy fails.
    """
    cli = _cli()
    source_path = repo_dir / LOOP_MEMORY_DIRNAME / LOOP_MEMORY_FILENAME
    if not source_path.is_file():
        # Nothing to archive yet.
        return
    archive_dir.mkdir(parents=True, exist_ok=True)
    destination = archive_dir / f"memory_{task_id}_run_{run_count:02d}.md"
    try:
        shutil.copy2(source_path, destination)
    except OSError as exc:
        raise cli.PackageError(
            f"Failed to archive loop memory to {destination}: {exc}"
        ) from exc
# Canonical section headings for the unified loop-memory markdown document.
# Level-2 headings split the file into short-term and long-term halves.
UNIFIED_MEMORY_SHORT_TERM_HEADING = "## Short-Term Memory (Operational)"
UNIFIED_MEMORY_LONG_TERM_HEADING = "## Long-Term Memory (Persistent)"
# Level-3 subsections under the short-term heading.
UNIFIED_MEMORY_PLAN_HEADING = "### Plan"
UNIFIED_MEMORY_PROGRESS_HEADING = "### Progress log"
# Level-3 subsections under the long-term heading.
UNIFIED_MEMORY_FILE_MAP_HEADING = "### File map"
UNIFIED_MEMORY_SIM_HISTORY_HEADING = "### Simulation history"
UNIFIED_MEMORY_KEY_RESULTS_HEADING = "### Key results"
UNIFIED_MEMORY_PARAM_SOURCE_HEADING = "### Parameter source mapping"
UNIFIED_MEMORY_SIM_UNCERTAINTY_HEADING = "### Simulation uncertainty"
UNIFIED_MEMORY_SKILLS_UPDATES_HEADING = "### Suggested skills updates"
# Seed template for the short-term section (checklist plan + progress log).
UNIFIED_MEMORY_SHORT_TERM_BLOCK = (
    f"{UNIFIED_MEMORY_SHORT_TERM_HEADING}\n"
    "\n"
    f"{UNIFIED_MEMORY_PLAN_HEADING}\n"
    "- [ ] (fill in a small checklist plan)\n"
    "\n"
    f"{UNIFIED_MEMORY_PROGRESS_HEADING}\n"
    "- initialized\n"
)
# Seed template for the long-term section; each bullet is a placeholder row
# describing that table's column layout (see the placeholders set below).
UNIFIED_MEMORY_LONG_TERM_BLOCK = (
    f"{UNIFIED_MEMORY_LONG_TERM_HEADING}\n"
    "\n"
    f"{UNIFIED_MEMORY_FILE_MAP_HEADING}\n"
    "- (path | purpose | notes)\n"
    "\n"
    f"{UNIFIED_MEMORY_SIM_HISTORY_HEADING}\n"
    "- (run_id | objective | status | artifacts | notes)\n"
    "\n"
    f"{UNIFIED_MEMORY_KEY_RESULTS_HEADING}\n"
    "- (result_id | metric | value | conditions | evidence_path)\n"
    "\n"
    f"{UNIFIED_MEMORY_PARAM_SOURCE_HEADING}\n"
    "- (run_id | parameter_or_setting | value | source | evidence_path | notes)\n"
    "\n"
    f"{UNIFIED_MEMORY_SIM_UNCERTAINTY_HEADING}\n"
    "- (run_id | uncertainty_or_assumption | impact | mitigation_or_next_step | status)\n"
    "\n"
    f"{UNIFIED_MEMORY_SKILLS_UPDATES_HEADING}\n"
    "- (<package_id> | issue_pattern | proposed_skill_update | evidence | status)\n"
)
def _memory_heading_exists(content: str, heading: str) -> bool:
return bool(re.search(rf"(?m)^\s*{re.escape(heading)}\s*$", content))
def _append_memory_block(content: str, block: str) -> str:
normalized = content.rstrip()
if normalized:
normalized += "\n\n"
return normalized + block.rstrip() + "\n"
def _iter_unified_memory_sections(
content: str,
*,
heading: str,
) -> list[tuple[int, int, str]]:
heading_re = re.compile(rf"(?m)^\s*{re.escape(heading)}\s*$")
next_same_level_re = re.compile(r"(?m)^##\s+.+$")
sections: list[tuple[int, int, str]] = []
for match in heading_re.finditer(content):
next_match = next_same_level_re.search(content, match.end())
end = next_match.start() if next_match is not None else len(content)
sections.append((match.start(), end, content[match.start() : end]))
return sections
# Lower-cased bullet texts from UNIFIED_MEMORY_LONG_TERM_BLOCK's seed rows.
# Bullets matching one of these are template placeholders, not real content,
# and are excluded when scoring long-term sections (_score_long_term_block).
UNIFIED_MEMORY_LONG_TERM_PLACEHOLDERS = {
    "(path | purpose | notes)",
    "(run_id | objective | status | artifacts | notes)",
    "(result_id | metric | value | conditions | evidence_path)",
    "(run_id | parameter_or_setting | value | source | evidence_path | notes)",
    "(run_id | uncertainty_or_assumption | impact | mitigation_or_next_step | status)",
    "(<package_id> | issue_pattern | proposed_skill_update | evidence | status)",
}
def _score_long_term_block(block: str) -> tuple[int, int, int]:
    """Score a long-term memory section for canonicalization.

    Returns (informative bullet count, non-empty line count, stripped
    length); the heading line itself is excluded, and bullets matching the
    template placeholders do not count as informative.
    """
    informative = 0
    nonempty = 0
    # Skip the first line (the section heading).
    for raw in block.splitlines()[1:]:
        text = raw.strip()
        if not text:
            continue
        nonempty += 1
        if not text.startswith("- "):
            continue
        bullet = text[2:].strip()
        if bullet and bullet.lower() not in UNIFIED_MEMORY_LONG_TERM_PLACEHOLDERS:
            informative += 1
    return informative, nonempty, len(block.strip())
def _canonicalize_unified_memory_sections(content: str) -> str:
    """Collapse duplicated short-/long-term memory sections into one of each.

    When the memory document contains more than one short-term or long-term
    section (e.g. after repeated appends), keep the latest short-term
    occurrence and the highest-scoring long-term occurrence, remove every
    duplicated span, and reassemble the document around a single canonical
    pair. Returns *content* unchanged when no heading is duplicated.
    """
    short_sections = _iter_unified_memory_sections(
        content, heading=UNIFIED_MEMORY_SHORT_TERM_HEADING
    )
    long_sections = _iter_unified_memory_sections(
        content, heading=UNIFIED_MEMORY_LONG_TERM_HEADING
    )
    # Nothing to canonicalize unless at least one heading is duplicated.
    if len(short_sections) <= 1 and len(long_sections) <= 1:
        return content
    # Short-term: the most recent occurrence wins; fall back to the template.
    chosen_short = (
        short_sections[-1][2].rstrip()
        if short_sections
        else UNIFIED_MEMORY_SHORT_TERM_BLOCK.rstrip()
    )
    if long_sections:
        # Long-term: prefer the most informative candidate; score ties break
        # toward the later occurrence (higher index sorts last).
        scored_candidates = [
            (_score_long_term_block(section_text), idx, section_text.rstrip())
            for idx, (_, _, section_text) in enumerate(long_sections)
        ]
        scored_candidates.sort(key=lambda item: (item[0], item[1]))
        chosen_long = scored_candidates[-1][2]
    else:
        chosen_long = UNIFIED_MEMORY_LONG_TERM_BLOCK.rstrip()
    # Merge every section span into non-overlapping [start, end) intervals.
    spans = sorted((start, end) for start, end, _ in [*short_sections, *long_sections])
    merged_spans: list[list[int]] = []
    for start, end in spans:
        if not merged_spans or start > merged_spans[-1][1]:
            merged_spans.append([start, end])
            continue
        merged_spans[-1][1] = max(merged_spans[-1][1], end)
    # Text before the first section stays as the prefix; text between and
    # after the removed spans is stitched together as the suffix.
    insert_at = merged_spans[0][0]
    prefix = content[:insert_at].rstrip()
    cursor = insert_at
    suffix_parts: list[str] = []
    for start, end in merged_spans:
        if start > cursor:
            suffix_parts.append(content[cursor:start])
        cursor = max(cursor, end)
    suffix_parts.append(content[cursor:])
    suffix = "".join(suffix_parts).strip()
    canonical_memory = f"{chosen_short}\n\n{chosen_long}"
    parts: list[str] = []
    if prefix:
        parts.append(prefix)
    parts.append(canonical_memory)
    if suffix:
        parts.append(suffix)
    return "\n\n".join(parts).rstrip() + "\n"
def _normalize_workflow_context_lines(
workflow_context_lines: list[str] | None,
) -> list[str]:
if not isinstance(workflow_context_lines, list):
return []
return [
str(line).rstrip()
for line in workflow_context_lines
if isinstance(line, str) and line.strip()
]
def _upsert_workflow_context_block(
    content: str, workflow_context_lines: list[str] | None
) -> str:
    """Insert or replace the "## Workflow context" section in *content*.

    Any existing "## Workflow context" section (up to the next ``## ``
    heading or end of file) is removed first; the rebuilt block is then
    spliced in immediately before the short-term memory heading, or
    appended at the end when that heading is absent. Returns *content*
    unchanged when there are no usable context lines.
    """
    normalized_context = _normalize_workflow_context_lines(workflow_context_lines)
    if not normalized_context:
        return content
    # Drop any prior context section before inserting the fresh one.
    without_context = re.sub(
        r"(?ms)^## Workflow context\s*$\n.*?(?=^##\s|\Z)",
        "",
        content,
    )
    block = "## Workflow context\n" + "\n".join(normalized_context) + "\n"
    short_match = re.search(
        rf"(?m)^\s*{re.escape(UNIFIED_MEMORY_SHORT_TERM_HEADING)}\s*$",
        without_context,
    )
    if short_match is None:
        # No short-term section yet: append the context block at the end.
        return _append_memory_block(without_context, block)
    # Splice the context block between the prefix and the short-term section.
    prefix = without_context[: short_match.start()].rstrip()
    suffix = without_context[short_match.start() :].lstrip()
    parts: list[str] = []
    if prefix:
        parts.append(prefix)
    parts.append(block.rstrip())
    if suffix:
        parts.append(suffix)
    return "\n\n".join(parts).rstrip() + "\n"
def _upgrade_loop_memory_schema(memory_path: Path) -> None:
    """Migrate an existing loop-memory file to the current unified schema.

    Canonicalizes duplicated sections, rewrites legacy top-level "## Plan"
    and "## Progress log" headings into the unified short-term layout, and
    appends any missing required sub-sections with placeholder content.
    The file is rewritten only when something actually changed. Raises
    ``cli.PackageError`` when the file cannot be read or written.
    """
    cli = _cli()
    try:
        content = memory_path.read_text(encoding="utf-8")
    except OSError as exc:
        raise cli.PackageError(
            f"Failed to read loop memory file for schema upgrade: {memory_path}: {exc}"
        ) from exc
    upgraded = _canonicalize_unified_memory_sections(content)
    if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_SHORT_TERM_HEADING):
        # Backward compatibility: migrate legacy top-level sections when present.
        upgraded = re.sub(
            r"(?m)^##\s+Plan\s*$",
            f"{UNIFIED_MEMORY_SHORT_TERM_HEADING}\n\n{UNIFIED_MEMORY_PLAN_HEADING}",
            upgraded,
            count=1,
        )
        upgraded = re.sub(
            r"(?m)^##\s+Progress\s+log\s*$",
            UNIFIED_MEMORY_PROGRESS_HEADING,
            upgraded,
            count=1,
        )
    # A plan heading without its short-term parent indicates a partially
    # migrated file: nest the plan under a new short-term heading.
    if _memory_heading_exists(
        upgraded, UNIFIED_MEMORY_PLAN_HEADING
    ) and not _memory_heading_exists(upgraded, UNIFIED_MEMORY_SHORT_TERM_HEADING):
        upgraded = re.sub(
            rf"(?m)^\s*{re.escape(UNIFIED_MEMORY_PLAN_HEADING)}\s*$",
            f"{UNIFIED_MEMORY_SHORT_TERM_HEADING}\n\n{UNIFIED_MEMORY_PLAN_HEADING}",
            upgraded,
            count=1,
        )
    # Ensure the short-term section and its required children exist.
    if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_SHORT_TERM_HEADING):
        upgraded = _append_memory_block(upgraded, UNIFIED_MEMORY_SHORT_TERM_BLOCK)
    else:
        if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_PLAN_HEADING):
            upgraded = _append_memory_block(
                upgraded,
                f"{UNIFIED_MEMORY_PLAN_HEADING}\n- [ ] (fill in a small checklist plan)\n",
            )
        if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_PROGRESS_HEADING):
            upgraded = _append_memory_block(
                upgraded, f"{UNIFIED_MEMORY_PROGRESS_HEADING}\n- initialized\n"
            )
    # Ensure the long-term section and each required sub-table exist.
    if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_LONG_TERM_HEADING):
        upgraded = _append_memory_block(upgraded, UNIFIED_MEMORY_LONG_TERM_BLOCK)
    else:
        if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_FILE_MAP_HEADING):
            upgraded = _append_memory_block(
                upgraded,
                f"{UNIFIED_MEMORY_FILE_MAP_HEADING}\n- (path | purpose | notes)\n",
            )
        if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_SIM_HISTORY_HEADING):
            upgraded = _append_memory_block(
                upgraded,
                f"{UNIFIED_MEMORY_SIM_HISTORY_HEADING}\n"
                "- (run_id | objective | status | artifacts | notes)\n",
            )
        if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_KEY_RESULTS_HEADING):
            upgraded = _append_memory_block(
                upgraded,
                f"{UNIFIED_MEMORY_KEY_RESULTS_HEADING}\n"
                "- (result_id | metric | value | conditions | evidence_path)\n",
            )
        if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_PARAM_SOURCE_HEADING):
            upgraded = _append_memory_block(
                upgraded,
                f"{UNIFIED_MEMORY_PARAM_SOURCE_HEADING}\n"
                "- (run_id | parameter_or_setting | value | source | evidence_path | notes)\n",
            )
        if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_SIM_UNCERTAINTY_HEADING):
            upgraded = _append_memory_block(
                upgraded,
                f"{UNIFIED_MEMORY_SIM_UNCERTAINTY_HEADING}\n"
                "- (run_id | uncertainty_or_assumption | impact | mitigation_or_next_step | status)\n",
            )
        if not _memory_heading_exists(upgraded, UNIFIED_MEMORY_SKILLS_UPDATES_HEADING):
            upgraded = _append_memory_block(
                upgraded,
                f"{UNIFIED_MEMORY_SKILLS_UPDATES_HEADING}\n"
                "- (<package_id> | issue_pattern | proposed_skill_update | evidence | status)\n",
            )
    # Avoid touching the file (and its mtime) when nothing changed.
    if upgraded == content:
        return
    try:
        memory_path.write_text(upgraded, encoding="utf-8")
    except OSError as exc:
        raise cli.PackageError(
            f"Failed to update loop memory schema at {memory_path}: {exc}"
        ) from exc
def _ensure_loop_memory(
    *,
    repo_dir: Path,
    user_prompt: str,
    prompt_file: str | None,
    overwrite: bool = False,
    workflow_context_lines: list[str] | None = None,
) -> Path:
    """Create or refresh the unified loop-memory file under *repo_dir*.

    Ensures the memory directory exists (rejecting symlinks and non-dir
    paths), then either reuses an existing memory file — upgrading its
    schema and refreshing the workflow-context section — or, when the
    file is absent or *overwrite* is True, writes a fresh file seeded
    with the user's prompt and the template memory blocks. Returns the
    memory file path; raises ``cli.PackageError`` on filesystem failures
    or an unsafe directory layout.
    """
    cli = _cli()
    projects_dir = repo_dir / LOOP_MEMORY_DIRNAME
    # Refuse a symlinked memory directory so writes cannot escape the repo.
    if projects_dir.exists() and projects_dir.is_symlink():
        raise cli.PackageError(
            f"{projects_dir} is a symlink. Remove it and create a real directory "
            "so fermilink loop can persist long-term memory safely."
        )
    if projects_dir.exists() and not projects_dir.is_dir():
        raise cli.PackageError(f"{projects_dir} exists but is not a directory.")
    projects_dir.mkdir(parents=True, exist_ok=True)
    memory_path = projects_dir / LOOP_MEMORY_FILENAME
    if memory_path.exists():
        if memory_path.is_dir():
            raise cli.PackageError(f"{memory_path} exists but is a directory.")
        if not overwrite:
            # Keep the existing memory: upgrade its schema in place and
            # refresh only the workflow-context section.
            _upgrade_loop_memory_schema(memory_path)
            try:
                content = memory_path.read_text(encoding="utf-8")
            except OSError as exc:
                raise cli.PackageError(
                    f"Failed to read memory file for workflow context update: {memory_path}: {exc}"
                ) from exc
            updated = _upsert_workflow_context_block(content, workflow_context_lines)
            if updated != content:
                try:
                    memory_path.write_text(updated, encoding="utf-8")
                except OSError as exc:
                    raise cli.PackageError(
                        f"Failed to update workflow context in memory file: {memory_path}: {exc}"
                    ) from exc
            return memory_path
    # Fresh file: seed metadata, the original request, optional workflow
    # context, and the template short-/long-term memory blocks.
    started_at = _utc_now_z()
    source_line = f"- prompt_source: {prompt_file}\n" if prompt_file else ""
    context_block = ""
    normalized_context = _normalize_workflow_context_lines(workflow_context_lines)
    if normalized_context:
        context_block = (
            "\n" "## Workflow context\n" + "\n".join(normalized_context) + "\n"
        )
    initial = (
        "# FermiLink Unified Memory\n"
        "\n"
        "- schema_version: 1\n"
        f"- started_at_utc: {started_at}\n"
        f"- last_updated_utc: {started_at}\n"
        f"{source_line}"
        "\n"
        "## Original request\n"
        f"{user_prompt.strip()}\n"
        f"{context_block}"
        "\n"
        f"{UNIFIED_MEMORY_SHORT_TERM_BLOCK}\n"
        f"{UNIFIED_MEMORY_LONG_TERM_BLOCK}"
    )
    try:
        memory_path.write_text(initial, encoding="utf-8")
    except OSError as exc:
        raise cli.PackageError(
            f"Failed to create loop memory file: {memory_path}: {exc}"
        ) from exc
    return memory_path
def _reset_loop_short_term_memory(
    *,
    repo_dir: Path,
    user_prompt: str,
    prompt_file: str | None,
    workflow_context_lines: list[str] | None = None,
) -> Path:
    """Reset only short-term memory while preserving long-term sections.

    Ensures the memory file exists (via ``_ensure_loop_memory``), then
    replaces everything between the short-term heading and the long-term
    heading with the pristine short-term template block. If the two
    headings are missing or out of order, a schema upgrade is attempted
    first; a ``cli.PackageError`` is raised when the headings still cannot
    be located afterwards, or on any read/write failure. Returns the
    memory file path.
    """
    cli = _cli()
    memory_path = _ensure_loop_memory(
        repo_dir=repo_dir,
        user_prompt=user_prompt,
        prompt_file=prompt_file,
        overwrite=False,
        workflow_context_lines=workflow_context_lines,
    )
    try:
        content = memory_path.read_text(encoding="utf-8")
    except OSError as exc:
        raise cli.PackageError(
            f"Failed to read loop memory file for short-term reset: {memory_path}: {exc}"
        ) from exc
    short_match = re.search(
        rf"(?m)^\s*{re.escape(UNIFIED_MEMORY_SHORT_TERM_HEADING)}\s*$", content
    )
    long_match = re.search(
        rf"(?m)^\s*{re.escape(UNIFIED_MEMORY_LONG_TERM_HEADING)}\s*$", content
    )
    # Headings absent or inverted: run the schema upgrade and retry once.
    if (
        short_match is None
        or long_match is None
        or long_match.start() <= short_match.start()
    ):
        _upgrade_loop_memory_schema(memory_path)
        try:
            content = memory_path.read_text(encoding="utf-8")
        except OSError as exc:
            raise cli.PackageError(
                "Failed to reload loop memory file after schema upgrade: "
                f"{memory_path}: {exc}"
            ) from exc
        short_match = re.search(
            rf"(?m)^\s*{re.escape(UNIFIED_MEMORY_SHORT_TERM_HEADING)}\s*$", content
        )
        long_match = re.search(
            rf"(?m)^\s*{re.escape(UNIFIED_MEMORY_LONG_TERM_HEADING)}\s*$", content
        )
        if (
            short_match is None
            or long_match is None
            or long_match.start() <= short_match.start()
        ):
            raise cli.PackageError(
                "Loop memory file is missing required short-term/long-term headings "
                f"after schema upgrade: {memory_path}"
            )
    # Keep everything before the short-term heading and everything from the
    # long-term heading onward; swap in a fresh short-term template between.
    prefix = content[: short_match.start()].rstrip()
    suffix = content[long_match.start() :].lstrip()
    parts: list[str] = []
    if prefix:
        parts.append(prefix)
    parts.append(UNIFIED_MEMORY_SHORT_TERM_BLOCK.rstrip())
    if suffix:
        parts.append(suffix)
    updated = "\n\n".join(parts).rstrip() + "\n"
    if updated == content:
        return memory_path
    try:
        memory_path.write_text(updated, encoding="utf-8")
    except OSError as exc:
        raise cli.PackageError(
            f"Failed to rewrite loop short-term memory at {memory_path}: {exc}"
        ) from exc
    return memory_path
def _truncate_handoff_line(text: str, *, max_chars: int = 180) -> str:
cleaned = str(text).strip()
if len(cleaned) <= max_chars:
return cleaned
overflow = len(cleaned) - max_chars
return f"{cleaned[:max_chars]}... ({overflow} more chars)"
def _summarize_archived_memory(path: Path, *, max_items: int = 3) -> list[str]:
    """Pull up to *max_items* informative lines from an archived memory file.

    Headings, blank lines, metadata lines (``started_at_utc:`` /
    ``prompt_source:``), unchecked checklist items, and known template
    placeholder bullets are skipped; the most recent surviving items are
    returned, truncated for hand-off display. Returns an empty list when
    the file cannot be read or contains nothing informative.
    """
    try:
        content = path.read_text(encoding="utf-8", errors="replace")
    except OSError:
        return []
    candidates: list[str] = []
    for raw_line in content.splitlines():
        line = raw_line.strip()
        if not line:
            continue
        if line.startswith("#"):
            # Markdown headings carry no archival detail.
            continue
        if line.startswith("- "):
            item = line[2:].strip()
        else:
            item = line
        lowered = item.lower()
        # Skip template placeholders. Both the current skills-update
        # placeholder (with the leading `<package_id>` column, as written
        # by the long-term template) and the legacy spelling without it
        # are filtered; previously only the legacy form was listed, so the
        # current placeholder leaked into summaries.
        if lowered in {
            "initialized",
            "(fill in a small checklist plan)",
            "(path | purpose | notes)",
            "(run_id | objective | status | artifacts | notes)",
            "(result_id | metric | value | conditions | evidence_path)",
            "(run_id | parameter_or_setting | value | source | evidence_path | notes)",
            "(run_id | uncertainty_or_assumption | impact | mitigation_or_next_step | status)",
            "(<package_id> | issue_pattern | proposed_skill_update | evidence | status)",
            "(issue_pattern | proposed_skill_update | evidence | status)",
        }:
            continue
        if lowered.startswith("started_at_utc:") or lowered.startswith(
            "prompt_source:"
        ):
            continue
        if lowered.startswith("[ ]"):
            # Unchecked plan items describe intent, not outcomes.
            continue
        candidates.append(_truncate_handoff_line(item))
    if not candidates:
        return []
    # Keep the most recent items: later lines reflect the latest state.
    return candidates[-max_items:]
def _extract_loop_wait_seconds(assistant_text: str) -> float | None:
    """Parse the last wait token in *assistant_text* as seconds.

    Returns None for non-string/blank input, when no wait token is
    present, or when the token is not a finite non-negative float.
    """
    if not isinstance(assistant_text, str) or not assistant_text.strip():
        return None
    tokens = LOOP_WAIT_TOKEN_RE.findall(assistant_text)
    if not tokens:
        return None
    try:
        seconds = float(tokens[-1])
    except (TypeError, ValueError):
        return None
    if not math.isfinite(seconds) or seconds < 0:
        return None
    return seconds
def _extract_loop_pid_numbers(assistant_text: str) -> list[int]:
    """Collect unique positive PIDs from PID tokens, in first-seen order."""
    if not isinstance(assistant_text, str) or not assistant_text.strip():
        return []
    raw_tokens = LOOP_PID_TOKEN_RE.findall(assistant_text)
    if not raw_tokens:
        return []
    ordered: list[int] = []
    observed: set[int] = set()
    for token in raw_tokens:
        try:
            candidate = int(token)
        except (TypeError, ValueError):
            continue
        if candidate > 0 and candidate not in observed:
            observed.add(candidate)
            ordered.append(candidate)
    return ordered
def _extract_loop_slurm_job_numbers(assistant_text: str) -> list[str]:
    """Collect unique, non-empty Slurm job ids, in first-seen order."""
    if not isinstance(assistant_text, str) or not assistant_text.strip():
        return []
    raw_tokens = LOOP_SLURM_JOB_TOKEN_RE.findall(assistant_text)
    if not raw_tokens:
        return []
    ordered: list[str] = []
    observed: set[str] = set()
    for token in raw_tokens:
        candidate = str(token).strip()
        if candidate and candidate not in observed:
            observed.add(candidate)
            ordered.append(candidate)
    return ordered
def _materialize_mode_plan(
    *,
    run_dir: Path,
    plan: dict[str, object],
    state: dict[str, object],
    workflow_name: str,
) -> None:
    """Write per-task prompt files and seed *plan*/*state* for execution.

    Validates that *plan* carries a non-empty task list, writes each
    task's ``prompt_markdown`` to a file under the prompts directory, and
    mutates both *plan* (each task gains a ``prompt_file``) and *state*
    (task list, per-task run counters, cursor, cleared error) in place.
    The plan is persisted to disk here; persisting *state* is left to the
    caller. Raises ``cli.PackageError`` on malformed tasks, empty prompts,
    or write failures.
    """
    cli = _cli()
    tasks = plan.get("tasks")
    if not isinstance(tasks, list) or not tasks:
        raise cli.PackageError(
            f"{workflow_name.title()} planner produced no executable tasks."
        )
    prompts_dir = run_dir / REPRODUCE_PROMPTS_DIRNAME
    prompts_dir.mkdir(parents=True, exist_ok=True)
    plan_tasks: list[dict[str, object]] = []
    state_tasks: list[dict[str, object]] = []
    task_runs: dict[str, int] = {}
    for index, task_obj in enumerate(tasks, start=1):
        if not isinstance(task_obj, dict):
            raise cli.PackageError(f"Task {index} in {workflow_name} plan is invalid.")
        # Fall back to a positional id when the planner omitted/blanked one.
        task_id = (
            str(task_obj.get("id") or f"task_{index:03d}").strip()
            or f"task_{index:03d}"
        )
        prompt_markdown = str(task_obj.get("prompt_markdown") or "").strip()
        if not prompt_markdown:
            raise cli.PackageError(f"Task {task_id} has empty `prompt_markdown`.")
        prompt_rel = f"{REPRODUCE_PROMPTS_DIRNAME}/{task_id}.md"
        prompt_path = run_dir / prompt_rel
        try:
            prompt_path.write_text(prompt_markdown.strip() + "\n", encoding="utf-8")
        except OSError as exc:
            raise cli.PackageError(
                f"Failed to write task prompt file: {prompt_path}: {exc}"
            ) from exc
        # Copy the task so the plan on disk records where its prompt lives.
        plan_task = dict(task_obj)
        plan_task["prompt_file"] = prompt_rel
        plan_tasks.append(plan_task)
        state_task: dict[str, object] = {
            "id": task_id,
            "title": str(task_obj.get("title") or task_id),
            "prompt_file": prompt_rel,
        }
        data_context_file = str(task_obj.get("data_context_file") or "").strip()
        if data_context_file:
            state_task["data_context_file"] = data_context_file
        state_tasks.append(state_task)
        task_runs[task_id] = 0
    plan["tasks"] = plan_tasks
    _write_json_atomic(run_dir / REPRODUCE_PLAN_FILENAME, plan)
    # Reset the execution cursor: all tasks pending, no runs, no error.
    state["tasks"] = state_tasks
    state["task_runs"] = task_runs
    state["current_task_index"] = 0
    state["last_error"] = ""
def _maybe_sync_mode_plan_from_disk(
    *,
    repo_dir: Path,
    run_dir: Path,
    state: dict[str, object],
    source_description: str,
    workflow_name: str,
    data_context: dict[str, object] | None = None,
    hpc_context: dict[str, object] | None = None,
) -> bool:
    """Re-materialize the plan from the on-disk plan file when safe to do so.

    Only acts when *state* is still at ``plan_ready`` with no tasks started
    (``current_task_index == 0``) and a plan file exists; otherwise returns
    False without touching anything. When acting, the plan is re-normalized,
    task/data artifacts are re-synchronized when a data context is enabled,
    tasks are re-materialized, and the updated state is persisted. Returns
    True when the plan was synced. Raises ``cli.PackageError`` on unreadable
    or malformed plan/manifest files.
    """
    cli = _cli()
    state_status = str(state.get("status") or "")
    current_index_raw = state.get("current_task_index", 0)
    try:
        current_index = int(current_index_raw)
    except (TypeError, ValueError):
        current_index = 0
    # Never resync once execution has begun — that would clobber progress.
    if state_status != "plan_ready" or current_index != 0:
        return False
    plan_path = run_dir / REPRODUCE_PLAN_FILENAME
    if not plan_path.is_file():
        return False
    try:
        raw_plan = json.loads(plan_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError) as exc:
        raise cli.PackageError(
            f"Failed to read {workflow_name} plan file: {plan_path}: {exc}"
        ) from exc
    if not isinstance(raw_plan, dict):
        raise cli.PackageError(
            f"{workflow_name.title()} plan file is not a JSON object: {plan_path}"
        )
    normalized_plan = _normalize_automation_plan(
        raw_plan,
        source_description=source_description,
        hpc_context=hpc_context,
    )
    if _is_data_context_enabled(data_context):
        # With a data context, the task/data map must be rebuilt against
        # the (possibly edited) plan before materializing tasks.
        artifacts = (
            data_context.get("artifacts") if isinstance(data_context, dict) else {}
        )
        if not isinstance(artifacts, dict):
            raise cli.PackageError("Data context artifacts missing during plan sync.")
        manifest_rel = str(
            artifacts.get("manifest_compact") or artifacts.get("manifest") or ""
        ).strip()
        task_map_rel = str(artifacts.get("task_map") or "").strip()
        if not manifest_rel:
            raise cli.PackageError(
                "Data context manifest path missing during plan sync."
            )
        manifest_payload = _load_json_if_exists(repo_dir / manifest_rel)
        if not isinstance(manifest_payload, dict):
            raise cli.PackageError(
                f"Failed to load data manifest during plan sync: {repo_dir / manifest_rel}"
            )
        existing_task_map = (
            _load_json_if_exists(repo_dir / task_map_rel) if task_map_rel else None
        )
        normalized_plan_map = _synchronize_task_data_artifacts_with_plan(
            repo_dir=repo_dir,
            run_dir=run_dir,
            plan=normalized_plan,
            manifest_payload=manifest_payload,
            existing_task_map=existing_task_map,
        )
        if isinstance(data_context, dict):
            data_context["task_map_updated_at_utc"] = str(
                normalized_plan_map.get("updated_at_utc") or _utc_now_z()
            )
    _materialize_mode_plan(
        run_dir=run_dir,
        plan=normalized_plan,
        state=state,
        workflow_name=workflow_name,
    )
    state["status"] = "plan_ready"
    state["updated_at_utc"] = _utc_now_z()
    _write_json_atomic(run_dir / REPRODUCE_STATE_FILENAME, state)
    return True
def _capture_file_signature(path: Path) -> tuple[int, int, str] | None:
if not path.is_file():
return None
try:
file_stat = path.stat()
payload = path.read_bytes()
except OSError:
return None
return (
int(len(payload)),
int(file_stat.st_mtime_ns),
hashlib.sha256(payload).hexdigest(),
)
def _capture_signatures(
paths: list[Path],
) -> dict[str, tuple[int, int, str] | None]:
return {str(path): _capture_file_signature(path) for path in paths}
def _set_file_executable(path: Path) -> None:
    """Add user/group/other execute bits to *path*.

    Raises ``cli.PackageError`` when the file cannot be stat'ed or chmod'ed.
    """
    cli = _cli()
    try:
        path.chmod(path.stat().st_mode | 0o111)
    except OSError as exc:
        raise cli.PackageError(
            f"Failed to mark script executable: {path}: {exc}"
        ) from exc
def _render_workflow_stage_driver_script(
    *,
    workflow_name: str,
    run_id: str,
    stage_label: str,
    task_script_relpaths: list[tuple[str, str]],
    upstream_job_map_filename: str | None = None,
    emitted_job_map_filename: str | None = None,
) -> str:
    """Render a bash driver that runs one stage's task scripts in sequence.

    The generated script iterates ``task_id|relative_script`` entries,
    running each script with output tee'd to a temporary log. When
    *upstream_job_map_filename* is given, each task's upstream Slurm job
    id is looked up from that tab-separated ``task_id<TAB>job_id`` file and
    exported to the task script via the upstream-job-id environment
    variable. When *emitted_job_map_filename* is given, each successful
    task's final job id (parsed from the final-job-id marker line in its
    log) is appended to that file. Failures are collected and summarized;
    the script exits non-zero when any task failed. Returns the full
    script text ending in a newline.
    """
    stage_slug = stage_label.lower().replace(" ", "_")
    upstream_job_map_rel = (
        str(upstream_job_map_filename or "").strip()
        if upstream_job_map_filename is not None
        else ""
    )
    emitted_job_map_rel = (
        str(emitted_job_map_filename or "").strip()
        if emitted_job_map_filename is not None
        else ""
    )
    # Each entry is shell-quoted as a single "task_id|relpath" word.
    entries = [
        f"  {shlex.quote(task_id + '|' + rel_path)}"
        for task_id, rel_path in task_script_relpaths
    ]
    total_count = len(task_script_relpaths)
    lines: list[str] = [
        "#!/usr/bin/env bash",
        "set -u -o pipefail",
        "",
        'SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"',
        'RUN_DIR="$SCRIPT_DIR"',
        "",
        f'STAGE_LABEL="{stage_label}"',
        f'STAGE_SLUG="{stage_slug}"',
        f'WORKFLOW_NAME="{workflow_name}"',
        f'RUN_ID="{run_id}"',
        f"TOTAL_COUNT={total_count}",
        "SUCCESS_COUNT=0",
        "FAILURES=()",
        f'UPSTREAM_JOB_MAP_REL="{upstream_job_map_rel}"',
        f'EMITTED_JOB_MAP_REL="{emitted_job_map_rel}"',
        'UPSTREAM_JOB_MAP=""',
        'EMITTED_JOB_MAP=""',
        'if [[ -n "$UPSTREAM_JOB_MAP_REL" ]]; then',
        '  UPSTREAM_JOB_MAP="$RUN_DIR/$UPSTREAM_JOB_MAP_REL"',
        "fi",
        'if [[ -n "$EMITTED_JOB_MAP_REL" ]]; then',
        '  EMITTED_JOB_MAP="$RUN_DIR/$EMITTED_JOB_MAP_REL"',
        '  : > "$EMITTED_JOB_MAP"',
        "fi",
        "",
        "_lookup_upstream_job_id() {",
        '  local lookup_task="$1"',
        '  if [[ -z "$UPSTREAM_JOB_MAP" || ! -f "$UPSTREAM_JOB_MAP" ]]; then',
        "    return 0",
        "  fi",
        "  while IFS=$'\\t' read -r map_task map_job_id; do",
        '    if [[ "$map_task" == "$lookup_task" ]]; then',
        '      printf "%s" "$map_job_id"',
        "      return 0",
        "    fi",
        '  done < "$UPSTREAM_JOB_MAP"',
        "  return 0",
        "}",
        "",
        "_extract_final_job_id() {",
        '  local log_path="$1"',
        '  local marker_line=""',
        '  if [[ ! -f "$log_path" ]]; then',
        "    return 0",
        "  fi",
        f'  marker_line="$(grep -E \'^{WORKFLOW_FINAL_JOB_ID_MARKER}\' "$log_path" | tail -n 1 || true)"',
        '  if [[ -z "$marker_line" ]]; then',
        "    return 0",
        "  fi",
        f'  marker_line="${{marker_line#{WORKFLOW_FINAL_JOB_ID_MARKER}}}"',
        "  marker_line=\"${marker_line//$'\\r'/}\"",
        '  marker_line="$(printf "%s" "$marker_line" | tr -d "[:space:]")"',
        '  printf "%s" "$marker_line"',
        "}",
        "",
        "TASK_ENTRIES=(",
    ]
    # Bash rejects an empty array literal spread over lines; keep a single
    # empty entry as a sentinel (the loop below skips empty entries).
    if entries:
        lines.extend(entries)
    else:
        lines.append("  ''")
    lines.extend(
        [
            ")",
            "",
            'echo "[${STAGE_SLUG}] workflow=${WORKFLOW_NAME} run_id=${RUN_ID}"',
            'echo "[${STAGE_SLUG}] tasks=${TOTAL_COUNT}"',
            "",
            'for entry in "${TASK_ENTRIES[@]}"; do',
            '  if [[ -z "$entry" ]]; then',
            "    continue",
            "  fi",
            '  task_id="${entry%%|*}"',
            '  rel_script="${entry#*|}"',
            '  task_script="$RUN_DIR/$rel_script"',
            "",
            '  if [[ ! -f "$task_script" ]]; then',
            '    echo "[warn] ${task_id}: missing script ${rel_script}" >&2',
            '    FAILURES+=("${task_id}:missing_script")',
            "    continue",
            "  fi",
            "",
            '  upstream_job_id="$(_lookup_upstream_job_id "$task_id")"',
            '  if [[ -n "$upstream_job_id" ]]; then',
            '    echo "[run] ${task_id}: bash ${rel_script} (upstream_job_id=${upstream_job_id})"',
            "  else",
            '    echo "[run] ${task_id}: bash ${rel_script}"',
            "  fi",
            "",
            '  task_log="$RUN_DIR/.${STAGE_SLUG}_${task_id}.log"',
            '  : > "$task_log"',
            '  if [[ -n "$upstream_job_id" ]]; then',
            f'    {WORKFLOW_UPSTREAM_JOB_ID_ENV}="$upstream_job_id" bash "$task_script" > >(tee "$task_log") 2>&1',
            "  else",
            '    bash "$task_script" > >(tee "$task_log") 2>&1',
            "  fi",
            "  exit_code=$?",
            '  final_job_id="$(_extract_final_job_id "$task_log")"',
            '  rm -f "$task_log"',
            "",
            "  if [[ $exit_code -eq 0 ]]; then",
            "    SUCCESS_COUNT=$((SUCCESS_COUNT + 1))",
            '    if [[ -n "$EMITTED_JOB_MAP" && -n "$final_job_id" ]]; then',
            '      printf "%s\\t%s\\n" "$task_id" "$final_job_id" >> "$EMITTED_JOB_MAP"',
            '      echo "[meta] ${task_id}: final_job_id=${final_job_id}"',
            "    fi",
            '    echo "[ok] ${task_id}"',
            "  else",
            '    echo "[error] ${task_id}: exit=${exit_code}" >&2',
            '    FAILURES+=("${task_id}:exit_${exit_code}")',
            "  fi",
            "done",
            "",
            'echo "[summary] ${STAGE_SLUG}: success=${SUCCESS_COUNT}/${TOTAL_COUNT}"',
            'if [[ -n "$EMITTED_JOB_MAP_REL" ]]; then',
            '  echo "[summary] ${STAGE_SLUG}: job_map=${EMITTED_JOB_MAP_REL}"',
            "fi",
            "if [[ ${#FAILURES[@]} -gt 0 ]]; then",
            '  echo "[summary] ${STAGE_SLUG}: failures=${#FAILURES[@]}" >&2',
            '  for item in "${FAILURES[@]}"; do',
            '    echo "  - ${item}" >&2',
            "  done",
            "  exit 1",
            "fi",
            "exit 0",
        ]
    )
    return "\n".join(lines) + "\n"
def _render_workflow_all_in_one_driver_script(
    *,
    workflow_name: str,
    run_id: str,
    task_stage_script_relpaths: list[tuple[str, str, str, str]],
) -> str:
    """Render a bash driver that runs all three stages per task, in order.

    For each ``(task_id, sim_rel, post_rel, plot_rel)`` entry, the
    generated script runs the simulation, postprocess, and plot scripts
    sequentially. After each stage it parses that stage's final Slurm job
    id from the final-job-id marker (when present) and, before moving on,
    polls the job via ``sacct``/``squeue`` until it completes — controlled
    by the ``FERMILINK_POLL_SECONDS`` and ``FERMILINK_MAX_POLLS``
    environment variables. Each completed stage's job id is exported to
    the next stage via the upstream-job-id environment variable. Failures
    abort the remaining stages for that task only; the script exits
    non-zero when any task failed. Returns the full script text ending
    in a newline.
    """
    # Each entry is shell-quoted as one "task|sim|post|plot" word.
    entries = [
        f"  {shlex.quote(task_id + '|' + sim_rel + '|' + post_rel + '|' + plot_rel)}"
        for task_id, sim_rel, post_rel, plot_rel in task_stage_script_relpaths
    ]
    total_count = len(task_stage_script_relpaths)
    lines: list[str] = [
        "#!/usr/bin/env bash",
        "set -u -o pipefail",
        "",
        'SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"',
        'RUN_DIR="$SCRIPT_DIR"',
        "",
        f'WORKFLOW_NAME="{workflow_name}"',
        f'RUN_ID="{run_id}"',
        f"TOTAL_COUNT={total_count}",
        "SUCCESS_COUNT=0",
        "FAILURES=()",
        'POLL_SECONDS="${FERMILINK_POLL_SECONDS:-20}"',
        'MAX_POLLS="${FERMILINK_MAX_POLLS:-0}"',
        'STAGE_FINAL_JOB_ID=""',
        "",
        'if ! [[ "$POLL_SECONDS" =~ ^[0-9]+([.][0-9]+)?$ ]]; then',
        '  echo "[error] invalid FERMILINK_POLL_SECONDS=${POLL_SECONDS}" >&2',
        "  exit 2",
        "fi",
        'if ! [[ "$MAX_POLLS" =~ ^[0-9]+$ ]]; then',
        '  echo "[error] invalid FERMILINK_MAX_POLLS=${MAX_POLLS}" >&2',
        "  exit 2",
        "fi",
        "",
        "_extract_final_job_id() {",
        '  local log_path="$1"',
        '  local marker_line=""',
        '  if [[ ! -f "$log_path" ]]; then',
        "    return 0",
        "  fi",
        f'  marker_line="$(grep -E \'^{WORKFLOW_FINAL_JOB_ID_MARKER}\' "$log_path" | tail -n 1 || true)"',
        '  if [[ -z "$marker_line" ]]; then',
        "    return 0",
        "  fi",
        f'  marker_line="${{marker_line#{WORKFLOW_FINAL_JOB_ID_MARKER}}}"',
        "  marker_line=\"${marker_line//$'\\r'/}\"",
        '  marker_line="$(printf "%s" "$marker_line" | tr -d "[:space:]")"',
        '  printf "%s" "$marker_line"',
        "}",
        "",
        "_query_slurm_job_state() {",
        '  local job_id="$1"',
        '  local state=""',
        "  if command -v sacct >/dev/null 2>&1; then",
        '    state="$(sacct -n -o State -j "$job_id" 2>/dev/null | awk \'NF {print $1; exit}\')"',
        "  fi",
        '  if [[ -z "$state" ]] && command -v squeue >/dev/null 2>&1; then',
        '    if squeue -h -j "$job_id" 2>/dev/null | grep -q .; then',
        '      state="PENDING"',
        "    fi",
        "  fi",
        '  state="${state%%+*}"',
        '  state="${state%% *}"',
        '  printf "%s" "$state"',
        "}",
        "",
        "_wait_for_slurm_job() {",
        '  local job_id="$1"',
        '  local stage_ref="$2"',
        "  local poll_count=0",
        "  if ! command -v sacct >/dev/null 2>&1 && ! command -v squeue >/dev/null 2>&1; then",
        '    echo "[error] ${stage_ref}: cannot wait for job ${job_id}; both `sacct` and `squeue` are unavailable" >&2',
        "    return 1",
        "  fi",
        "  while true; do",
        '    local state=""',
        '    state="$(_query_slurm_job_state "$job_id")"',
        '    if [[ "$state" == "COMPLETED" ]]; then',
        '      echo "[wait] ${stage_ref}: job ${job_id} COMPLETED"',
        "      return 0",
        "    fi",
        '    case "$state" in',
        "      FAILED|CANCELLED|TIMEOUT|NODE_FAIL|OUT_OF_MEMORY|PREEMPTED|BOOT_FAIL|DEADLINE|REVOKED|SPECIAL_EXIT|STOPPED)",
        '        echo "[error] ${stage_ref}: job ${job_id} finished in state ${state}" >&2',
        "        return 1",
        "        ;;",
        "    esac",
        "    poll_count=$((poll_count + 1))",
        '    if [[ "$MAX_POLLS" -gt 0 && "$poll_count" -ge "$MAX_POLLS" ]]; then',
        '      echo "[error] ${stage_ref}: exceeded FERMILINK_MAX_POLLS=${MAX_POLLS} while waiting for job ${job_id}" >&2',
        "      return 1",
        "    fi",
        '    if [[ -n "$state" ]]; then',
        '      echo "[wait] ${stage_ref}: job ${job_id} state=${state}; sleeping ${POLL_SECONDS}s"',
        "    else",
        '      echo "[wait] ${stage_ref}: job ${job_id} state unavailable; sleeping ${POLL_SECONDS}s"',
        "    fi",
        '    sleep "$POLL_SECONDS"',
        "  done",
        "}",
        "",
        "_run_stage_script() {",
        '  local task_id="$1"',
        '  local stage_label="$2"',
        '  local rel_script="$3"',
        '  local upstream_job_id="${4:-}"',
        '  local stage_script="$RUN_DIR/$rel_script"',
        '  local stage_log="$RUN_DIR/.run_all_${task_id}_${stage_label}.log"',
        '  STAGE_FINAL_JOB_ID=""',
        '  if [[ ! -f "$stage_script" ]]; then',
        '    echo "[error] ${task_id}/${stage_label}: missing script ${rel_script}" >&2',
        "    return 127",
        "  fi",
        '  if [[ -n "$upstream_job_id" ]]; then',
        '    echo "[run] ${task_id}/${stage_label}: bash ${rel_script} (upstream_job_id=${upstream_job_id})"',
        "  else",
        '    echo "[run] ${task_id}/${stage_label}: bash ${rel_script}"',
        "  fi",
        '  : > "$stage_log"',
        '  if [[ -n "$upstream_job_id" ]]; then',
        f'    {WORKFLOW_UPSTREAM_JOB_ID_ENV}="$upstream_job_id" bash "$stage_script" > >(tee "$stage_log") 2>&1',
        "  else",
        '    bash "$stage_script" > >(tee "$stage_log") 2>&1',
        "  fi",
        "  local exit_code=$?",
        '  local final_job_id=""',
        '  final_job_id="$(_extract_final_job_id "$stage_log")"',
        '  rm -f "$stage_log"',
        '  STAGE_FINAL_JOB_ID="$final_job_id"',
        '  if [[ "$exit_code" -ne 0 ]]; then',
        '    echo "[error] ${task_id}/${stage_label}: exit=${exit_code}" >&2',
        '    return "$exit_code"',
        "  fi",
        '  if [[ -n "$final_job_id" ]]; then',
        '    echo "[meta] ${task_id}/${stage_label}: final_job_id=${final_job_id}"',
        "  fi",
        "  return 0",
        "}",
        "",
        "TASK_ENTRIES=(",
    ]
    # Bash rejects an empty array literal spread over lines; keep a single
    # empty entry as a sentinel (the loop below skips empty entries).
    if entries:
        lines.extend(entries)
    else:
        lines.append("  ''")
    lines.extend(
        [
            ")",
            "",
            'echo "[run_all] workflow=${WORKFLOW_NAME} run_id=${RUN_ID}"',
            'echo "[run_all] tasks=${TOTAL_COUNT}"',
            "",
            'for entry in "${TASK_ENTRIES[@]}"; do',
            '  if [[ -z "$entry" ]]; then',
            "    continue",
            "  fi",
            '  task_id="${entry%%|*}"',
            '  rest="${entry#*|}"',
            '  sim_rel="${rest%%|*}"',
            '  rest="${rest#*|}"',
            '  post_rel="${rest%%|*}"',
            '  plot_rel="${rest#*|}"',
            "",
            '  sim_job_id=""',
            '  post_job_id=""',
            '  plot_job_id=""',
            "",
            '  if ! _run_stage_script "$task_id" "simulation" "$sim_rel" ""; then',
            "    exit_code=$?",
            '    FAILURES+=("${task_id}:simulation_exit_${exit_code}")',
            "    continue",
            "  fi",
            '  sim_job_id="$STAGE_FINAL_JOB_ID"',
            '  if [[ -n "$sim_job_id" ]]; then',
            '    if ! _wait_for_slurm_job "$sim_job_id" "${task_id}/simulation"; then',
            '      FAILURES+=("${task_id}:simulation_job_wait_failed_${sim_job_id}")',
            "      continue",
            "    fi",
            "  fi",
            "",
            '  if ! _run_stage_script "$task_id" "postprocess" "$post_rel" "$sim_job_id"; then',
            "    exit_code=$?",
            '    FAILURES+=("${task_id}:postprocess_exit_${exit_code}")',
            "    continue",
            "  fi",
            '  post_job_id="$STAGE_FINAL_JOB_ID"',
            '  if [[ -n "$post_job_id" ]]; then',
            '    if ! _wait_for_slurm_job "$post_job_id" "${task_id}/postprocess"; then',
            '      FAILURES+=("${task_id}:postprocess_job_wait_failed_${post_job_id}")',
            "      continue",
            "    fi",
            "  fi",
            "",
            '  if ! _run_stage_script "$task_id" "plot" "$plot_rel" "$post_job_id"; then',
            "    exit_code=$?",
            '    FAILURES+=("${task_id}:plot_exit_${exit_code}")',
            "    continue",
            "  fi",
            '  plot_job_id="$STAGE_FINAL_JOB_ID"',
            '  if [[ -n "$plot_job_id" ]]; then',
            '    if ! _wait_for_slurm_job "$plot_job_id" "${task_id}/plot"; then',
            '      FAILURES+=("${task_id}:plot_job_wait_failed_${plot_job_id}")',
            "      continue",
            "    fi",
            "  fi",
            "",
            "  SUCCESS_COUNT=$((SUCCESS_COUNT + 1))",
            '  echo "[ok] ${task_id}: simulation + postprocess + plot completed"',
            "done",
            "",
            'echo "[summary] run_all: success=${SUCCESS_COUNT}/${TOTAL_COUNT}"',
            "if [[ ${#FAILURES[@]} -gt 0 ]]; then",
            '  echo "[summary] run_all: failures=${#FAILURES[@]}" >&2',
            '  for item in "${FAILURES[@]}"; do',
            '    echo "  - ${item}" >&2',
            "  done",
            "  exit 1",
            "fi",
            "exit 0",
        ]
    )
    return "\n".join(lines) + "\n"
def _write_workflow_stage_driver_scripts(
    *,
    run_dir: Path,
    workflow_name: str,
    run_id: str,
    simulation_task_scripts: list[tuple[str, Path]],
    postprocess_task_scripts: list[tuple[str, Path]],
    plot_task_scripts: list[tuple[str, Path]],
) -> dict[str, object]:
    """Write the per-stage and all-in-one driver scripts for a run.

    Renders four executable bash drivers under *run_dir*: one per stage
    (simulation, postprocess, plot — chained via emitted/upstream job-map
    files) plus a combined run-all script that executes all three stages
    per task. The three stage script lists must be the same length
    (one entry per task); a ``cli.PackageError`` is raised otherwise.
    Returns a dict of the written script paths and the job-map file paths
    the stage scripts will read/write.
    """
    run_all_path = run_dir / WORKFLOW_ALLINONE_BATCH_SCRIPT_FILENAME
    simulation_path = run_dir / WORKFLOW_SIMULATION_BATCH_SCRIPT_FILENAME
    postprocess_path = run_dir / WORKFLOW_POSTPROCESS_BATCH_SCRIPT_FILENAME
    plot_path = run_dir / WORKFLOW_PLOT_BATCH_SCRIPT_FILENAME
    simulation_job_map_path = run_dir / WORKFLOW_SIMULATION_JOB_MAP_FILENAME
    postprocess_job_map_path = run_dir / WORKFLOW_POSTPROCESS_JOB_MAP_FILENAME
    plot_job_map_path = run_dir / WORKFLOW_PLOT_JOB_MAP_FILENAME
    # Drivers reference task scripts relative to the run directory.
    simulation_rel = [
        (task_id, str(path.relative_to(run_dir)))
        for task_id, path in simulation_task_scripts
    ]
    postprocess_rel = [
        (task_id, str(path.relative_to(run_dir)))
        for task_id, path in postprocess_task_scripts
    ]
    plot_rel = [
        (task_id, str(path.relative_to(run_dir))) for task_id, path in plot_task_scripts
    ]
    # Guard the zip below: unequal lists would silently drop tasks.
    if not (
        len(simulation_task_scripts)
        == len(postprocess_task_scripts)
        == len(plot_task_scripts)
    ):
        cli = _cli()
        raise cli.PackageError(
            "Workflow stage script lists are inconsistent; cannot generate run-all script."
        )
    run_all_rel = [
        (
            task_id,
            str(sim_path.relative_to(run_dir)),
            str(post_path.relative_to(run_dir)),
            str(plot_path.relative_to(run_dir)),
        )
        for (task_id, sim_path), (_, post_path), (_, plot_path) in zip(
            simulation_task_scripts, postprocess_task_scripts, plot_task_scripts
        )
    ]
    _write_text_file(
        run_all_path,
        _render_workflow_all_in_one_driver_script(
            workflow_name=workflow_name,
            run_id=run_id,
            task_stage_script_relpaths=run_all_rel,
        ),
    )
    # Simulation emits a job map; downstream stages consume the previous
    # stage's map and emit their own, forming the dependency chain.
    _write_text_file(
        simulation_path,
        _render_workflow_stage_driver_script(
            workflow_name=workflow_name,
            run_id=run_id,
            stage_label="simulation",
            task_script_relpaths=simulation_rel,
            emitted_job_map_filename=WORKFLOW_SIMULATION_JOB_MAP_FILENAME,
        ),
    )
    _write_text_file(
        postprocess_path,
        _render_workflow_stage_driver_script(
            workflow_name=workflow_name,
            run_id=run_id,
            stage_label="postprocess",
            task_script_relpaths=postprocess_rel,
            upstream_job_map_filename=WORKFLOW_SIMULATION_JOB_MAP_FILENAME,
            emitted_job_map_filename=WORKFLOW_POSTPROCESS_JOB_MAP_FILENAME,
        ),
    )
    _write_text_file(
        plot_path,
        _render_workflow_stage_driver_script(
            workflow_name=workflow_name,
            run_id=run_id,
            stage_label="plot",
            task_script_relpaths=plot_rel,
            upstream_job_map_filename=WORKFLOW_POSTPROCESS_JOB_MAP_FILENAME,
            emitted_job_map_filename=WORKFLOW_PLOT_JOB_MAP_FILENAME,
        ),
    )
    _set_file_executable(run_all_path)
    _set_file_executable(simulation_path)
    _set_file_executable(postprocess_path)
    _set_file_executable(plot_path)
    return {
        "run_all_script_path": str(run_all_path),
        "simulation_script_path": str(simulation_path),
        "postprocess_script_path": str(postprocess_path),
        "plot_script_path": str(plot_path),
        "simulation_job_map_path": str(simulation_job_map_path),
        "postprocess_job_map_path": str(postprocess_job_map_path),
        "plot_job_map_path": str(plot_job_map_path),
    }
def _non_comment_shell_lines(script_text: str) -> list[str]:
lines: list[str] = []
for raw_line in script_text.splitlines():
stripped = raw_line.strip()
if not stripped or stripped.startswith("#"):
continue
lines.append(stripped)
return lines
def _build_hpc_contract_issue(
*,
stage_label: str,
task_id: str,
script_rel: str,
code: str,
message: str,
) -> dict[str, str]:
return {
"stage": stage_label,
"task_id": task_id,
"script_path": script_rel,
"code": code,
"message": message,
}
def _format_hpc_contract_issue(issue: dict[str, str]) -> str:
stage_label = str(issue.get("stage") or "stage").strip() or "stage"
task_id = str(issue.get("task_id") or "task").strip() or "task"
script_rel = str(issue.get("script_path") or "unknown").strip() or "unknown"
message = (
str(issue.get("message") or "validation issue").strip() or "validation issue"
)
return f"{stage_label}:{task_id} ({script_rel}) {message}"
def _collect_hpc_stage_task_script_issues(
    *,
    repo_dir: Path,
    stage_label: str,
    task_scripts: list[tuple[str, Path]],
    require_upstream_dependency: bool,
    require_dependency_for_multi_sbatch: bool,
) -> list[dict[str, str]]:
    """Check one stage's task scripts against the SLURM submission contract.

    Scripts with no `sbatch` usage are skipped entirely.  For the rest, issue
    records are collected in a fixed order: missing `--parsable`, missing
    final-job-id marker, missing multi-submission `afterok` chaining (when
    required), then the two upstream-dependency checks (when required).
    """
    issues: list[dict[str, str]] = []
    for task_id, script_path in task_scripts:
        script_rel = _repo_relative_path(repo_dir, script_path)

        def _record(code: str, message: str) -> None:
            # Bind the per-task context once so each check only supplies
            # its code and message.
            issues.append(
                _build_hpc_contract_issue(
                    stage_label=stage_label,
                    task_id=task_id,
                    script_rel=script_rel,
                    code=code,
                    message=message,
                )
            )

        try:
            script_text = script_path.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            _record("read_failed", f"failed to read script: {exc}")
            continue

        command_lines = _non_comment_shell_lines(script_text)
        sbatch_count = sum(
            1 for line in command_lines if SBATCH_TOKEN_RE.search(line) is not None
        )
        if sbatch_count <= 0:
            # The contract only applies to scripts that actually submit jobs.
            continue

        has_parsable = any(
            SBATCH_PARSABLE_RE.search(line) is not None for line in command_lines
        )
        has_afterok = any(
            SBATCH_AFTEROK_RE.search(line) is not None for line in command_lines
        )
        has_final_job_marker = any(
            WORKFLOW_FINAL_JOB_ID_MARKER in line for line in command_lines
        )
        has_upstream_env_ref = any(
            WORKFLOW_UPSTREAM_JOB_ID_ENV in line for line in command_lines
        )

        if not has_parsable:
            _record("missing_parsable", "uses `sbatch` without `--parsable`.")
        if not has_final_job_marker:
            _record(
                "missing_final_job_marker",
                (
                    "uses `sbatch` without emitting "
                    f"`{WORKFLOW_FINAL_JOB_ID_MARKER}<job_id>`."
                ),
            )
        if (
            require_dependency_for_multi_sbatch
            and sbatch_count >= 2
            and not has_afterok
        ):
            _record(
                "missing_afterok_chain",
                (
                    "has multiple `sbatch` commands without "
                    "`--dependency=afterok:<job_id>` chaining."
                ),
            )
        if require_upstream_dependency:
            if not has_upstream_env_ref:
                _record(
                    "missing_upstream_env_ref",
                    (
                        "uses `sbatch` without referencing "
                        f"`{WORKFLOW_UPSTREAM_JOB_ID_ENV}`."
                    ),
                )
            if not has_afterok:
                _record(
                    "missing_upstream_afterok",
                    (
                        "uses `sbatch` without an `--dependency=afterok:` "
                        "path for upstream gating."
                    ),
                )
    return issues
def _collect_hpc_workflow_task_script_issues(
    *,
    repo_dir: Path,
    simulation_task_scripts: list[tuple[str, Path]],
    postprocess_task_scripts: list[tuple[str, Path]],
    plot_task_scripts: list[tuple[str, Path]],
) -> list[dict[str, str]]:
    """Aggregate HPC contract issues across the three workflow stages.

    Simulation scripts must chain multiple `sbatch` submissions but have no
    upstream; postprocess and plot scripts must honor the upstream job id.
    """
    # (stage label, scripts, require_upstream_dependency,
    #  require_dependency_for_multi_sbatch)
    stage_specs = (
        ("simulation", simulation_task_scripts, False, True),
        ("postprocess", postprocess_task_scripts, True, False),
        ("plot", plot_task_scripts, True, False),
    )
    issues: list[dict[str, str]] = []
    for stage_label, task_scripts, needs_upstream, needs_chain in stage_specs:
        issues.extend(
            _collect_hpc_stage_task_script_issues(
                repo_dir=repo_dir,
                stage_label=stage_label,
                task_scripts=task_scripts,
                require_upstream_dependency=needs_upstream,
                require_dependency_for_multi_sbatch=needs_chain,
            )
        )
    return issues
def _summarize_hpc_contract_issues(issues: list[dict[str, str]]) -> str | None:
    """Collapse contract issues into a single summary line, or None when clean."""
    if not issues:
        return None
    details = "; ".join(_format_hpc_contract_issue(issue) for issue in issues)
    return f"HPC SLURM task-script contract validation failed: {details}"
def _hpc_contract_issues_fingerprint(issues: list[dict[str, str]]) -> str:
normalized = [
{
"stage": str(item.get("stage") or ""),
"task_id": str(item.get("task_id") or ""),
"script_path": str(item.get("script_path") or ""),
"code": str(item.get("code") or ""),
"message": str(item.get("message") or ""),
}
for item in issues
if isinstance(item, dict)
]
normalized.sort(
key=lambda item: (
item["stage"],
item["task_id"],
item["script_path"],
item["code"],
item["message"],
)
)
payload = json.dumps(normalized, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(payload.encode("utf-8")).hexdigest()
def _render_hpc_contract_feedback_block(
    *,
    stage_label: str,
    attempt: int,
    max_attempts: int,
    issues: list[dict[str, str]],
) -> str:
    """Render the feedback text appended to the next attempt's prompt.

    Shows at most WORKFLOW_HPC_FEEDBACK_MAX_ISSUES issues as bullet lines
    (with an overflow bullet when truncated), followed by the same truncated
    issues as a structured JSON payload.
    """
    limited = issues[:WORKFLOW_HPC_FEEDBACK_MAX_ISSUES]
    issue_lines = [f"- {_format_hpc_contract_issue(issue)}" for issue in limited]
    hidden = len(issues) - len(limited)
    if hidden > 0:
        issue_lines.append(f"- ... plus {hidden} additional issues.")
    issue_json = json.dumps({"issues": limited}, indent=2)
    header = (
        "\n"
        "Validation feedback from previous attempt (must fix all before continuing):\n"
        f"- Stage: {stage_label}\n"
        f"- Attempt: {attempt}/{max_attempts}\n"
        "- Focus edits on failing task scripts only; keep unrelated files unchanged.\n"
        "- Keep scripts valid bash with shebang + `set -euo pipefail`.\n"
    )
    return (
        header
        + "\n".join(issue_lines)
        + "\n"
        + "Structured issue payload:\n"
        + f"{issue_json}\n"
    )
def _write_hpc_contract_errors_artifact(
    *,
    path: Path,
    workflow_name: str,
    run_id: str,
    stage_label: str,
    attempt: int,
    max_attempts: int,
    issues: list[dict[str, str]],
    stage_errors: list[str],
    resolved: bool,
) -> None:
    """Persist the current HPC contract validation state as a JSON artifact.

    Written on every failed attempt (``resolved=False``) and once at the end
    with an empty issue list (``resolved=True``).  Key insertion order is
    deliberate: it is the order keys appear in the serialized file.
    """
    payload: dict[str, object] = {}
    payload["version"] = 1
    payload["updated_at_utc"] = _utc_now_z()
    payload["workflow"] = workflow_name
    payload["run_id"] = run_id
    payload["stage"] = stage_label
    payload["attempt"] = attempt
    payload["max_attempts"] = max_attempts
    payload["resolved"] = resolved
    payload["issue_count"] = len(issues)
    payload["issues"] = issues
    payload["stage_errors"] = stage_errors
    _write_text_file(path, json.dumps(payload, indent=2) + "\n")
def _validate_hpc_stage_task_scripts(
    *,
    repo_dir: Path,
    stage_label: str,
    task_scripts: list[tuple[str, Path]],
    require_upstream_dependency: bool,
    require_dependency_for_multi_sbatch: bool,
) -> list[str]:
    """Collect one stage's HPC contract issues and render each as a one-line string."""
    return [
        _format_hpc_contract_issue(issue)
        for issue in _collect_hpc_stage_task_script_issues(
            repo_dir=repo_dir,
            stage_label=stage_label,
            task_scripts=task_scripts,
            require_upstream_dependency=require_upstream_dependency,
            require_dependency_for_multi_sbatch=require_dependency_for_multi_sbatch,
        )
    ]
def _validate_hpc_workflow_task_scripts(
    *,
    repo_dir: Path,
    simulation_task_scripts: list[tuple[str, Path]],
    postprocess_task_scripts: list[tuple[str, Path]],
    plot_task_scripts: list[tuple[str, Path]],
) -> str | None:
    """Validate all three stages' scripts; return a summary string or None when clean."""
    return _summarize_hpc_contract_issues(
        _collect_hpc_workflow_task_script_issues(
            repo_dir=repo_dir,
            simulation_task_scripts=simulation_task_scripts,
            postprocess_task_scripts=postprocess_task_scripts,
            plot_task_scripts=plot_task_scripts,
        )
    )
def _validate_report_stage_artifacts(
    *,
    stage_label: str,
    report_path: Path,
    summary_paths: list[Path],
    before_report_signature: tuple[int, int, str] | None,
    before_summary_signatures: dict[str, tuple[int, int, str] | None],
    required_marker: str,
    required_files: list[Path] | None = None,
    before_required_signatures: dict[str, tuple[int, int, str] | None] | None = None,
) -> str | None:
    """Validate the artifacts produced by one report stage (generation or audit).

    Checks that the report exists, is non-empty, and contains ``required_marker``;
    that every summary (and every optional required file) exists and is
    non-empty; and that at least one artifact changed relative to the
    ``before_*`` signatures captured before the stage ran.

    Returns a single ``"<stage_label> validation failed: ..."`` string listing
    all accumulated problems, or ``None`` when everything checks out.
    """
    errors: list[str] = []
    # Signature tuples come from _capture_file_signature; None means the file
    # does not exist.  Element [0] is compared against 0 below, so it is
    # presumably the file size — TODO confirm against that helper.
    report_signature = _capture_file_signature(report_path)
    report_text = ""
    if report_signature is None:
        errors.append(f"missing report file: {report_path}")
    else:
        if report_signature[0] <= 0:
            errors.append(f"empty report file: {report_path}")
        try:
            report_text = report_path.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            errors.append(f"failed to read report file: {report_path}: {exc}")
    # report_text stays "" when the report is missing/unreadable, so the marker
    # check also (intentionally) fails in those cases.
    if required_marker not in report_text:
        errors.append(f"report missing required marker: {required_marker}")
    summary_signatures = _capture_signatures(summary_paths)
    for summary_path in summary_paths:
        summary_signature = summary_signatures.get(str(summary_path))
        if summary_signature is None:
            errors.append(f"missing summary file: {summary_path}")
            continue
        if summary_signature[0] <= 0:
            errors.append(f"empty summary file: {summary_path}")
    # Optional extra artifacts (e.g. per-task stage scripts) validated the same way.
    required_signatures: dict[str, tuple[int, int, str] | None] = {}
    if isinstance(required_files, list):
        required_signatures = _capture_signatures(required_files)
        for required_path in required_files:
            required_signature = required_signatures.get(str(required_path))
            if required_signature is None:
                errors.append(f"missing required file: {required_path}")
                continue
            if required_signature[0] <= 0:
                errors.append(f"empty required file: {required_path}")
    # A stage must touch *something*: compare after-signatures against the
    # before-snapshots, short-circuiting as soon as any change is found.
    artifacts_updated = report_signature != before_report_signature
    if not artifacts_updated:
        for path_text, after_signature in summary_signatures.items():
            if after_signature != before_summary_signatures.get(path_text):
                artifacts_updated = True
                break
    if (
        not artifacts_updated
        and isinstance(before_required_signatures, dict)
        and required_signatures
    ):
        for path_text, after_signature in required_signatures.items():
            if after_signature != before_required_signatures.get(path_text):
                artifacts_updated = True
                break
    if not artifacts_updated:
        errors.append("no report/summary artifacts were updated in this stage")
    if errors:
        return f"{stage_label} validation failed: {'; '.join(errors)}"
    return None
def _finalize_workflow_report(
    *,
    repo_dir: Path,
    run_dir: Path,
    runs_root: Path,
    workflow_name: str,
    source_description: str,
    tasks_state: list[dict[str, object]],
    requested_package_id: str | None,
    sandbox_override: str | None,
    provider_bin_override: str,
    data_context: dict[str, object] | None = None,
    hpc_context: dict[str, object] | None = None,
    workflow_status_hook: WorkflowStatusHook | None = None,
) -> dict[str, object]:
    """Generate and audit final workflow reports for `reproduce` and `research`.

    Runs two model-backed stages — report generation, then an independent
    audit — each inside a validate-and-retry loop with HPC contract feedback,
    then writes the stage driver scripts.  Raises ``cli.PackageError`` when
    either stage cannot produce valid artifacts within its attempt budget.
    Returns a mapping of artifact paths (report, summaries, driver scripts).

    NOTE(review): ``runs_root`` is accepted but never used in this body —
    presumably kept for signature parity with callers; confirm before removing.
    """
    cli = _cli()
    plan_path = run_dir / REPRODUCE_PLAN_FILENAME
    if not plan_path.is_file():
        raise cli.PackageError(
            f"Missing plan file for {workflow_name} report generation: {plan_path}"
        )
    summaries_root = run_dir / WORKFLOW_SUMMARIES_DIRNAME
    summaries_root.mkdir(parents=True, exist_ok=True)
    report_path = run_dir / WORKFLOW_REPORT_FILENAME
    run_id = run_dir.name
    # Stage markers let validation prove the model actually (re)wrote the
    # report during the corresponding stage of *this* run.
    generation_marker = f"<!-- FERMILINK_REPORT_STAGE:generated run_id={run_id} -->"
    audit_marker = f"<!-- FERMILINK_REPORT_STAGE:audited run_id={run_id} -->"
    hpc_contract_errors_path = run_dir / WORKFLOW_HPC_CONTRACT_ERRORS_FILENAME

    def _display_path(path: Path) -> str:
        # Prefer repo-relative paths in prompt text; fall back to absolute.
        try:
            return str(path.relative_to(repo_dir))
        except ValueError:
            return str(path)

    # Per task: one summary file plus three stage scripts the generator must
    # create, and the prompt bullet lines that describe them.
    summary_paths: list[Path] = []
    simulation_task_scripts: list[tuple[str, Path]] = []
    postprocess_task_scripts: list[tuple[str, Path]] = []
    plot_task_scripts: list[tuple[str, Path]] = []
    task_lines: list[str] = []
    for index, task in enumerate(tasks_state, start=1):
        # Fall back to a positional id when the task record lacks a usable one.
        task_id = (
            str(task.get("id") or f"task_{index:03d}").strip() or f"task_{index:03d}"
        )
        summary_path = summaries_root / task_id / "summary.md"
        summary_path.parent.mkdir(parents=True, exist_ok=True)
        summary_paths.append(summary_path)
        simulation_task_script = (
            summaries_root / task_id / WORKFLOW_TASK_SIMULATION_SCRIPT_FILENAME
        )
        postprocess_task_script = (
            summaries_root / task_id / WORKFLOW_TASK_POSTPROCESS_SCRIPT_FILENAME
        )
        plot_task_script = summaries_root / task_id / WORKFLOW_TASK_PLOT_SCRIPT_FILENAME
        simulation_task_scripts.append((task_id, simulation_task_script))
        postprocess_task_scripts.append((task_id, postprocess_task_script))
        plot_task_scripts.append((task_id, plot_task_script))
        task_title = str(task.get("title") or task_id).strip() or task_id
        task_lines.append(
            "\n".join(
                [
                    f"- {task_id}: {task_title}",
                    f" - summary: {_display_path(summary_path)}",
                    f" - simulation script: {_display_path(simulation_task_script)}",
                    f" - postprocess script: {_display_path(postprocess_task_script)}",
                    f" - plot script: {_display_path(plot_task_script)}",
                ]
            )
        )
    # All stage scripts are required artifacts for both validation passes.
    required_stage_files: list[Path] = (
        [path for _, path in simulation_task_scripts]
        + [path for _, path in postprocess_task_scripts]
        + [path for _, path in plot_task_scripts]
    )
    hpc_prompt_lines = _build_hpc_prompt_lines(hpc_context)
    execution_target_block = "\n".join(hpc_prompt_lines)
    hpc_mode = _is_hpc_context_enabled(hpc_context)
    hpc_task_script_requirements = ""
    if hpc_mode:
        # Extra prompt requirements that the SLURM contract validator below
        # enforces mechanically (parsable ids, final-job markers, afterok chains).
        hpc_task_script_requirements = (
            "7) In HPC SLURM mode, when any stage script uses `sbatch`, capture job ids via `sbatch --parsable` and emit the final submitted job id as `FERMILINK_FINAL_JOB_ID=<job_id>`.\n"
            "8) In `run_simulation.sh`, when multiple `sbatch` submissions have intrinsic ordering (for example equilibration then production), chain them with `--dependency=afterok:<job_id>`.\n"
            "9) In `run_postprocess.sh` and `run_plot.sh`, honor optional environment variable `FERMILINK_UPSTREAM_JOB_ID`; when set and using `sbatch`, apply `--dependency=afterok:${FERMILINK_UPSTREAM_JOB_ID}` to the first submission.\n"
        )
    generator_prompt = (
        f"{WORKFLOW_REPORT_GENERATOR_PROMPT_PREFIX}\n"
        f"{WORKFLOW_UNIFIED_MEMORY_STAGE_INSTRUCTIONS}\n"
        "\n"
        f"Workflow: {workflow_name}\n"
        f"Source description: {source_description}\n"
        "\n"
        "Use these artifacts:\n"
        f"- Plan JSON: {_display_path(plan_path)}\n"
        f"- Task prompt directory: {_display_path(run_dir / REPRODUCE_PROMPTS_DIRNAME)}\n"
        f"- Existing run logs directory: {_display_path(run_dir / REPRODUCE_LOGS_DIRNAME)}\n"
        "\n"
        "Execution target constraints:\n"
        f"{execution_target_block}\n"
        "\n"
        "For each task, create/update one summary and three runnable task scripts at:\n"
        + "\n".join(task_lines)
        + "\n\n"
        "Then create/update a polished top-level markdown report at:\n"
        f"- {_display_path(report_path)}\n"
        "\n"
        "Task script requirements:\n"
        "1) Each task script must be valid bash with shebang (`#!/usr/bin/env bash`).\n"
        "2) Each task script should use `set -euo pipefail`.\n"
        "3) `run_simulation.sh` should contain simulation submission/execution commands for that task.\n"
        "4) `run_postprocess.sh` should contain post-processing commands for that task.\n"
        "5) `run_plot.sh` should contain plotting commands for that task.\n"
        "6) Prefer concrete commands from task artifacts; if uncertain, include explicit TODO comments while keeping script syntax valid.\n"
        f"{hpc_task_script_requirements}"
        "\n"
        "Report requirements:\n"
        "1) Brief objective and methodology sections.\n"
        "2) Per-task results linked to corresponding task summaries.\n"
        "3) Include markdown figure/image links to generated outputs whenever files exist.\n"
        "4) Explicitly note missing artifacts or limitations.\n"
        "5) End with concise conclusions and next-step suggestions.\n"
        "6) Include this exact marker line anywhere in the report:\n"
        f"{generation_marker}\n"
    )
    # ---- Stage 1: report generation, with validation/retry loop. ----
    generation_max_attempts = WORKFLOW_HPC_VALIDATION_MAX_ATTEMPTS if hpc_mode else 2
    generation_feedback_block = ""
    generation_failure_reason = ""
    generation_hpc_last_fingerprint = ""
    generation_hpc_stall_count = 0
    generation_success = False
    generation_last_hpc_issues: list[dict[str, str]] = []
    _emit_workflow_status(workflow_status_hook, "summary")
    for attempt in range(1, generation_max_attempts + 1):
        cli._print_tagged(
            workflow_name,
            f"report generation attempt {attempt}/{generation_max_attempts}",
        )
        # Snapshot artifact signatures so validation can prove this attempt
        # actually modified something.
        before_report_signature = _capture_file_signature(report_path)
        before_summary_signatures = _capture_signatures(summary_paths)
        before_required_signatures = _capture_signatures(required_stage_files)
        generation_prompt_with_feedback = (
            generator_prompt + generation_feedback_block
            if generation_feedback_block
            else generator_prompt
        )
        run_result = _run_reproduce_exec_turn(
            repo_dir=repo_dir,
            prompt=generation_prompt_with_feedback,
            requested_package_id=requested_package_id,
            sandbox_override=sandbox_override,
            provider_bin_override=provider_bin_override,
            data_context=data_context,
        )
        return_code = int(run_result.get("return_code") or 0)
        if return_code == 0:
            generation_validation_error = _validate_report_stage_artifacts(
                stage_label=f"{workflow_name} report generation",
                report_path=report_path,
                summary_paths=summary_paths,
                before_report_signature=before_report_signature,
                before_summary_signatures=before_summary_signatures,
                required_marker=generation_marker,
                required_files=required_stage_files,
                before_required_signatures=before_required_signatures,
            )
            # HPC contract checks only apply in SLURM mode.
            generation_last_hpc_issues = (
                _collect_hpc_workflow_task_script_issues(
                    repo_dir=repo_dir,
                    simulation_task_scripts=simulation_task_scripts,
                    postprocess_task_scripts=postprocess_task_scripts,
                    plot_task_scripts=plot_task_scripts,
                )
                if hpc_mode
                else []
            )
            hpc_script_validation_error = _summarize_hpc_contract_issues(
                generation_last_hpc_issues
            )
            stage_errors = [
                error
                for error in [generation_validation_error, hpc_script_validation_error]
                if isinstance(error, str) and error.strip()
            ]
            if not stage_errors:
                generation_success = True
                break
            generation_failure_reason = "; ".join(stage_errors)
            for error in stage_errors:
                cli._print_tagged(workflow_name, error, stderr=True)
            if hpc_mode and generation_last_hpc_issues:
                # Persist the failing contract state and feed it back into the
                # next attempt's prompt.
                _write_hpc_contract_errors_artifact(
                    path=hpc_contract_errors_path,
                    workflow_name=workflow_name,
                    run_id=run_id,
                    stage_label="generation",
                    attempt=attempt,
                    max_attempts=generation_max_attempts,
                    issues=generation_last_hpc_issues,
                    stage_errors=stage_errors,
                    resolved=False,
                )
                generation_feedback_block = _render_hpc_contract_feedback_block(
                    stage_label="generation",
                    attempt=attempt,
                    max_attempts=generation_max_attempts,
                    issues=generation_last_hpc_issues,
                )
                # Stall detection: identical issue fingerprints across attempts
                # mean no progress — bail out instead of burning retries.
                fingerprint = _hpc_contract_issues_fingerprint(
                    generation_last_hpc_issues
                )
                if (
                    generation_hpc_last_fingerprint
                    and fingerprint == generation_hpc_last_fingerprint
                ):
                    generation_hpc_stall_count += 1
                else:
                    generation_hpc_stall_count = 0
                generation_hpc_last_fingerprint = fingerprint
                if generation_hpc_stall_count >= WORKFLOW_HPC_VALIDATION_STALL_LIMIT:
                    generation_failure_reason = (
                        generation_failure_reason
                        + " Repeated identical HPC contract issues detected with no progress."
                    )
                    break
            else:
                generation_feedback_block = ""
        else:
            generation_failure_reason = f"{workflow_name.title()} report generation failed with exit code {return_code}."
            generation_feedback_block = ""
    if not generation_success:
        raise cli.PackageError(
            generation_failure_reason
            or f"{workflow_name.title()} report generation failed."
        )
    # ---- Stage 2: independent audit pass over the generated report. ----
    auditor_prompt = (
        f"{WORKFLOW_REPORT_AUDITOR_PROMPT_PREFIX}\n"
        f"{WORKFLOW_UNIFIED_MEMORY_STAGE_INSTRUCTIONS}\n"
        "\n"
        f"Workflow: {workflow_name}\n"
        f"Source description: {source_description}\n"
        "\n"
        "Start from scratch as an independent reviewer.\n"
        "Read and audit these files:\n"
        f"- Plan JSON: {_display_path(plan_path)}\n"
        f"- Per-task summaries root: {_display_path(summaries_root)}\n"
        f"- Top-level report to audit in place: {_display_path(report_path)}\n"
        "\n"
        "Required audit actions:\n"
        "1) Verify report consistency with plan and summaries.\n"
        "2) Improve structure, clarity, and scientific correctness.\n"
        "3) Keep/repair figure links and explain missing figures explicitly.\n"
        "4) Update the same report file in place.\n"
        "5) Ensure the report contains this exact marker line:\n"
        f"{audit_marker}\n"
    )
    # The audit loop mirrors the generation loop above (same validation,
    # feedback, and stall-detection mechanics) with its own counters.
    audit_max_attempts = WORKFLOW_HPC_VALIDATION_MAX_ATTEMPTS if hpc_mode else 2
    audit_feedback_block = ""
    audit_failure_reason = ""
    audit_hpc_last_fingerprint = ""
    audit_hpc_stall_count = 0
    audit_success = False
    audit_last_hpc_issues: list[dict[str, str]] = []
    _emit_workflow_status(workflow_status_hook, "summary audit")
    for attempt in range(1, audit_max_attempts + 1):
        cli._print_tagged(
            workflow_name,
            f"report audit attempt {attempt}/{audit_max_attempts}",
        )
        before_report_signature = _capture_file_signature(report_path)
        before_summary_signatures = _capture_signatures(summary_paths)
        before_required_signatures = _capture_signatures(required_stage_files)
        audit_prompt_with_feedback = (
            auditor_prompt + audit_feedback_block
            if audit_feedback_block
            else auditor_prompt
        )
        run_result = _run_reproduce_exec_turn(
            repo_dir=repo_dir,
            prompt=audit_prompt_with_feedback,
            requested_package_id=requested_package_id,
            sandbox_override=sandbox_override,
            provider_bin_override=provider_bin_override,
            data_context=data_context,
        )
        return_code = int(run_result.get("return_code") or 0)
        if return_code == 0:
            audit_validation_error = _validate_report_stage_artifacts(
                stage_label=f"{workflow_name} report audit",
                report_path=report_path,
                summary_paths=summary_paths,
                before_report_signature=before_report_signature,
                before_summary_signatures=before_summary_signatures,
                required_marker=audit_marker,
                required_files=required_stage_files,
                before_required_signatures=before_required_signatures,
            )
            audit_last_hpc_issues = (
                _collect_hpc_workflow_task_script_issues(
                    repo_dir=repo_dir,
                    simulation_task_scripts=simulation_task_scripts,
                    postprocess_task_scripts=postprocess_task_scripts,
                    plot_task_scripts=plot_task_scripts,
                )
                if hpc_mode
                else []
            )
            hpc_script_validation_error = _summarize_hpc_contract_issues(
                audit_last_hpc_issues
            )
            stage_errors = [
                error
                for error in [audit_validation_error, hpc_script_validation_error]
                if isinstance(error, str) and error.strip()
            ]
            if not stage_errors:
                audit_success = True
                break
            audit_failure_reason = "; ".join(stage_errors)
            for error in stage_errors:
                cli._print_tagged(workflow_name, error, stderr=True)
            if hpc_mode and audit_last_hpc_issues:
                _write_hpc_contract_errors_artifact(
                    path=hpc_contract_errors_path,
                    workflow_name=workflow_name,
                    run_id=run_id,
                    stage_label="audit",
                    attempt=attempt,
                    max_attempts=audit_max_attempts,
                    issues=audit_last_hpc_issues,
                    stage_errors=stage_errors,
                    resolved=False,
                )
                audit_feedback_block = _render_hpc_contract_feedback_block(
                    stage_label="audit",
                    attempt=attempt,
                    max_attempts=audit_max_attempts,
                    issues=audit_last_hpc_issues,
                )
                fingerprint = _hpc_contract_issues_fingerprint(audit_last_hpc_issues)
                if (
                    audit_hpc_last_fingerprint
                    and fingerprint == audit_hpc_last_fingerprint
                ):
                    audit_hpc_stall_count += 1
                else:
                    audit_hpc_stall_count = 0
                audit_hpc_last_fingerprint = fingerprint
                if audit_hpc_stall_count >= WORKFLOW_HPC_VALIDATION_STALL_LIMIT:
                    audit_failure_reason = (
                        audit_failure_reason
                        + " Repeated identical HPC contract issues detected with no progress."
                    )
                    break
            else:
                audit_feedback_block = ""
        else:
            audit_failure_reason = f"{workflow_name.title()} report audit failed with exit code {return_code}."
            audit_feedback_block = ""
    if not audit_success:
        raise cli.PackageError(
            audit_failure_reason or f"{workflow_name.title()} report audit failed."
        )
    if hpc_mode:
        # Record a clean, resolved contract state once both stages pass.
        _write_hpc_contract_errors_artifact(
            path=hpc_contract_errors_path,
            workflow_name=workflow_name,
            run_id=run_id,
            stage_label="final",
            attempt=0,
            max_attempts=0,
            issues=[],
            stage_errors=[],
            resolved=True,
        )
    stage_driver_paths = _write_workflow_stage_driver_scripts(
        run_dir=run_dir,
        workflow_name=workflow_name,
        run_id=run_id,
        simulation_task_scripts=simulation_task_scripts,
        postprocess_task_scripts=postprocess_task_scripts,
        plot_task_scripts=plot_task_scripts,
    )
    result = {
        "report_path": str(report_path),
        "summaries_root": str(summaries_root),
        "summary_count": len(summary_paths),
        **stage_driver_paths,
    }
    if hpc_mode:
        result["hpc_contract_errors_path"] = str(hpc_contract_errors_path)
    return result
[docs]
def cmd_plan_workflow(
args: argparse.Namespace,
*,
workflow_name: str,
runs_dir_name: str,
generate_plan,
) -> int:
"""Shared orchestration backbone for `reproduce` and `research`.
Dependency note:
- This planner/executor is reused by both major workflow commands.
- Task execution is delegated to `loop`, which itself reuses the `exec` stack.
"""
cli = _cli()
repo_dir = Path.cwd().resolve()
cli._ensure_exec_repo_ready(repo_dir, args)
user_prompt, prompt_file = cli._resolve_exec_like_user_prompt(args)
source_description = prompt_file or "inline prompt"
plan_only = bool(getattr(args, "plan_only", False))
report_only = bool(getattr(args, "report_only", False))
skip_report = bool(getattr(args, "skip_report", False))
if plan_only and report_only:
raise cli.PackageError("Cannot combine --plan-only and --report-only.")
if report_only and skip_report:
raise cli.PackageError("Cannot combine --report-only and --skip-report.")
if report_only:
cli._ensure_loop_memory(
repo_dir=repo_dir,
user_prompt=user_prompt,
prompt_file=prompt_file,
overwrite=False,
)
else:
cli._reset_loop_short_term_memory(
repo_dir=repo_dir,
user_prompt=user_prompt,
prompt_file=prompt_file,
workflow_context_lines=[
f"- workflow: {workflow_name}",
"- stage: workflow_entry",
],
)
task_max_runs_raw = getattr(args, "task_max_runs", 5)
try:
task_max_runs = int(task_max_runs_raw)
except (TypeError, ValueError) as exc:
raise cli.PackageError("--task-max-runs must be an integer.") from exc
if task_max_runs < 1:
raise cli.PackageError("--task-max-runs must be >= 1.")
planner_max_tries_raw = getattr(args, "planner_max_tries", 2)
try:
planner_max_tries = int(planner_max_tries_raw)
except (TypeError, ValueError) as exc:
raise cli.PackageError("--planner-max-tries must be an integer.") from exc
if planner_max_tries < 1:
raise cli.PackageError("--planner-max-tries must be >= 1.")
auditor_max_tries_raw = getattr(args, "auditor_max_tries", 2)
try:
auditor_max_tries = int(auditor_max_tries_raw)
except (TypeError, ValueError) as exc:
raise cli.PackageError("--auditor-max-tries must be an integer.") from exc
if auditor_max_tries < 1:
raise cli.PackageError("--auditor-max-tries must be >= 1.")
max_iterations_raw = getattr(args, "max_iterations", 10)
try:
max_iterations = int(max_iterations_raw)
except (TypeError, ValueError) as exc:
raise cli.PackageError("--max-iterations must be an integer.") from exc
if max_iterations < 1:
raise cli.PackageError("--max-iterations must be >= 1.")
wait_seconds_raw = getattr(args, "wait_seconds", 0.0)
try:
wait_seconds = float(wait_seconds_raw)
except (TypeError, ValueError) as exc:
raise cli.PackageError("--wait-seconds must be a number.") from exc
if wait_seconds < 0:
raise cli.PackageError("--wait-seconds must be >= 0.")
max_wait_seconds_raw = getattr(args, "max_wait_seconds", 600.0)
try:
max_wait_seconds = float(max_wait_seconds_raw)
except (TypeError, ValueError) as exc:
raise cli.PackageError("--max-wait-seconds must be a number.") from exc
if max_wait_seconds < 0:
raise cli.PackageError("--max-wait-seconds must be >= 0.")
pid_stall_seconds_raw = getattr(args, "pid_stall_seconds", 900.0)
try:
pid_stall_seconds = float(pid_stall_seconds_raw)
except (TypeError, ValueError) as exc:
raise cli.PackageError("--pid-stall-seconds must be a number.") from exc
if pid_stall_seconds < 0:
raise cli.PackageError("--pid-stall-seconds must be >= 0.")
workflow_status_hook_raw = getattr(args, "_fermilink_workflow_status_hook", None)
workflow_status_hook: WorkflowStatusHook | None = (
workflow_status_hook_raw if callable(workflow_status_hook_raw) else None
)
projects_dir = repo_dir / cli.LOOP_MEMORY_DIRNAME
runs_root = projects_dir / runs_dir_name
latest_path = runs_root / cli.REPRODUCE_LATEST_RUN_FILENAME
runs_root.mkdir(parents=True, exist_ok=True)
source_fingerprint = hashlib.sha256(user_prompt.strip().encode("utf-8")).hexdigest()
resume_enabled = bool(getattr(args, "resume", True))
if report_only and not resume_enabled:
raise cli.PackageError(
"--report-only requires --resume (do not use --restart)."
)
run_dir: Path | None = None
state: dict[str, object] | None = None
if resume_enabled and latest_path.is_file():
try:
latest_run_id = latest_path.read_text(encoding="utf-8").strip()
if latest_run_id:
candidate = runs_root / latest_run_id
candidate_state_path = candidate / cli.REPRODUCE_STATE_FILENAME
if candidate_state_path.is_file():
loaded = json.loads(
candidate_state_path.read_text(encoding="utf-8")
)
if isinstance(loaded, dict):
same_source = (
str(loaded.get("source_fingerprint") or "")
== source_fingerprint
)
status = str(loaded.get("status") or "")
if same_source and (status not in {"completed"} or report_only):
run_dir = candidate
state = loaded
except (OSError, json.JSONDecodeError):
run_dir = None
state = None
created_new_run = run_dir is None or state is None
if created_new_run:
if report_only:
raise cli.PackageError(
f"--report-only requires an existing resumable {workflow_name} run for this prompt."
)
run_id = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
run_dir = runs_root / run_id
run_dir.mkdir(parents=True, exist_ok=True)
state = {
"version": 1,
"run_id": run_id,
"status": "planning",
"created_at_utc": cli._utc_now_z(),
"updated_at_utc": cli._utc_now_z(),
"source_fingerprint": source_fingerprint,
"source_description": source_description,
"source_prompt_file": prompt_file,
"source_prompt_preview": user_prompt[:400],
"current_task_index": 0,
"tasks": [],
"task_runs": {},
"last_error": "",
}
else:
cli._print_tagged(workflow_name, f"resuming run: {run_dir.name}")
if report_only and not created_new_run:
# Report-only runs should finalize from existing run artifacts/config and
# must not be blocked by invocation-time mode drift from legacy runs.
state_data_context = _coerce_saved_data_context(state)
state_hpc_context = _coerce_saved_hpc_context(state)
state["data_context"] = state_data_context
state["hpc_context"] = state_hpc_context
else:
invocation_data_context = _resolve_invocation_data_context(
repo_dir=repo_dir,
run_dir=run_dir,
workflow_name=workflow_name,
args=args,
)
invocation_hpc_context = _resolve_invocation_hpc_context(
repo_dir=repo_dir,
args=args,
)
# Workflow CLI no longer exposes --data-dir; keep workflow data context fixed
# to invocation defaults and ignore legacy saved run-state toggles.
state_data_context = invocation_data_context
state["data_context"] = state_data_context
if created_new_run:
state_hpc_context = invocation_hpc_context
else:
saved_hpc_context = _coerce_saved_hpc_context(state)
_assert_hpc_context_compatible(
run_id=str(state.get("run_id") or run_dir.name),
workflow_name=workflow_name,
state_hpc_context=saved_hpc_context,
invocation_hpc_context=invocation_hpc_context,
)
state_hpc_context = invocation_hpc_context
state["hpc_context"] = state_hpc_context
if created_new_run:
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
try:
latest_path.write_text(run_dir.name + "\n", encoding="utf-8")
except OSError as exc:
raise cli.PackageError(
f"Failed to write latest {workflow_name} run file: {latest_path}: {exc}"
) from exc
if "dry_run" in state:
state.pop("dry_run", None)
prompts_dir = run_dir / cli.REPRODUCE_PROMPTS_DIRNAME
logs_dir = run_dir / cli.REPRODUCE_LOGS_DIRNAME
prompts_dir.mkdir(parents=True, exist_ok=True)
logs_dir.mkdir(parents=True, exist_ok=True)
if _is_data_context_enabled(state_data_context):
(run_dir / WORKFLOW_DATA_DIRNAME).mkdir(parents=True, exist_ok=True)
state_data_context = _prepare_workflow_data_artifacts(
repo_dir=repo_dir,
run_dir=run_dir,
data_context=state_data_context,
)
state["data_context"] = state_data_context
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
artifacts = (
state_data_context.get("artifacts")
if isinstance(state_data_context.get("artifacts"), dict)
else {}
)
if isinstance(artifacts, dict):
manifest_rel = str(
artifacts.get("manifest_compact") or artifacts.get("manifest") or ""
).strip()
manifest_full_rel = str(artifacts.get("manifest_full") or "").strip()
summary_rel = str(artifacts.get("summary") or "").strip()
if manifest_rel:
cli._print_tagged(
workflow_name, f"data manifest (compact): {manifest_rel}"
)
if manifest_full_rel:
cli._print_tagged(
workflow_name, f"data manifest (full): {manifest_full_rel}"
)
if summary_rel:
cli._print_tagged(workflow_name, f"data summary: {summary_rel}")
cli._print_tagged(workflow_name, f"run dir: {run_dir.relative_to(repo_dir)}")
if _is_hpc_context_enabled(state_hpc_context):
hpc_profile_rel = str(state_hpc_context.get("profile_relpath") or "").strip()
hpc_profile_path = str(state_hpc_context.get("profile_path") or "").strip()
hpc_profile_label = hpc_profile_rel or hpc_profile_path
cli._print_tagged(workflow_name, "execution target: hpc_slurm")
if hpc_profile_label:
cli._print_tagged(workflow_name, f"hpc profile: {hpc_profile_label}")
else:
cli._print_tagged(workflow_name, "execution target: local")
state_status = str(state.get("status") or "planning")
tasks_state_raw = state.get("tasks")
has_existing_tasks = isinstance(tasks_state_raw, list) and bool(tasks_state_raw)
if state_status == "planning" or not has_existing_tasks:
if report_only:
raise cli.PackageError(
f"--report-only requires an existing {workflow_name} run with generated tasks."
)
plan = generate_plan(
repo_dir=repo_dir,
run_dir=run_dir,
source_text=user_prompt,
source_description=source_description,
requested_package_id=args.package_id,
sandbox_override=args.sandbox,
provider_bin_override=cli.DEFAULT_PROVIDER_BINARY_OVERRIDE,
planner_max_tries=planner_max_tries,
auditor_max_tries=auditor_max_tries,
data_context=state_data_context,
hpc_context=state_hpc_context,
workflow_status_hook=workflow_status_hook,
)
cli._materialize_mode_plan(
run_dir=run_dir,
plan=plan,
state=state,
workflow_name=workflow_name,
)
state["status"] = (
"plan_ready" if bool(getattr(args, "plan_only", False)) else "running_tasks"
)
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
cli._print_tagged(
workflow_name, f"plan ready with {len(state.get('tasks') or [])} tasks"
)
elif state_status == "completed" and not report_only:
cli._print_tagged(workflow_name, "run already completed")
print(cli.LOOP_DONE_TOKEN)
return 0
plan_synced = cli._maybe_sync_mode_plan_from_disk(
repo_dir=repo_dir,
run_dir=run_dir,
state=state,
source_description=source_description,
workflow_name=workflow_name,
data_context=state_data_context,
hpc_context=state_hpc_context,
)
if plan_synced:
cli._print_tagged(workflow_name, "synced plan from plan.json")
if plan_only:
cli._print_tagged(
workflow_name, f"plan-only mode: {run_dir.relative_to(repo_dir)}"
)
return 0
tasks_state = state.get("tasks")
if not isinstance(tasks_state, list) or not tasks_state:
raise cli.PackageError(f"No tasks found in {workflow_name} state.")
total_task_count = len(tasks_state)
task_runs_state = state.get("task_runs")
if not isinstance(task_runs_state, dict):
task_runs_state = {}
state["task_runs"] = task_runs_state
def _print_report_artifacts(report_payload: object) -> None:
    """Print a tagged, repo-relative path line for each report artifact present.

    Silently ignores non-dict payloads and artifact keys whose value is
    missing or blank; paths outside the repo are printed verbatim.
    """
    if not isinstance(report_payload, dict):
        return
    for key, label in (
        ("report_path", "report"),
        ("run_all_script_path", "run-all script"),
        ("simulation_script_path", "simulation script"),
        ("postprocess_script_path", "postprocess script"),
        ("plot_script_path", "plot script"),
        ("simulation_job_map_path", "simulation job map"),
        ("postprocess_job_map_path", "postprocess job map"),
        ("plot_job_map_path", "plot job map"),
        ("hpc_contract_errors_path", "hpc contract validation"),
    ):
        artifact_path = str(report_payload.get(key) or "").strip()
        if not artifact_path:
            continue
        # Prefer a repo-relative rendering; fall back to the raw path when
        # the artifact lives outside repo_dir (or the path is malformed).
        try:
            display_path = str(Path(artifact_path).relative_to(repo_dir))
        except Exception:
            display_path = artifact_path
        cli._print_tagged(workflow_name, f"{label}: {display_path}")
if report_only:
try:
report_info = cli._finalize_workflow_report(
repo_dir=repo_dir,
run_dir=run_dir,
runs_root=runs_root,
workflow_name=workflow_name,
source_description=source_description,
tasks_state=tasks_state,
requested_package_id=args.package_id,
sandbox_override=args.sandbox,
provider_bin_override=cli.DEFAULT_PROVIDER_BINARY_OVERRIDE,
data_context=state_data_context,
hpc_context=state_hpc_context,
workflow_status_hook=workflow_status_hook,
)
except cli.PackageError as exc:
state["last_error"] = str(exc)
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
cli._print_tagged(workflow_name, str(exc), stderr=True)
return 1
state["report"] = report_info
state["last_error"] = ""
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
_print_report_artifacts(report_info)
return 0
state["status"] = "running_tasks"
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
data_artifacts = (
state_data_context.get("artifacts")
if isinstance(state_data_context.get("artifacts"), dict)
else {}
)
data_manifest_rel = (
str(
data_artifacts.get("manifest_compact")
or data_artifacts.get("manifest")
or ""
).strip()
if isinstance(data_artifacts, dict)
else ""
)
data_manifest_full_rel = (
str(data_artifacts.get("manifest_full") or "").strip()
if isinstance(data_artifacts, dict)
else ""
)
data_summary_rel = (
str(data_artifacts.get("summary") or "").strip()
if isinstance(data_artifacts, dict)
else ""
)
data_task_map_rel = (
str(data_artifacts.get("task_map") or "").strip()
if isinstance(data_artifacts, dict)
else ""
)
data_guard_enabled = _is_data_context_enabled(state_data_context) and bool(
state_data_context.get("read_only", True)
)
data_guard_source = (
Path(str(state_data_context.get("source_path") or ""))
if data_guard_enabled
else None
)
data_limits = (
state_data_context.get("limits")
if isinstance(state_data_context.get("limits"), dict)
else {}
)
data_guard_limits = (
{
"max_files": int(data_limits.get("max_files") or DEFAULT_DATA_MAX_FILES),
"max_total_bytes": int(
data_limits.get("max_total_bytes") or DEFAULT_DATA_MAX_TOTAL_BYTES
),
"max_file_bytes": int(
data_limits.get("max_file_bytes") or DEFAULT_DATA_MAX_FILE_BYTES
),
"hash_max_bytes": int(
data_limits.get("hash_max_bytes") or DEFAULT_DATA_HASH_MAX_BYTES
),
}
if data_guard_enabled
else {}
)
while True:
current_index_raw = state.get("current_task_index", 0)
try:
current_index = int(current_index_raw)
except (TypeError, ValueError):
current_index = 0
if current_index < 0:
current_index = 0
if current_index >= len(tasks_state):
if skip_report:
state["report"] = {
"skipped": True,
"reason": "skip_report_flag",
"updated_at_utc": cli._utc_now_z(),
}
cli._print_tagged(
workflow_name, "skipping report generation (--skip-report)"
)
else:
try:
report_info = cli._finalize_workflow_report(
repo_dir=repo_dir,
run_dir=run_dir,
runs_root=runs_root,
workflow_name=workflow_name,
source_description=source_description,
tasks_state=tasks_state,
requested_package_id=args.package_id,
sandbox_override=args.sandbox,
provider_bin_override=cli.DEFAULT_PROVIDER_BINARY_OVERRIDE,
data_context=state_data_context,
hpc_context=state_hpc_context,
workflow_status_hook=workflow_status_hook,
)
except cli.PackageError as exc:
state["status"] = "failed"
state["last_error"] = str(exc)
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(
run_dir / cli.REPRODUCE_STATE_FILENAME, state
)
cli._print_tagged(workflow_name, str(exc), stderr=True)
return 1
state["report"] = report_info
state["status"] = "completed"
state["last_error"] = ""
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
_print_report_artifacts(state.get("report"))
print(cli.LOOP_DONE_TOKEN)
return 0
task = tasks_state[current_index]
if not isinstance(task, dict):
raise cli.PackageError(
f"Task index {current_index} in {workflow_name} state is invalid."
)
task_id = str(task.get("id") or f"task_{current_index + 1:03d}").strip()
prompt_rel = str(task.get("prompt_file") or "").strip()
if not prompt_rel:
raise cli.PackageError(
f"Task {task_id} is missing `prompt_file` in {workflow_name} state."
)
prompt_path = run_dir / prompt_rel
if not prompt_path.is_file():
raise cli.PackageError(f"Task prompt file does not exist: {prompt_path}")
task_data_rel = str(task.get("data_context_file") or "").strip()
if _is_data_context_enabled(state_data_context) and not task_data_rel:
task_data_rel = f"{WORKFLOW_DATA_DIRNAME}/{task_id}.md"
task_data_path = run_dir / task_data_rel if task_data_rel else None
if _is_data_context_enabled(state_data_context):
if task_data_path is None or not task_data_path.is_file():
raise cli.PackageError(
f"Task data context file does not exist: {task_data_path}"
)
plan_path = run_dir / REPRODUCE_PLAN_FILENAME
state_path = run_dir / REPRODUCE_STATE_FILENAME
def _memory_relpath(path: Path) -> str:
    """Render *path* relative to the repo root when possible, else verbatim.

    `Path.relative_to` raises ValueError for paths outside `repo_dir`;
    in that case the original path string is returned unchanged.
    """
    try:
        relative = path.relative_to(repo_dir)
    except ValueError:
        return str(path)
    return str(relative)
workflow_prompt_preamble_lines = [
"Workflow preflight (research/reproduce mode):",
"- Before acting, read short/long term memory at `projects/memory.md`.",
f"- Before acting, read the full workflow plan at `{_memory_relpath(plan_path)}`.",
]
if isinstance(prompt_file, str) and prompt_file.strip():
workflow_prompt_preamble_lines.append(
"- Before acting, optionally read original paper or request at "
f"`{_memory_relpath(Path(prompt_file))}` for additional context if needed."
)
else:
workflow_prompt_preamble_lines.append(
"- Before acting, read original request from "
"`projects/memory.md` under `## Original request`."
)
if _is_data_context_enabled(state_data_context) and task_data_path is not None:
workflow_prompt_preamble_lines.extend(
[
f"- Before acting, read `{_memory_relpath(task_data_path)}`.",
(
"- Data scope rule: only use files listed in the task data "
"context unless strong evidence requires expansion."
),
(
"- If scope expansion is required, update "
f"`{data_task_map_rel}` and `{_memory_relpath(task_data_path)}` "
"with rationale/confidence before continuing."
),
]
)
workflow_prompt_preamble_lines.extend(
[
"",
"Execution target constraints:",
*_build_hpc_prompt_lines(state_hpc_context),
]
)
workflow_prompt_preamble_lines.extend(
[
"",
"Simulation policy:",
"- Execute the simulations required by each task and record reproducible results.",
]
)
workflow_prompt_preamble = "\n".join(workflow_prompt_preamble_lines).strip()
task_runs = int(task_runs_state.get(task_id, 0))
if task_runs == 0:
try:
task_prompt_text = prompt_path.read_text(
encoding="utf-8", errors="replace"
)
except OSError as exc:
raise cli.PackageError(
f"Failed to read task prompt file: {prompt_path}: {exc}"
) from exc
workflow_context_lines = [
f"- workflow: {workflow_name}",
"- simulation_execution: enforced (run required simulations; no dry-run mode)",
f"- plan_json: {_memory_relpath(plan_path)} (overall workflow task plan)",
f"- state_json: {_memory_relpath(state_path)} (workflow progress and task status)",
]
workflow_context_lines.extend(
[
"- execution_target_context:",
*_build_hpc_prompt_lines(state_hpc_context),
]
)
if _is_data_context_enabled(state_data_context):
if data_manifest_rel:
workflow_context_lines.append(
f"- data_manifest_json: {data_manifest_rel} (compact indexed data inventory)"
)
if data_manifest_full_rel:
workflow_context_lines.append(
f"- data_manifest_full_json: {data_manifest_full_rel} (full deterministic indexed data inventory)"
)
if data_summary_rel:
workflow_context_lines.append(
f"- data_summary_markdown: {data_summary_rel} (human-readable data scope)"
)
if data_task_map_rel:
workflow_context_lines.append(
f"- task_data_map_json: {data_task_map_rel} (task-to-file mapping)"
)
if task_data_path is not None:
workflow_context_lines.append(
f"- task_data_context: {_memory_relpath(task_data_path)} (allowed per-task files)"
)
mapping_mode = str(state_data_context.get("mapping_mode") or "").strip()
if mapping_mode:
workflow_context_lines.append(
f"- data_mapping_mode: {mapping_mode}"
)
cli._reset_loop_short_term_memory(
repo_dir=repo_dir,
user_prompt=task_prompt_text,
prompt_file=str(prompt_path),
workflow_context_lines=workflow_context_lines,
)
run_number = task_runs + 1
task_ordinal = current_index + 1
cli._print_tagged(
workflow_name,
(
f"task {task_ordinal}/{total_task_count} "
f"{task_id} run {run_number}/{task_max_runs}"
),
)
def _workflow_loop_iteration_hook(
    iteration: int, max_loop_iterations: int
) -> None:
    """Forward per-iteration loop progress to the optional workflow status hook."""
    status_message = (
        f"{workflow_name} task {task_ordinal}/{total_task_count} "
        f"loop {iteration}/{max_loop_iterations}"
    )
    # _emit_workflow_status is a no-op when no hook is configured and
    # swallows hook failures, so this cannot disrupt the task loop.
    _emit_workflow_status(workflow_status_hook, status_message)
loop_args = argparse.Namespace(
command="loop",
prompt=[str(prompt_path)],
package_id=args.package_id,
sandbox=args.sandbox,
max_iterations=max_iterations,
wait_seconds=wait_seconds,
max_wait_seconds=max_wait_seconds,
pid_stall_seconds=pid_stall_seconds,
init_git=args.init_git,
no_init_git=args.no_init_git,
workflow_prompt_preamble=workflow_prompt_preamble,
_fermilink_loop_iteration_hook=_workflow_loop_iteration_hook,
)
data_guard_before_scan: dict[str, object] | None = None
if (
data_guard_enabled
and data_guard_source is not None
and isinstance(data_guard_limits, dict)
):
data_guard_before_scan = _scan_data_dir_inventory(
data_dir=data_guard_source,
max_files=int(data_guard_limits["max_files"]),
max_total_bytes=int(data_guard_limits["max_total_bytes"]),
max_file_bytes=int(data_guard_limits["max_file_bytes"]),
hash_max_bytes=int(data_guard_limits["hash_max_bytes"]),
include_hash=False,
)
started_at = cli._utc_now_z()
code = cli._cmd_loop(loop_args)
loop_completion_commit = _normalize_checkpoint_payload(
getattr(loop_args, "_fermilink_completion_commit", None)
)
if (
data_guard_before_scan is not None
and data_guard_source is not None
and isinstance(data_guard_limits, dict)
):
data_guard_after_scan = _scan_data_dir_inventory(
data_dir=data_guard_source,
max_files=int(data_guard_limits["max_files"]),
max_total_bytes=int(data_guard_limits["max_total_bytes"]),
max_file_bytes=int(data_guard_limits["max_file_bytes"]),
hash_max_bytes=int(data_guard_limits["hash_max_bytes"]),
include_hash=False,
)
before_fingerprint = str(data_guard_before_scan.get("fingerprint") or "")
after_fingerprint = str(data_guard_after_scan.get("fingerprint") or "")
if before_fingerprint != after_fingerprint:
state["status"] = "failed"
state["last_error"] = (
"Read-only data guard violation: --data-dir changed during task "
f"{task_id} run {run_number}."
)
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
diff_lines = _scan_diff_preview(
data_guard_before_scan, data_guard_after_scan
)
cli._print_tagged(
workflow_name,
str(state["last_error"])
+ "\n"
+ f"data_dir: {data_guard_source}\n"
+ "indexed diff preview:\n"
+ "\n".join(diff_lines),
stderr=True,
)
return 1
loop_outcome_payload = getattr(loop_args, "_fermilink_loop_outcome", None)
loop_status = ""
loop_reason = ""
provider_exit_code: int | None = None
if isinstance(loop_outcome_payload, dict):
loop_status = str(loop_outcome_payload.get("status") or "").strip()
loop_reason = str(loop_outcome_payload.get("reason") or "").strip()
provider_exit_code_raw = loop_outcome_payload.get("provider_exit_code")
if isinstance(provider_exit_code_raw, int):
provider_exit_code = provider_exit_code_raw
if not loop_status:
if code == 0:
loop_status = "done"
loop_reason = "loop_exit_code_0"
elif code == 1:
loop_status = "incomplete_max_iterations"
loop_reason = "legacy_loop_exit_code_1"
else:
loop_status = "provider_failure"
loop_reason = f"legacy_loop_exit_code_{code}"
provider_exit_code = code
finished_at = cli._utc_now_z()
task_runs_state[task_id] = run_number
state["updated_at_utc"] = finished_at
try:
(logs_dir / f"{task_id}_run_{run_number:02d}.json").write_text(
json.dumps(
{
"task_id": task_id,
"task_index": current_index + 1,
"run_number": run_number,
"started_at_utc": started_at,
"finished_at_utc": finished_at,
"loop_exit_code": code,
"loop_status": loop_status,
"loop_reason": loop_reason,
"provider_exit_code": provider_exit_code,
"completion_commit_status": str(
loop_completion_commit.get("status") or ""
),
"completion_commit_sha": str(
loop_completion_commit.get("sha") or ""
),
"completion_commit_error": str(
loop_completion_commit.get("error") or ""
),
"completion_commit_memory_only": str(
loop_completion_commit.get("memory_only") or "false"
),
},
indent=2,
)
+ "\n",
encoding="utf-8",
)
except OSError:
pass
if loop_status == "done":
state["current_task_index"] = current_index + 1
state["last_error"] = ""
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
continue
if loop_status == "incomplete_max_iterations" and run_number < task_max_runs:
state["last_error"] = (
f"Task {task_id} did not reach {cli.LOOP_DONE_TOKEN}; retrying "
f"({run_number}/{task_max_runs})."
)
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
continue
if loop_status == "incomplete_max_iterations":
state["status"] = "failed"
state["last_error"] = (
f"Task {task_id} exceeded --task-max-runs ({task_max_runs}) "
f"without {cli.LOOP_DONE_TOKEN}."
)
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
cli._print_tagged(workflow_name, str(state["last_error"]), stderr=True)
return 1
state["status"] = "failed"
if loop_status == "provider_failure" and provider_exit_code is not None:
state["last_error"] = (
f"Task {task_id} failed with provider exit code {provider_exit_code}."
)
else:
state["last_error"] = (
f"Task {task_id} failed with loop status {loop_status or 'unknown'} "
f"(exit code {code})."
)
state["updated_at_utc"] = cli._utc_now_z()
cli._write_json_atomic(run_dir / cli.REPRODUCE_STATE_FILENAME, state)
cli._print_tagged(workflow_name, str(state["last_error"]), stderr=True)
return code
def cmd_reproduce(args: argparse.Namespace) -> int:
    """Run publication reproduction orchestration.

    Dependency note:
    - `reproduce` delegates task execution to `loop`.
    - `loop` depends on the same execution/routing stack used by `exec`.
    """
    cli = _cli()
    repo_dir = Path.cwd().resolve()
    try:
        exit_code = cmd_plan_workflow(
            args,
            workflow_name="reproduce",
            runs_dir_name=cli.REPRODUCE_RUNS_DIR,
            generate_plan=cli._generate_reproduce_plan,
        )
        return exit_code
    finally:
        # Best-effort completion commit, attempted regardless of outcome.
        _attempt_mode_completion_commit(
            repo_dir=repo_dir, args=args, mode_name="reproduce"
        )
def cmd_research(args: argparse.Namespace) -> int:
    """Run multi-task autonomous research orchestration.

    Dependency note:
    - `research` delegates task execution to `loop`.
    - `loop` depends on the same execution/routing stack used by `exec`.
    """
    cli = _cli()
    repo_dir = Path.cwd().resolve()
    try:
        exit_code = cmd_plan_workflow(
            args,
            workflow_name="research",
            runs_dir_name=cli.RESEARCH_RUNS_DIR,
            generate_plan=cli._generate_research_plan,
        )
        return exit_code
    finally:
        # Best-effort completion commit, attempted regardless of outcome.
        _attempt_mode_completion_commit(
            repo_dir=repo_dir, args=args, mode_name="research"
        )