Skip to content

helicon.runner

Simulation launch, hardware detection, batch submission, checkpoints, and convergence.

Hardware Detection

helicon.runner.hardware_config.HardwareInfo(platform, arch, cpu_count, is_apple_silicon=False, apple_chip=None, has_nvidia_gpu=False, nvidia_gpu_name=None, cuda_version=None, has_mlx=False, mlx_version=None, has_pywarpx=False, warpx_version=None, has_warpx_metal=False, warpx_metal_exe_2d=None, warpx_metal_exe_3d=None, warpx_metal_root=None, recommended_backend='cpu', omp_num_threads=1) dataclass

Detected hardware capabilities.

Functions

summary()

Human-readable hardware summary.

Source code in src/helicon/runner/hardware_config.py
def summary(self) -> str:
    """Return a multi-line, human-readable summary of detected hardware."""
    # Always-present header lines.
    report: list[str] = [
        f"Platform: {self.platform} ({self.arch})",
        f"CPU cores: {self.cpu_count}",
    ]
    if self.is_apple_silicon:
        report.append(f"Apple Silicon: {self.apple_chip or 'yes'}")
    if self.has_nvidia_gpu:
        report.append(f"NVIDIA GPU: {self.nvidia_gpu_name or 'detected'}")
        if self.cuda_version:
            report.append(f"CUDA: {self.cuda_version}")
    # Optional software stacks, shown only when detected.
    optional = (
        (self.has_mlx, f"MLX: {self.mlx_version or 'available'}"),
        (self.has_pywarpx, f"WarpX: {self.warpx_version or 'available'}"),
        (self.has_warpx_metal, f"WarpX Metal: {self.warpx_metal_root or 'detected'}"),
    )
    report.extend(text for present, text in optional if present)
    report.append(f"Recommended backend: {self.recommended_backend}")
    report.append(f"OMP threads: {self.omp_num_threads}")
    return "\n".join(report)

helicon.runner.hardware_config.detect_hardware()

Detect available hardware and recommend a WarpX backend.

Source code in src/helicon/runner/hardware_config.py
def detect_hardware() -> HardwareInfo:
    """Detect available hardware and recommend a WarpX backend.

    Probes the host best-effort: Apple Silicon (via ``sysctl``), NVIDIA
    GPUs (via ``nvidia-smi``), the optional ``mlx`` and ``pywarpx``
    Python packages, and a warpx-metal build (Apple Silicon only). Any
    probe that fails leaves the corresponding capability flag at its
    default (disabled); this function itself never raises.

    Returns
    -------
    HardwareInfo
        Populated capability record. ``recommended_backend`` is one of
        ``"cuda"``, ``"metal"``, or ``"omp"``; ``omp_num_threads`` honours
        the ``OMP_NUM_THREADS`` environment variable, else all CPU cores.
    """
    info = HardwareInfo(
        platform=platform.system().lower(),
        arch=platform.machine(),
        cpu_count=os.cpu_count() or 1,
    )

    # Apple Silicon detection (macOS reports arm64 for M-series chips)
    if info.platform == "darwin" and info.arch == "arm64":
        info.is_apple_silicon = True
        try:
            # Chip marketing name, e.g. "Apple M2 Pro".
            result = subprocess.run(
                ["sysctl", "-n", "machdep.cpu.brand_string"],
                capture_output=True,
                text=True,
                timeout=5,
            )
            if result.returncode == 0:
                info.apple_chip = result.stdout.strip()
        except (FileNotFoundError, subprocess.TimeoutExpired):
            pass  # best-effort: leave apple_chip unset

    # NVIDIA GPU detection
    if shutil.which("nvidia-smi"):
        try:
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=name", "--format=csv,noheader"],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0 and result.stdout.strip():
                info.has_nvidia_gpu = True
                # Keep only the first GPU when several are present.
                info.nvidia_gpu_name = result.stdout.strip().split("\n")[0]

            # NOTE(review): this queries the *driver* version and stores it
            # under ``cuda_version`` (nvidia-smi's --query-gpu has no
            # cuda_version field) — confirm consumers expect the driver
            # version rather than the CUDA toolkit version.
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=driver_version", "--format=csv,noheader"],
                capture_output=True,
                text=True,
                timeout=10,
            )
            if result.returncode == 0:
                info.cuda_version = result.stdout.strip().split("\n")[0]
        except (FileNotFoundError, subprocess.TimeoutExpired):
            pass  # best-effort: report no NVIDIA details

    # MLX detection (optional dependency; import is the probe)
    try:
        import mlx.core as mx

        info.has_mlx = True
        info.mlx_version = getattr(mx, "__version__", "unknown")
    except ImportError:
        pass  # MLX not installed

    # pywarpx detection (WarpX Python bindings; optional dependency)
    try:
        import pywarpx

        info.has_pywarpx = True
        info.warpx_version = getattr(pywarpx, "__version__", "unknown")
    except ImportError:
        pass  # pywarpx not installed

    # warpx-metal detection (Apple Silicon native Metal GPU backend)
    if info.is_apple_silicon:
        try:
            from helicon.runner.metal_runner import detect_warpx_metal

            metal = detect_warpx_metal()
            if metal.valid:
                info.has_warpx_metal = True
                info.warpx_metal_root = str(metal.root)
                info.warpx_metal_exe_2d = str(metal.exe_2d) if metal.exe_2d else None
                info.warpx_metal_exe_3d = str(metal.exe_3d) if metal.exe_3d else None
        except Exception:
            pass  # deliberately broad: detection must never break callers

    # Recommended backend: prefer CUDA, then Metal, else fall back to OpenMP.
    if info.has_nvidia_gpu:
        info.recommended_backend = "cuda"
    elif info.has_warpx_metal:
        info.recommended_backend = "metal"  # Apple Silicon GPU via SYCL/Metal
    elif info.is_apple_silicon:
        info.recommended_backend = "omp"  # Metal build not found
    else:
        info.recommended_backend = "omp"

    # OMP threads — use all cores unless overridden
    info.omp_num_threads = int(os.environ.get("OMP_NUM_THREADS", info.cpu_count))

    return info

Launch

helicon.runner.launch.RunResult(output_dir, input_file, bfield_file, success, wall_time_seconds, metadata) dataclass

Result of a WarpX simulation run.

helicon.runner.launch.run_simulation(config, *, output_dir=None, hardware=None, dry_run=False)

Run a WarpX simulation from a Helicon configuration.

Parameters:

Name Type Description Default
config SimConfig

Simulation configuration.

required
output_dir path

Override output directory (defaults to config.output_dir).

None
hardware HardwareInfo

Pre-detected hardware info. Auto-detected if not provided.

None
dry_run bool

If True, generate input files but do not launch WarpX.

False

Returns:

Type Description
RunResult
Source code in src/helicon/runner/launch.py
def run_simulation(
    config: SimConfig,
    *,
    output_dir: str | Path | None = None,
    hardware: HardwareInfo | None = None,
    dry_run: bool = False,
) -> RunResult:
    """Run a WarpX simulation from a Helicon configuration.

    Parameters
    ----------
    config : SimConfig
        Simulation configuration.
    output_dir : path, optional
        Override output directory (defaults to ``config.output_dir``).
    hardware : HardwareInfo, optional
        Pre-detected hardware info. Auto-detected if not provided.
    dry_run : bool
        If True, generate input files but do not launch WarpX.

    Returns
    -------
    RunResult

    Raises
    ------
    RuntimeError
        If WarpX must actually be launched (``dry_run=False``), the Metal
        path was not taken, and ``pywarpx`` is not installed.

    Notes
    -----
    Side effects: writes input, B-field, log, and ``run_metadata.json``
    files under the output directory, and sets the ``OMP_NUM_THREADS``
    environment variable process-wide before launching WarpX.
    """
    if hardware is None:
        hardware = detect_hardware()

    out = Path(output_dir or config.output_dir)
    out.mkdir(parents=True, exist_ok=True)

    t0 = time.monotonic()

    # Step 1: Pre-compute applied B-field
    bfield_path = _precompute_bfield(config, out)

    # Step 2: Generate WarpX input
    input_path = write_warpx_input(config, out / "warpx_input")

    # Step 3: Collect metadata
    meta = collect_metadata(config)
    meta["hardware"] = hardware.summary()
    # Flag non-physical configurations for downstream citation guards (§14)
    mr = config.plasma.mass_ratio
    meta["mass_ratio_reduced"] = mr is not None and mr < 1836.0
    meta["electron_model"] = config.plasma.electron_model

    # Validation proximity (best-effort annotation; failure is recorded as
    # None rather than aborting the run).
    try:
        from helicon.validate.proximity import config_proximity

        prox = config_proximity(config)
        meta["validation_proximity"] = {
            "nearest_case": prox.nearest_case,
            "distance": prox.distance,
            "in_validated_region": prox.in_validated_region,
            "warning": prox.warning,
        }
    except Exception:
        meta["validation_proximity"] = None

    # Persist metadata before launching, so a crashed run still leaves a
    # record on disk; it is re-written after the run with timing/status.
    meta_path = out / "run_metadata.json"
    meta_path.write_text(json.dumps(meta, indent=2, default=str))

    if dry_run:
        # NOTE(review): the dry-run metadata file is not re-written with
        # wall_time_seconds (it is only in the returned dict) — confirm
        # that is intended.
        wall = time.monotonic() - t0
        return RunResult(
            output_dir=out,
            input_file=input_path,
            bfield_file=bfield_path,
            success=True,
            wall_time_seconds=wall,
            metadata=meta,
        )

    # Step 4: Launch WarpX
    # Try Metal backend first (Apple Silicon native GPU via SYCL/AdaptiveCpp).
    # The warpx-metal build is warpx.2d (2D Cartesian, single precision, AMReX format).
    # If the inputs use RZ geometry or openPMD diagnostics, adapt them automatically.
    if hardware.has_warpx_metal:
        from helicon.runner.metal_runner import detect_warpx_metal, run_warpx_metal

        metal_info = detect_warpx_metal()
        # Only take the Metal path when a 2D executable is actually present;
        # otherwise fall through to the pywarpx path below.
        if metal_info.valid and metal_info.exe_2d is not None:
            os.environ["OMP_NUM_THREADS"] = str(hardware.omp_num_threads)
            _inputs_text = _adapt_inputs_for_metal(input_path.read_text())
            metal_result = run_warpx_metal(
                metal_info=metal_info,
                output_dir=out,
                inputs_content=_inputs_text,
                timeout_s=3600 * 24,
                progress=True,
            )
            wall = time.monotonic() - t0
            meta["wall_time_seconds"] = wall
            meta["backend"] = "metal"
            meta["warpx_returncode"] = metal_result.exit_code
            if not metal_result.success and metal_result.error:
                meta["error"] = metal_result.error
            meta_path.write_text(json.dumps(meta, indent=2, default=str))
            return RunResult(
                output_dir=out,
                input_file=input_path,
                bfield_file=bfield_path,
                success=metal_result.success,
                wall_time_seconds=wall,
                metadata=meta,
            )

    if not hardware.has_pywarpx:
        msg = (
            "pywarpx is not installed. Install WarpX with Python bindings to "
            "run simulations. See: https://warpx.readthedocs.io/en/latest/install/\n"
            "Use dry_run=True to generate input files without running."
        )
        raise RuntimeError(msg)

    # Set OMP threads (process-wide environment mutation; inherited by the
    # WarpX subprocess below)
    os.environ["OMP_NUM_THREADS"] = str(hardware.omp_num_threads)

    import shutil
    import subprocess
    import sys

    # Build WarpX command: python -m pywarpx.WarpX <input_file>
    cmd = [sys.executable, "-m", "pywarpx.WarpX", str(input_path)]

    # On Linux with NVIDIA GPU, try mpirun for multi-rank execution
    if hardware.platform == "linux" and hardware.has_nvidia_gpu:
        mpirun = shutil.which("mpirun") or shutil.which("mpiexec")
        if mpirun:
            n_ranks = max(1, hardware.cpu_count // 4)  # 1 rank per 4 CPU threads
            cmd = [mpirun, "-n", str(n_ranks), *cmd]

    # Stream combined stdout/stderr to warpx.log in the output directory.
    log_path = out / "warpx.log"
    success = False
    try:
        with log_path.open("w") as log_fh:
            proc = subprocess.run(
                cmd,
                cwd=str(out),
                stdout=log_fh,
                stderr=subprocess.STDOUT,
                timeout=3600 * 24,  # 24-hour max
            )
        success = proc.returncode == 0
        if not success:
            meta["warpx_returncode"] = proc.returncode
    except subprocess.TimeoutExpired:
        meta["error"] = "WarpX simulation timed out after 24 hours"
    except Exception as exc:
        # Launch failures (e.g. missing executable) are recorded in metadata
        # and reflected in RunResult.success rather than re-raised.
        meta["error"] = str(exc)

    wall = time.monotonic() - t0
    meta["wall_time_seconds"] = wall

    # Update metadata
    meta_path.write_text(json.dumps(meta, indent=2, default=str))

    return RunResult(
        output_dir=out,
        input_file=input_path,
        bfield_file=bfield_path,
        success=success,
        wall_time_seconds=wall,
        metadata=meta,
    )

Batch Submission

helicon.runner.batch.BatchConfig(backend='local', n_workers=4, slurm_partition=None, slurm_account=None, slurm_time='12:00:00', slurm_ntasks=1, slurm_cpus_per_task=16, slurm_mem='64G', pbs_queue=None, pbs_walltime=None, pbs_ncpus=16) dataclass

Configuration for batch job submission.

helicon.runner.batch.BatchJob(index, config, output_dir, job_id=None, status='pending') dataclass

A single job within a batch run.

helicon.runner.batch.BatchResult(jobs=list(), n_completed=0, n_failed=0, wall_time_seconds=0.0) dataclass

Result of a batch run submission.

helicon.runner.batch.run_local_batch(configs, output_base, *, n_workers=4, dry_run=False)

Run a batch of simulations locally using parallel workers.

Parameters:

Name Type Description Default
configs list[SimConfig]

Simulation configurations to run.

required
output_base path

Base output directory. Each run goes to output_base/run_NNNN/.

required
n_workers int

Number of parallel workers.

4
dry_run bool

If True, create output directories but do not launch simulations.

False

Returns:

Type Description
BatchResult
Source code in src/helicon/runner/batch.py
def run_local_batch(
    configs: list[SimConfig],
    output_base: str | Path,
    *,
    n_workers: int = 4,
    dry_run: bool = False,
) -> BatchResult:
    """Run a batch of simulations locally using parallel workers.

    Parameters
    ----------
    configs : list[SimConfig]
        Simulation configurations to run.
    output_base : path
        Base output directory. Each run goes to ``output_base/run_NNNN/``.
    n_workers : int
        Number of parallel workers.
    dry_run : bool
        If True, create output directories but do not launch simulations.

    Returns
    -------
    BatchResult
    """
    base = Path(output_base)
    base.mkdir(parents=True, exist_ok=True)

    # One job per config, each with its own run_NNNN output directory.
    pending = [
        BatchJob(index=idx, config=cfg, output_dir=base / f"run_{idx:04d}")
        for idx, cfg in enumerate(configs)
    ]

    start = time.monotonic()

    # Dry runs only touch the filesystem, so threads suffice; real runs use
    # a process pool.
    if dry_run:
        pool_cls = ThreadPoolExecutor
        worker = _run_single_dry
    else:
        pool_cls = ProcessPoolExecutor
        worker = _run_single

    with pool_cls(max_workers=n_workers) as pool:
        finished = list(pool.map(worker, pending))

    elapsed = time.monotonic() - start

    return BatchResult(
        jobs=finished,
        n_completed=sum(job.status == "completed" for job in finished),
        n_failed=sum(job.status == "failed" for job in finished),
        wall_time_seconds=elapsed,
    )

helicon.runner.batch.submit_batch(configs, batch_config, output_base, *, dry_run=False)

Submit a batch of simulations using the configured backend.

Parameters:

Name Type Description Default
configs list[SimConfig]

Simulation configurations to run.

required
batch_config BatchConfig

Backend and resource configuration.

required
output_base path

Base output directory.

required
dry_run bool

If True, prepare but do not actually submit/run.

False

Returns:

Type Description
BatchResult
Source code in src/helicon/runner/batch.py
def _parse_sbatch_job_id(stdout: str) -> str | None:
    """Extract the job ID from sbatch's "Submitted batch job <id>" output."""
    parts = stdout.strip().split()
    return parts[-1] if parts else None


def _parse_qsub_job_id(stdout: str) -> str | None:
    """qsub prints the full job ID alone on stdout."""
    return stdout.strip()


def _submit_job_script(
    job: BatchJob,
    script_path: Path,
    submit_cmd: str,
    scheduler_name: str,
    parse_job_id,
) -> None:
    """Run a scheduler submit command for *script_path*, updating *job* in place.

    On success, sets ``job.job_id`` (via *parse_job_id* on the command's
    stdout) and ``job.status = "running"``. On a failed submission or a
    missing submit command, emits a warning and sets ``job.status = "failed"``.
    """
    try:
        result = subprocess.run(
            [submit_cmd, str(script_path)],
            capture_output=True,
            text=True,
            check=False,
        )
    except FileNotFoundError:
        warnings.warn(
            f"{submit_cmd} command not found. Is {scheduler_name} installed?",
            stacklevel=3,
        )
        job.status = "failed"
        return

    if result.returncode == 0:
        job.job_id = parse_job_id(result.stdout)
        job.status = "running"
    else:
        warnings.warn(
            f"{submit_cmd} submission failed: {result.stderr.strip()}",
            stacklevel=3,
        )
        job.status = "failed"


def submit_batch(
    configs: list[SimConfig],
    batch_config: BatchConfig,
    output_base: str | Path,
    *,
    dry_run: bool = False,
) -> BatchResult:
    """Submit a batch of simulations using the configured backend.

    Parameters
    ----------
    configs : list[SimConfig]
        Simulation configurations to run.
    batch_config : BatchConfig
        Backend and resource configuration.
    output_base : path
        Base output directory.
    dry_run : bool
        If True, prepare but do not actually submit/run.

    Returns
    -------
    BatchResult

    Raises
    ------
    ValueError
        If ``batch_config.backend`` is not ``"local"``, ``"slurm"``, or
        ``"pbs"``. Raised up front, before any directory is created or any
        job is submitted.
    """
    backend = batch_config.backend
    if backend == "local":
        return run_local_batch(
            configs, output_base, n_workers=batch_config.n_workers, dry_run=dry_run
        )
    if backend not in ("slurm", "pbs"):
        # Fail fast: previously this check lived inside the per-job loop and
        # could fire only after some jobs had already been submitted.
        msg = f"Unknown backend: {batch_config.backend!r}"
        raise ValueError(msg)

    output_base = Path(output_base)
    output_base.mkdir(parents=True, exist_ok=True)

    jobs: list[BatchJob] = []
    t0 = time.monotonic()

    for i, cfg in enumerate(configs):
        out_dir = output_base / f"run_{i:04d}"
        out_dir.mkdir(parents=True, exist_ok=True)

        # Write config for the job so the scheduler script can re-load it.
        cfg.to_yaml(out_dir / "config.yaml")

        job = BatchJob(index=i, config=cfg, output_dir=out_dir)

        if backend == "slurm":
            script = generate_slurm_script(
                cfg,
                out_dir,
                "helicon",
                partition=batch_config.slurm_partition,
                account=batch_config.slurm_account,
                time=batch_config.slurm_time,
                ntasks=batch_config.slurm_ntasks,
                cpus_per_task=batch_config.slurm_cpus_per_task,
                mem=batch_config.slurm_mem,
            )
        else:  # backend == "pbs"
            script = generate_pbs_script(
                cfg,
                out_dir,
                "helicon",
                queue=batch_config.pbs_queue,
                walltime=batch_config.pbs_walltime,
                ncpus=batch_config.pbs_ncpus,
            )

        script_path = out_dir / "submit.sh"
        script_path.write_text(script)

        if dry_run:
            # Nothing submitted; mark completed so callers can count jobs.
            job.status = "completed"
        elif backend == "slurm":
            _submit_job_script(job, script_path, "sbatch", "SLURM", _parse_sbatch_job_id)
        else:
            _submit_job_script(job, script_path, "qsub", "PBS", _parse_qsub_job_id)

        jobs.append(job)

    wall = time.monotonic() - t0
    n_completed = sum(1 for j in jobs if j.status == "completed")
    n_failed = sum(1 for j in jobs if j.status == "failed")

    return BatchResult(
        jobs=jobs,
        n_completed=n_completed,
        n_failed=n_failed,
        wall_time_seconds=wall,
    )

helicon.runner.batch.generate_slurm_script(config, output_dir, helicon_exe, *, partition=None, account=None, time='12:00:00', ntasks=1, cpus_per_task=16, mem='64G')

Generate a SLURM sbatch submission script.

Parameters:

Name Type Description Default
config SimConfig

Simulation configuration.

required
output_dir Path

Directory for this run's output.

required
helicon_exe str

Command or path for the helicon executable.

required
partition str | None

SLURM resource directives.

None
account str | None

SLURM resource directives.

None
time str

SLURM resource directives.

'12:00:00'
ntasks int

SLURM resource directives.

1
cpus_per_task int

SLURM resource directives.

16
mem str

SLURM resource directives.

'64G'

Returns:

Type Description
str

Complete SLURM script content.

Source code in src/helicon/runner/batch.py
def generate_slurm_script(
    config: SimConfig,
    output_dir: Path,
    helicon_exe: str,
    *,
    partition: str | None = None,
    account: str | None = None,
    time: str = "12:00:00",
    ntasks: int = 1,
    cpus_per_task: int = 16,
    mem: str = "64G",
) -> str:
    """Generate a SLURM sbatch submission script.

    Parameters
    ----------
    config : SimConfig
        Simulation configuration.
    output_dir : Path
        Directory for this run's output.
    helicon_exe : str
        Command or path for the helicon executable.
    partition, account, time, ntasks, cpus_per_task, mem
        SLURM resource directives.

    Returns
    -------
    str
        Complete SLURM script content.
    """
    # Mandatory #SBATCH directives, in fixed order.
    directives = [
        "#SBATCH --job-name=helicon",
        f"#SBATCH --output={output_dir / 'slurm_%j.out'}",
        f"#SBATCH --error={output_dir / 'slurm_%j.err'}",
        f"#SBATCH --ntasks={ntasks}",
        f"#SBATCH --cpus-per-task={cpus_per_task}",
        f"#SBATCH --mem={mem}",
        f"#SBATCH --time={time}",
    ]
    # Optional directives are emitted only when a value was supplied.
    for flag, value in (("partition", partition), ("account", account)):
        if value:
            directives.append(f"#SBATCH --{flag}={value}")

    command_section = [
        "",
        f"cd {output_dir}",
        f"{helicon_exe} run {output_dir / 'config.yaml'}",
    ]
    return "\n".join(["#!/bin/bash", *directives, *command_section]) + "\n"

helicon.runner.batch.generate_pbs_script(config, output_dir, helicon_exe, *, queue=None, walltime=None, ncpus=16)

Generate a PBS qsub submission script.

Parameters:

Name Type Description Default
config SimConfig

Simulation configuration.

required
output_dir Path

Directory for this run's output.

required
helicon_exe str

Command or path for the helicon executable.

required
queue str | None

PBS resource directives.

None
walltime str | None

PBS resource directives.

None
ncpus int

PBS resource directives.

16

Returns:

Type Description
str

Complete PBS script content.

Source code in src/helicon/runner/batch.py
def generate_pbs_script(
    config: SimConfig,
    output_dir: Path,
    helicon_exe: str,
    *,
    queue: str | None = None,
    walltime: str | None = None,
    ncpus: int = 16,
) -> str:
    """Generate a PBS qsub submission script.

    Parameters
    ----------
    config : SimConfig
        Simulation configuration.
    output_dir : Path
        Directory for this run's output.
    helicon_exe : str
        Command or path for the helicon executable.
    queue, walltime, ncpus
        PBS resource directives.

    Returns
    -------
    str
        Complete PBS script content.
    """
    # Mandatory #PBS directives, in fixed order.
    directives = [
        "#PBS -N helicon",
        f"#PBS -o {output_dir / 'pbs_output.log'}",
        f"#PBS -e {output_dir / 'pbs_error.log'}",
        f"#PBS -l ncpus={ncpus}",
    ]
    # Optional directives only when a value was supplied.
    if queue:
        directives.append(f"#PBS -q {queue}")
    if walltime:
        directives.append(f"#PBS -l walltime={walltime}")

    command_section = [
        "",
        f"cd {output_dir}",
        f"{helicon_exe} run {output_dir / 'config.yaml'}",
    ]
    return "\n".join(["#!/bin/bash", *directives, *command_section]) + "\n"

Checkpoints

helicon.runner.checkpoints.CheckpointInfo(path, step, timestamp, size_bytes) dataclass

Information about a single WarpX checkpoint.

helicon.runner.checkpoints.find_checkpoints(output_dir)

Search for WarpX checkpoint directories.

Looks for directories matching chk* or diags/chk* patterns.

Parameters:

Name Type Description Default
output_dir path

Simulation output directory to search.

required

Returns:

Type Description
list[CheckpointInfo]

Checkpoints sorted by step number ascending. Empty list if none found.

Source code in src/helicon/runner/checkpoints.py
def find_checkpoints(output_dir: str | Path) -> list[CheckpointInfo]:
    """Search for WarpX checkpoint directories.

    Looks for directories matching ``chk*`` or ``diags/chk*`` patterns.

    Parameters
    ----------
    output_dir : path
        Simulation output directory to search.

    Returns
    -------
    list[CheckpointInfo]
        Checkpoints sorted by step number ascending. Empty list if none found.
    """
    root = Path(output_dir)
    found: list[CheckpointInfo] = []

    # Checkpoints may live directly in the output dir or under diags/.
    for base in (root, root / "diags"):
        if not base.is_dir():
            continue
        for candidate in base.iterdir():
            if not (candidate.is_dir() and candidate.name.startswith("chk")):
                continue
            step = _parse_step(candidate.name)
            if step is None:
                continue  # "chk"-prefixed dir without a parsable step number
            found.append(
                CheckpointInfo(
                    path=candidate,
                    step=step,
                    timestamp=datetime.fromtimestamp(candidate.stat().st_mtime),
                    size_bytes=_dir_size(candidate),
                )
            )

    return sorted(found, key=lambda info: info.step)

helicon.runner.checkpoints.find_latest_checkpoint(output_dir)

Return the checkpoint with the highest step number.

Parameters:

Name Type Description Default
output_dir path

Simulation output directory to search.

required

Returns:

Type Description
CheckpointInfo or None

The latest checkpoint, or None if no checkpoints exist.

Source code in src/helicon/runner/checkpoints.py
def find_latest_checkpoint(output_dir: str | Path) -> CheckpointInfo | None:
    """Return the checkpoint with the highest step number.

    Parameters
    ----------
    output_dir : path
        Simulation output directory to search.

    Returns
    -------
    CheckpointInfo or None
        The latest checkpoint, or None if no checkpoints exist.
    """
    found = find_checkpoints(output_dir)
    if not found:
        return None
    # find_checkpoints sorts ascending by step, so the last entry is latest.
    return found[-1]

helicon.runner.checkpoints.cleanup_checkpoints(output_dir, *, keep_latest=True, keep_steps=None)

Delete checkpoint directories.

Parameters:

Name Type Description Default
output_dir path

Simulation output directory.

required
keep_latest bool

If True, keep the most recent checkpoint.

True
keep_steps list[int]

Specific step numbers to keep.

None
Source code in src/helicon/runner/checkpoints.py
def cleanup_checkpoints(
    output_dir: str | Path,
    *,
    keep_latest: bool = True,
    keep_steps: list[int] | None = None,
) -> None:
    """Delete checkpoint directories.

    Parameters
    ----------
    output_dir : path
        Simulation output directory.
    keep_latest : bool
        If True, keep the most recent checkpoint.
    keep_steps : list[int], optional
        Specific step numbers to keep.
    """
    all_chks = find_checkpoints(output_dir)
    if not all_chks:
        return

    # Steps that must survive the cleanup.
    protected: set[int] = set(keep_steps or ())
    if keep_latest:
        protected.add(all_chks[-1].step)

    for candidate in all_chks:
        if candidate.step in protected:
            continue
        shutil.rmtree(candidate.path)

helicon.runner.checkpoints.get_restart_flag(output_dir)

Get WarpX restart flag if a checkpoint exists.

Parameters:

Name Type Description Default
output_dir path

Simulation output directory.

required

Returns:

Type Description
str or None

WarpX --restart <path> flag string, or None if no checkpoint.

Source code in src/helicon/runner/checkpoints.py
def get_restart_flag(output_dir: str | Path) -> str | None:
    """Get WarpX restart flag if a checkpoint exists.

    Parameters
    ----------
    output_dir : path
        Simulation output directory.

    Returns
    -------
    str or None
        WarpX ``--restart <path>`` flag string, or None if no checkpoint.
    """
    latest = find_latest_checkpoint(output_dir)
    return None if latest is None else f"--restart {latest.path}"

Grid Convergence

helicon.runner.convergence.ConvergenceResult(levels, convergence_order, extrapolated_thrust_N, converged) dataclass

Full grid convergence study result.

Attributes:

Name Type Description
levels list of ConvergenceLevel

One entry per resolution, ordered coarse → fine.

convergence_order float or None

Richardson-estimated formal convergence order p.

extrapolated_thrust_N float or None

Richardson-extrapolated (grid-converged) thrust estimate.

converged bool

True if the relative change between the two finest levels is below tol.

helicon.runner.convergence.run_convergence_study(base_config, resolutions, *, output_base='convergence_study', dry_run=False, tol=0.05)

Run a grid convergence study at multiple resolutions.

Parameters:

Name Type Description Default
base_config SimConfig

Base simulation configuration; only resolution is varied.

required
resolutions list of (nz, nr) tuples

Grid resolutions to run, ordered coarse → fine. Recommended: three levels with factor-2 refinement, e.g. [(128, 64), (256, 128), (512, 256)].

required
output_base path

Root output directory; each level gets a subdirectory.

'convergence_study'
dry_run bool

Generate inputs without launching WarpX.

False
tol float

Relative convergence tolerance between the two finest levels.

0.05

Returns:

Type Description
ConvergenceResult
Source code in src/helicon/runner/convergence.py
def run_convergence_study(
    base_config: SimConfig,
    resolutions: list[tuple[int, int]],
    *,
    output_base: str | Path = "convergence_study",
    dry_run: bool = False,
    tol: float = 0.05,
) -> ConvergenceResult:
    """Run a grid convergence study at multiple resolutions.

    Parameters
    ----------
    base_config : SimConfig
        Base simulation configuration; only resolution is varied.
    resolutions : list of (nz, nr) tuples
        Grid resolutions to run, ordered coarse → fine.
        Recommended: three levels with factor-2 refinement,
        e.g. ``[(128, 64), (256, 128), (512, 256)]``.
    output_base : path
        Root output directory; each level gets a subdirectory.
    dry_run : bool
        Generate inputs without launching WarpX.
    tol : float
        Relative convergence tolerance between the two finest levels.

    Returns
    -------
    ConvergenceResult
    """
    from helicon.runner.launch import run_simulation

    output_base = Path(output_base)
    levels: list[ConvergenceLevel] = []

    for nz, nr in resolutions:
        config_i = _modified_config(base_config, nz=nz, nr=nr)
        # Scalar grid-spacing proxy: h ∝ 1/sqrt(total cell count).
        h_i = 1.0 / math.sqrt(nz * nr)
        level_dir = output_base / f"nz{nz}_nr{nr}"

        run_result = run_simulation(config_i, output_dir=level_dir, dry_run=dry_run)

        # Thrust is the convergence metric; only computed for real,
        # successful runs. Missing/unreadable diagnostics leave it None.
        thrust_N: float | None = None
        if run_result.success and not dry_run:
            try:
                from helicon.postprocess.thrust import compute_thrust

                t = compute_thrust(level_dir)
                thrust_N = t.thrust_N
            except (FileNotFoundError, ValueError):
                pass

        levels.append(
            ConvergenceLevel(
                nz=nz,
                nr=nr,
                h=h_i,
                output_dir=level_dir,
                success=run_result.success,
                thrust_N=thrust_N,
            )
        )

    # Richardson extrapolation (requires ≥ 3 levels with valid thrust)
    thrust_values = [lv.thrust_N for lv in levels if lv.thrust_N is not None]
    conv_order: float | None = None
    extrap: float | None = None
    converged = False

    if len(thrust_values) >= 3:
        # h values filtered the same way, so they stay index-aligned with
        # thrust_values.
        # NOTE(review): assumes the surviving (non-None) levels are still
        # consecutive refinements of each other — confirm if a mid-sequence
        # level can fail while its neighbours succeed.
        h_vals = [lv.h for lv in levels if lv.thrust_N is not None]
        h_ratios = [h_vals[i] / h_vals[i + 1] for i in range(len(h_vals) - 1)]
        p, extrap = richardson_extrapolate(thrust_values[-3:], h_ratios[-2:])
        conv_order = p if not math.isnan(p) else None

        # Check convergence between two finest levels
        q_fine = thrust_values[-1]
        q_mid = thrust_values[-2]
        # Guard against division by (near-)zero thrust.
        if abs(q_fine) > 1e-20:
            converged = abs(q_fine - q_mid) / abs(q_fine) < tol

    elif len(thrust_values) == 2:
        # Too few levels for extrapolation; only the relative-change check.
        q_fine = thrust_values[-1]
        q_mid = thrust_values[-2]
        if abs(q_fine) > 1e-20:
            converged = abs(q_fine - q_mid) / abs(q_fine) < tol

    return ConvergenceResult(
        levels=levels,
        convergence_order=conv_order,
        extrapolated_thrust_N=extrap,
        converged=converged,
    )

Metal GPU Runner

helicon.runner.metal_runner.WarpXMetalInfo(root, exe_2d, exe_3d, acpp_bin, valid) dataclass

Paths and availability for a warpx-metal build.

helicon.runner.metal_runner.WarpXMetalDiag(diag_dir, step, time_s, field_vars, n_cells, domain_lo, domain_hi, species=list()) dataclass

Parsed AMReX/WarpX diagnostic snapshot.

Extracted from the Header and WarpXHeader files written to each diags/diag<step>/ directory.

Functions

from_dir(diag_dir) classmethod

Parse a diags/diag<step>/ directory.

Parameters:

Name Type Description Default
diag_dir str | Path

Path to the AMReX output directory (contains Header file).

required

Returns:

Type Description
WarpXMetalDiag
Source code in src/helicon/runner/metal_runner.py
@classmethod
def from_dir(cls, diag_dir: str | Path) -> WarpXMetalDiag:
    """Parse a ``diags/diag<step>/`` directory.

    Walks the AMReX plotfile ``Header`` line by line — the format is
    strictly positional — to recover field names, simulation time,
    domain bounds, and grid size, then infers the step number from the
    directory name and lists particle-species subdirectories.

    Parameters
    ----------
    diag_dir:
        Path to the AMReX output directory (contains ``Header`` file).

    Returns
    -------
    WarpXMetalDiag

    Raises
    ------
    FileNotFoundError
        If *diag_dir* contains no ``Header`` file.
    """
    diag_dir = Path(diag_dir)
    header_path = diag_dir / "Header"
    if not header_path.exists():
        msg = f"AMReX Header not found: {header_path}"
        raise FileNotFoundError(msg)

    lines = header_path.read_text().splitlines()
    idx = 0  # cursor into the positional header; advanced after each consumed line

    # Line 0: version ("HyperCLaw-V1.1")
    idx += 1

    # Line 1: n_components
    n_comp = int(lines[idx])
    idx += 1

    # Lines 2..2+n_comp: component names, one per line
    field_vars = []
    for _ in range(n_comp):
        field_vars.append(lines[idx].strip())
        idx += 1

    # n_levels (parsed but unused — single-level runs only produce Level_0)
    _n_levels = int(lines[idx])
    idx += 1

    # time
    time_s = float(lines[idx])
    idx += 1

    # finest level
    idx += 1  # skip

    # prob_lo, prob_hi: whitespace-separated physical domain bounds [m]
    lo_parts = lines[idx].split()
    idx += 1
    hi_parts = lines[idx].split()
    idx += 1
    domain_lo = tuple(float(x) for x in lo_parts)
    domain_hi = tuple(float(x) for x in hi_parts)

    # Skip blank line
    while idx < len(lines) and not lines[idx].strip():
        idx += 1

    # Domain box: ((0,0) (nx-1,ny-1) (0,0))
    box_line = lines[idx] if idx < len(lines) else ""
    idx += 1
    n_cells: tuple[int, ...] = ()
    if box_line:
        import re

        nums = re.findall(r"\d+", box_line)
        # AMReX box format: ((lo_x,lo_y) (hi_x,hi_y) (type_x,type_y))
        # nums has 3*ndim values: lo coords, then hi coords, then cell types
        if len(nums) >= 4 and len(nums) % 3 == 0:
            ndim = len(nums) // 3
            # Inclusive index range, so cell count is hi - lo + 1 per axis.
            n_cells = tuple(int(nums[ndim + i]) - int(nums[i]) + 1 for i in range(ndim))

    # Infer step from directory name (diag<step>); all digits in the name
    # are concatenated, so "diag1000004" yields step 1000004.
    stem = diag_dir.name  # e.g. "diag1000004"
    digits = "".join(c for c in stem if c.isdigit())
    step = int(digits) if digits else 0

    # Detect species from particle subdirectories (each has its own Header);
    # Level_0 also contains a header file, so it is excluded by name.
    species: list[str] = [
        d.name
        for d in sorted(diag_dir.iterdir())
        if d.is_dir() and (d / "Header").exists() and d.name != "Level_0"
    ]

    return cls(
        diag_dir=diag_dir,
        step=step,
        time_s=time_s,
        field_vars=field_vars,
        n_cells=n_cells,
        domain_lo=domain_lo,
        domain_hi=domain_hi,
        species=species,
    )

read_fields()

Read all field arrays from multi-box AMReX FAB binary.

Parses Level_0/Cell_H for tile box extents and byte offsets, reads each tile from Cell_D_00000, and assembles the full domain.

Returns a dict mapping field name → numpy.ndarray of shape (nx, ny), float32, Fortran-order within each tile. Requires numpy.

Source code in src/helicon/runner/metal_runner.py
def read_fields(self) -> dict[str, Any]:
    """Read all field arrays from multi-box AMReX FAB binary.

    Parses ``Level_0/Cell_H`` for tile box extents and byte offsets,
    reads each tile from ``Cell_D_00000``, and assembles the full domain.

    Returns a dict mapping field name → ``numpy.ndarray`` of shape
    ``(nx, ny)``, float32, Fortran-order within each tile.
    Requires ``numpy``.

    Returns an empty dict (rather than raising) whenever the level files
    are missing, the grid size is unknown, or the Cell_H metadata cannot
    be matched up — callers treat "no fields" as a soft failure.
    """
    import re

    import numpy as np

    level_dir = self.diag_dir / "Level_0"
    cell_h_path = level_dir / "Cell_H"
    fab_path = level_dir / "Cell_D_00000"

    if not fab_path.exists() or not cell_h_path.exists():
        return {}

    n_comp = len(self.field_vars)
    # Only 2D (at least nx, ny) output is supported here.
    if not self.n_cells or len(self.n_cells) < 2:
        return {}
    nx, ny = self.n_cells[0], self.n_cells[1]

    # --- Parse Cell_H ---
    # Format:
    #   (N R
    #   ((lo_x,lo_y) (hi_x,hi_y) (type_x,type_y))
    #   ...
    #   )
    #   N
    #   FabOnDisk: Cell_D_00000 <byte_offset>
    #   ...
    cell_h_text = cell_h_path.read_text()

    # One (lo_x, lo_y, hi_x, hi_y) tuple per tile box found in Cell_H.
    boxes = [
        (int(m.group(1)), int(m.group(2)), int(m.group(3)), int(m.group(4)))
        for m in re.finditer(r"\(\((\d+),(\d+)\)\s*\((\d+),(\d+)\)", cell_h_text)
    ]
    # Byte offset of each FAB record within Cell_D_00000.
    fab_offsets = [
        int(m.group(1)) for m in re.finditer(r"FabOnDisk:\s+\S+\s+(\d+)", cell_h_text)
    ]

    n_fabs = len(fab_offsets)
    if n_fabs == 0 or len(boxes) < n_fabs:
        return {}
    # Take the last n_fabs boxes (guard against any extra header box)
    boxes = boxes[-n_fabs:]

    fab_data = fab_path.read_bytes()
    fields: dict[str, Any] = {
        name: np.zeros((nx, ny), dtype=np.float32) for name in self.field_vars
    }

    # NOTE(review): box lo/hi are used directly as indices into the (nx, ny)
    # output — assumes zero-based global cell indices at Level_0; confirm for
    # refined or offset domains.
    for (lo_x, lo_y, hi_x, hi_y), byte_offset in zip(boxes, fab_offsets):
        bx = hi_x - lo_x + 1
        by = hi_y - lo_y + 1

        # Each FAB in the file starts with an ASCII header line then binary data
        nl = fab_data.index(b"\n", byte_offset)
        fab_header = fab_data[byte_offset:nl].decode("ascii", errors="replace")

        # Detect real size from "FAB ((N," prefix (4 = float32, 8 = float64)
        rs_match = re.search(r"FAB\s+\(\((\d+),", fab_header)
        real_size = int(rs_match.group(1)) if rs_match else 4
        dtype = np.dtype("<f4") if real_size == 4 else np.dtype("<f8")

        data_start = nl + 1
        # Components are stored contiguously: all of comp 0, then comp 1, ...
        n_vals = n_comp * bx * by
        chunk = np.frombuffer(
            fab_data[data_start : data_start + n_vals * real_size], dtype=dtype
        ).astype(np.float32)

        for comp_idx, name in enumerate(self.field_vars):
            tile = chunk[comp_idx * bx * by : (comp_idx + 1) * bx * by]
            # Fortran order: x varies fastest within each stored tile.
            fields[name][lo_x : hi_x + 1, lo_y : hi_y + 1] = tile.reshape(
                bx, by, order="F"
            )

    return fields

helicon.runner.metal_runner.MetalRunResult(success, exit_code, output_dir, log_path, wall_time_s, steps_completed, diags, error=None) dataclass

Result of a warpx-metal native GPU simulation.

helicon.runner.metal_runner.detect_warpx_metal(hint=None)

Detect a warpx-metal build and return its paths.

Parameters:

Name Type Description Default
hint str | Path | None

Override search path. If None, uses :func:find_warpx_metal_root.

None

Returns:

Type Description
WarpXMetalInfo

valid=True if at least one executable was found.

Source code in src/helicon/runner/metal_runner.py
def detect_warpx_metal(hint: str | Path | None = None) -> WarpXMetalInfo:
    """Detect a warpx-metal build and return its paths.

    Parameters
    ----------
    hint:
        Override search path. If None, uses :func:`find_warpx_metal_root`.

    Returns
    -------
    WarpXMetalInfo
        ``valid=True`` if at least one executable was found.
    """
    root = find_warpx_metal_root(hint)
    if root is None:
        # No build tree found anywhere — return an invalid placeholder.
        return WarpXMetalInfo(
            root=Path("."),
            exe_2d=None,
            exe_3d=None,
            acpp_bin=None,
            valid=False,
        )

    def _executable(name: str) -> Path | None:
        # Accept only files that exist AND carry the execute bit.
        candidate = root / _BUILD_SUBPATH / name
        if candidate.exists() and os.access(candidate, os.X_OK):
            return candidate
        return None

    exe_2d = _executable(_EXE_2D)
    exe_3d = _executable(_EXE_3D)

    # The acpp compiler wrapper is optional; record it only if present.
    acpp = root / _ACPP_BIN / "acpp"
    acpp_bin = acpp if acpp.exists() else None

    return WarpXMetalInfo(
        root=root,
        exe_2d=exe_2d,
        exe_3d=exe_3d,
        acpp_bin=acpp_bin,
        valid=(exe_2d is not None or exe_3d is not None),
    )

helicon.runner.metal_runner.find_warpx_metal_root(hint=None)

Search for the warpx-metal build directory.

Search order: 1. hint parameter (if provided) 2. WARPX_METAL_ROOT environment variable 3. Sibling directory ../warpx-metal relative to this module 4. ~/work/warpx-metal

Source code in src/helicon/runner/metal_runner.py
def find_warpx_metal_root(hint: str | Path | None = None) -> Path | None:
    """Search for the warpx-metal build directory.

    Search order:
    1. ``hint`` parameter (if provided)
    2. ``WARPX_METAL_ROOT`` environment variable
    3. Sibling directory ``../warpx-metal`` relative to this module
    4. ``~/work/warpx-metal``
    """
    search: list[Path] = []

    if hint is not None:
        search.append(Path(hint))

    env_value = os.environ.get("WARPX_METAL_ROOT")
    if env_value:
        search.append(Path(env_value))

    # Walk up from this module, trying a "warpx-metal" sibling at each
    # level; stop once we leave the Helicon source tree (at most 5 levels).
    node = Path(__file__).resolve()
    for _ in range(5):
        node = node.parent
        search.append(node.parent / "warpx-metal")
        if node.name in ("helicon", "Helicon", "src"):
            break

    search.append(Path.home() / "work" / "warpx-metal")

    # First candidate that actually contains a Metal executable wins.
    return next(
        (candidate.resolve() for candidate in search if _has_metal_exe(candidate)),
        None,
    )

helicon.runner.metal_runner.find_diag_dirs(output_dir)

Find and parse all AMReX diagnostic directories under output_dir.

Looks for any subdirectory named diag* that contains a Header file, parses each, and returns them sorted by step number.

Source code in src/helicon/runner/metal_runner.py
def find_diag_dirs(output_dir: str | Path) -> list[WarpXMetalDiag]:
    """Find and parse all AMReX diagnostic directories under *output_dir*.

    Looks for any subdirectory named ``diag*`` that contains a ``Header``
    file, parses each, and returns them sorted by step number.
    """
    base = Path(output_dir)
    parsed: list[WarpXMetalDiag] = []
    for candidate in sorted(base.glob("diag*")):
        if not (candidate / "Header").exists():
            continue
        # Tolerate malformed Header files: skip them rather than fail.
        with contextlib.suppress(Exception):
            parsed.append(WarpXMetalDiag.from_dir(candidate))
    parsed.sort(key=lambda d: d.step)
    return parsed

helicon.runner.metal_runner.generate_metal_inputs(*, n_cell=128, max_step=4, n_ppc=2, density=2e+24, domain_m=2e-05, cfl=1.0, diag_interval=4, extra=None)

Generate a WarpX input file for the Metal backend.

Produces a 2D pair-plasma (electrons + positrons) FDTD simulation, matching the configuration validated by the warpx-metal project.

Parameters:

Name Type Description Default
n_cell int

Grid cells per dimension (same in x and z for 2D).

128
max_step int

Number of PIC timesteps.

4
n_ppc int

Particles per cell per dimension (n_ppc² total per species per cell).

2
density float

Plasma number density [m⁻³].

2e+24
domain_m float

Half-width of the physical domain [m].

2e-05
cfl float

CFL number for the FDTD timestep.

1.0
diag_interval int

Diagnostic output every N steps (use max_step + 1 to suppress).

4
extra dict[str, Any] | None

Additional key=value pairs appended verbatim.

None

Returns:

Type Description
str

WarpX inputs file content.

Source code in src/helicon/runner/metal_runner.py
def generate_metal_inputs(
    *,
    n_cell: int = 128,
    max_step: int = 4,
    n_ppc: int = 2,
    density: float = 2e24,
    domain_m: float = 20e-6,
    cfl: float = 1.0,
    diag_interval: int = 4,
    extra: dict[str, Any] | None = None,
) -> str:
    """Generate a WarpX input file for the Metal backend.

    Produces a 2D pair-plasma (electrons + positrons) FDTD simulation,
    matching the configuration validated by the warpx-metal project.

    Parameters
    ----------
    n_cell:
        Grid cells per dimension (same in x and z for 2D).
    max_step:
        Number of PIC timesteps.
    n_ppc:
        Particles per cell per dimension (n_ppc² total per species per cell).
    density:
        Plasma number density [m⁻³].
    domain_m:
        Half-width of the physical domain [m].
    cfl:
        CFL number for the FDTD timestep.
    diag_interval:
        Diagnostic output every N steps (use ``max_step + 1`` to suppress).
    extra:
        Additional key=value pairs appended verbatim.

    Returns
    -------
    str
        WarpX inputs file content.
    """
    half = domain_m

    def _species(name: str, charge: str, sign: str) -> list[str]:
        # One uniform-plasma species block; electrons and positrons differ
        # only in charge and drift-momentum sign.
        return [
            f"{name}.charge = {charge}",
            f"{name}.mass = m_e",
            f"{name}.injection_style = NUniformPerCell",
            f"{name}.num_particles_per_cell_each_dim = {n_ppc} {n_ppc}",
            f"{name}.density = {density:.3e}",
            f"{name}.profile = constant",
            f"{name}.xmin = -{half:.15e}",
            f"{name}.xmax =  {half:.15e}",
            f"{name}.zmin = -{half:.15e}",
            f"{name}.zmax =  {half:.15e}",
            f"{name}.momentum_distribution_type = constant",
            f"{name}.ux = {sign}0.01",
            f"{name}.uy = 0.0",
            f"{name}.uz = {sign}0.01",
        ]

    # Grid, geometry, and solver settings.
    parts: list[str] = [
        "# Helicon-generated WarpX Metal inputs",
        "# 2D pair plasma, FDTD, single precision",
        "",
        f"max_step = {max_step}",
        f"amr.n_cell = {n_cell} {n_cell}",
        "amr.max_level = 0",
        f"amr.max_grid_size = {n_cell}",
        "",
        "geometry.dims = 2",
        "geometry.coord_sys = 0",
        "geometry.is_periodic = 1 1",
        f"geometry.prob_lo = -{half:.15e} -{half:.15e}",
        f"geometry.prob_hi =  {half:.15e}  {half:.15e}",
        "",
        "boundary.field_lo = periodic periodic",
        "boundary.field_hi = periodic periodic",
        "",
        "algo.current_deposition = direct",
        "algo.field_gathering = energy-conserving",
        "algo.particle_shape = 1",
        f"warpx.cfl = {cfl}",
        "warpx.sort_intervals = 1",
        "warpx.use_filter = 0",
        "",
        "particles.species_names = electrons positrons",
        "particles.do_mem_efficient_sort = 1",
        "particles.do_tiling = 0",
        "",
    ]

    # Counter-streaming pair plasma.
    parts += _species("electrons", "-q_e", "")
    parts.append("")
    parts += _species("positrons", "q_e", "-")

    # Diagnostics and AMReX/profiler knobs.
    parts += [
        "",
        "diagnostics.diags_names = diag1",
        "diag1.diag_type = Full",
        "diag1.fields_to_plot = Ex Ey Bz",
        "diag1.electrons.variables = x z w ux uy uz",
        "diag1.positrons.variables = x z w ux uy uz",
        f"diag1.intervals = {diag_interval}",
        "",
        "amrex.abort_on_out_of_gpu_memory = 1",
        "amrex.verbose = 1",
        "tiny_profiler.enabled = 1",
        "tiny_profiler.device_synchronize_around_region = 1",
        "tiny_profiler.memprof_enabled = 1",
    ]

    if extra:
        parts.extend(["", "# Extra parameters"])
        parts.extend(f"{key} = {value}" for key, value in extra.items())

    return "\n".join(parts) + "\n"

helicon.runner.metal_runner.run_warpx_metal(*, metal_info=None, output_dir, inputs_content=None, n_cell=128, max_step=4, n_ppc=2, density=2e+24, diag_interval=4, timeout_s=3600.0, extra_params=None, progress=False)

Launch WarpX on Apple Silicon GPU via the Metal SYCL backend.

Generates a WarpX input file (or uses inputs_content directly), launches the warpx-metal 2D executable, and returns a :class:MetalRunResult with parsed diagnostics.

Parameters:

Name Type Description Default
metal_info WarpXMetalInfo | None

Pre-detected :class:WarpXMetalInfo. Auto-detected if None.

None
output_dir str | Path

Directory to write inputs, logs, and diagnostics.

required
inputs_content str | None

Raw WarpX inputs file text. If None, generated from other params.

None
n_cell int

Physics parameters passed to :func:generate_metal_inputs.

128
max_step int

Physics parameters passed to :func:generate_metal_inputs.

4
n_ppc int

Physics parameters passed to :func:generate_metal_inputs.

2
density float

Physics parameters passed to :func:generate_metal_inputs.

2e+24
diag_interval int

Physics parameters passed to :func:generate_metal_inputs.

4
timeout_s float

Maximum run time before killing the process.

3600.0
extra_params dict[str, Any] | None

Extra key = value lines appended to the inputs file.

None

Returns:

Type Description
MetalRunResult
Source code in src/helicon/runner/metal_runner.py
def run_warpx_metal(
    *,
    metal_info: WarpXMetalInfo | None = None,
    output_dir: str | Path,
    inputs_content: str | None = None,
    n_cell: int = 128,
    max_step: int = 4,
    n_ppc: int = 2,
    density: float = 2e24,
    diag_interval: int = 4,
    timeout_s: float = 3600.0,
    extra_params: dict[str, Any] | None = None,
    progress: bool = False,
) -> MetalRunResult:
    """Launch WarpX on Apple Silicon GPU via the Metal SYCL backend.

    Generates a WarpX input file (or uses *inputs_content* directly),
    launches the warpx-metal 2D executable, and returns a
    :class:`MetalRunResult` with parsed diagnostics.

    Parameters
    ----------
    metal_info:
        Pre-detected :class:`WarpXMetalInfo`. Auto-detected if None.
    output_dir:
        Directory to write inputs, logs, and diagnostics.
    inputs_content:
        Raw WarpX inputs file text. If None, generated from other params.
    n_cell, max_step, n_ppc, density, diag_interval:
        Physics parameters passed to :func:`generate_metal_inputs`.
    timeout_s:
        Maximum run time before killing the process.
    extra_params:
        Extra ``key = value`` lines appended to the inputs file.
    progress:
        If True, show a tqdm progress bar driven by ``STEP <n> ends``
        lines in the solver's stdout.

    Returns
    -------
    MetalRunResult
        ``exit_code`` is the process return code on a normal run, or a
        sentinel: -1 (never launched), -2 (timeout), -3 (launch exception).
    """
    if metal_info is None:
        metal_info = detect_warpx_metal()

    # Fail fast with a structured result (not an exception) when no 2D
    # executable is available.
    if not metal_info.valid or metal_info.exe_2d is None:
        _out = Path(output_dir).resolve()
        return MetalRunResult(
            success=False,
            exit_code=-1,
            output_dir=_out,
            log_path=_out / "warpx_metal.log",
            wall_time_s=0.0,
            steps_completed=0,
            diags=[],
            error="warpx-metal 2D executable not found — run warpx-metal build scripts first",
        )

    out = Path(output_dir).resolve()
    out.mkdir(parents=True, exist_ok=True)

    # Write inputs file
    if inputs_content is None:
        inputs_content = generate_metal_inputs(
            n_cell=n_cell,
            max_step=max_step,
            n_ppc=n_ppc,
            density=density,
            diag_interval=diag_interval,
            extra=extra_params,
        )
    inputs_path = out / "inputs"
    inputs_path.write_text(inputs_content)

    # Environment: add acpp bin to PATH if present
    env = os.environ.copy()
    if metal_info.acpp_bin is not None:
        acpp_dir = str(metal_info.acpp_bin.parent)
        path_parts = env.get("PATH", "").split(":")
        if acpp_dir not in path_parts:
            env["PATH"] = acpp_dir + ":" + env.get("PATH", "")

    log_path = out / "warpx_metal.log"
    t0 = time.monotonic()
    exit_code = -1  # sentinel until the process produces a real return code
    steps_completed = 0
    error: str | None = None

    # Infer max_step from inputs for the progress bar total
    import re

    _ms = re.search(r"^\s*max_step\s*=\s*(\d+)", inputs_content or "", re.MULTILINE)
    _max_step = int(_ms.group(1)) if _ms else None

    try:
        with log_path.open("w") as log_fh:
            # stderr is merged into stdout so the log captures both streams.
            proc = subprocess.Popen(
                [str(metal_info.exe_2d), str(inputs_path)],
                cwd=str(out),
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
            )

            # WarpX prints "STEP <n> ends" after each completed timestep.
            _step_re = re.compile(r"STEP\s+(\d+)\s+ends")

            if progress:
                from tqdm import tqdm

                bar = tqdm(
                    total=_max_step,
                    unit="step",
                    desc="WarpX Metal",
                    dynamic_ncols=True,
                )
            else:
                bar = None

            # NOTE(review): the deadline is only checked after each stdout
            # line — a process that hangs without producing output will not
            # be killed by this timeout; confirm whether that is acceptable.
            deadline = time.monotonic() + timeout_s
            for line in proc.stdout:  # type: ignore[union-attr]
                log_fh.write(line)
                log_fh.flush()
                m = _step_re.search(line)
                if m:
                    step = int(m.group(1))
                    if bar is not None:
                        bar.update(step - bar.n)
                    steps_completed = step
                if time.monotonic() > deadline:
                    proc.kill()
                    error = f"WarpX Metal timed out after {timeout_s:.0f} s"
                    exit_code = -2
                    break

            if bar is not None:
                bar.close()

            # Only wait for the process if it was not already killed on timeout.
            if exit_code != -2:
                proc.wait()
                exit_code = proc.returncode

    except Exception as exc:
        # Launch/IO failure (e.g. exec error); recorded rather than raised.
        error = str(exc)
        exit_code = -3

    wall_time_s = time.monotonic() - t0

    # Collect diagnostics
    diags = find_diag_dirs(out)
    if not diags:
        # Also check default diags/ subdir
        diags = find_diag_dirs(out / "diags")

    success = exit_code == 0
    return MetalRunResult(
        success=success,
        exit_code=exit_code,
        output_dir=out,
        log_path=log_path,
        wall_time_s=wall_time_s,
        steps_completed=steps_completed,
        diags=diags,
        error=error,
    )