Skip to content

helicon.postprocess

Post-processing pipeline: thrust, detachment, plume metrics, reports, PropBench export.

Thrust

helicon.postprocess.thrust.compute_thrust(output_dir, *, exit_plane_z=None, species_masses=None, backend='auto')

Compute thrust from WarpX openPMD output.

Reads the final particle snapshot, selects particles near the exit plane, and integrates momentum flux.

Parameters:

Name Type Description Default
output_dir path

WarpX output directory containing openPMD files.

required
exit_plane_z float

Axial position of the exit plane [m]. If None, uses the domain boundary (inferred from data).

None
species_masses dict

Species name -> mass [kg] mapping. Defaults to standard values.

None
backend ``"auto"`` | ``"mlx"`` | ``"numpy"``

Compute backend for momentum flux reduction.

'auto'
Source code in src/helicon/postprocess/thrust.py
def compute_thrust(
    output_dir: str | Path,
    *,
    exit_plane_z: float | None = None,
    species_masses: dict[str, float] | None = None,
    backend: str = "auto",
) -> ThrustResult:
    """Compute thrust from WarpX openPMD output.

    Reads the final particle snapshot, selects particles near the exit
    plane, and integrates momentum flux.

    Parameters
    ----------
    output_dir : path
        WarpX output directory containing openPMD files.
    exit_plane_z : float, optional
        Axial position of the exit plane [m]. If None, uses the domain
        boundary (inferred from data).
    species_masses : dict, optional
        Species name -> mass [kg] mapping. Defaults to standard values.
    backend : ``"auto"`` | ``"mlx"`` | ``"numpy"``
        Compute backend for momentum flux reduction.

    Returns
    -------
    ThrustResult
        Thrust [N], mass flow rate [kg/s], Isp [s], exhaust velocity
        [m/s], the exit plane used, and the macro-particle count.

    Raises
    ------
    FileNotFoundError
        If no ``.h5`` files are found under ``output_dir``.
    ValueError
        If the chosen file contains no ``particles`` group.
    """
    use_mlx = resolve_backend(backend) == "mlx"
    output_dir = Path(output_dir)

    # Default species masses (deuteron, proton, He-3 ion, electron) [kg].
    if species_masses is None:
        species_masses = {
            "D_plus": 3.3435837724e-27,
            "H_plus": 1.67262192595e-27,
            "He3_plus": 5.0082353373e-27,
            "e_minus": 9.1093837139e-31,
        }

    # Try to read openPMD data via h5py
    import h5py

    # Find the latest openPMD iteration.  Preferred layout is
    # <output_dir>/diags/diag1/*.h5; otherwise fall back to searching the
    # whole tree.  Files are selected by lexicographic sort, so the
    # "latest" file assumes zero-padded iteration numbers in filenames.
    diag_dir = output_dir / "diags" / "diag1"
    if not diag_dir.exists():
        # Try alternative layouts
        diag_dir = output_dir
        openpmd_files = sorted(diag_dir.glob("openpmd_*.h5"))
        if not openpmd_files:
            openpmd_files = sorted(diag_dir.glob("**/*.h5"))
        if not openpmd_files:
            msg = f"No openPMD files found in {output_dir}"
            raise FileNotFoundError(msg)
        latest = openpmd_files[-1]
    else:
        openpmd_files = sorted(diag_dir.glob("*.h5"))
        if not openpmd_files:
            msg = f"No openPMD files found in {diag_dir}"
            raise FileNotFoundError(msg)
        latest = openpmd_files[-1]

    # Accumulators over all species.
    total_momentum_z = 0.0
    total_mass_flow = 0.0
    total_particles = 0

    with h5py.File(latest, "r") as f:
        # Navigate openPMD hierarchy: "/data/<iteration>/" when present,
        # otherwise treat the file root as the iteration group.
        if "data" in f:
            iterations = sorted(f["data"].keys(), key=int)
            base = f["data"][iterations[-1]]
        else:
            base = f

        if "particles" not in base:
            msg = "No particle data found in openPMD file"
            raise ValueError(msg)

        for species_name in base["particles"]:
            sp = base["particles"][species_name]
            # Unknown species fall back to ~proton mass.
            mass = species_masses.get(species_name, 1.67e-27)

            # Read positions and momenta; skip species lacking either.
            if "position" in sp and "z" in sp["position"]:
                z = sp["position"]["z"][:]
            else:
                continue

            if "momentum" in sp and "z" in sp["momentum"]:
                pz = sp["momentum"]["z"][:]
            else:
                continue

            # Weight (number of real particles per macro-particle)
            w = sp["weighting"][:] if "weighting" in sp else np.ones_like(z)

            # Exit plane selection.  NOTE(review): when None, this is set
            # from the FIRST species that reaches this point and then
            # reused for all later species (the assignment persists across
            # loop iterations).  z.max()*0.9 assumes the domain starts
            # near z = 0 — confirm for domains spanning negative z.
            if exit_plane_z is None:
                exit_plane_z = z.max() * 0.9

            # Tolerance band: 1% of this species' axial extent.
            dz_tolerance = (z.max() - z.min()) / 100.0
            near_exit = np.abs(z - exit_plane_z) < dz_tolerance

            if np.sum(near_exit) == 0:
                continue

            # Momentum is stored as p = m*v (non-relativistic conversion).
            vz = pz[near_exit] / mass
            wt = w[near_exit]

            # Momentum flux: sum(w * m * vz^2) is force (thrust).
            # NOTE(review): vz**2 makes backward-moving particles
            # contribute positively as well — confirm this is intended.
            if use_mlx and len(wt) > 0:
                mf, mflow = _thrust_reduce_mlx(wt, mass, vz)
                total_momentum_z += mf
                total_mass_flow += mflow
            else:
                total_momentum_z += np.sum(wt * mass * vz**2)
                total_mass_flow += np.sum(wt * mass * np.abs(vz))
            total_particles += int(np.sum(near_exit))

    g0 = 9.80665  # standard gravity
    thrust = total_momentum_z
    mdot = total_mass_flow  # rough: assumes steady-state flux
    # v_ex = F / mdot; Isp = v_ex / g0 (the g0 > 0 guard is defensive —
    # g0 is a positive constant above).
    v_ex = thrust / mdot if mdot > 0 else 0.0
    isp = v_ex / g0 if g0 > 0 else 0.0

    return ThrustResult(
        thrust_N=float(thrust),
        mass_flow_rate_kgs=float(mdot),
        isp_s=float(isp),
        exhaust_velocity_ms=float(v_ex),
        exit_plane_z=float(exit_plane_z) if exit_plane_z is not None else 0.0,
        n_particles_counted=total_particles,
    )

Detachment

helicon.postprocess.detachment.compute_detachment(output_dir, *, species_name='D_plus', species_mass=3.3435837724e-27, z_inject=None, z_exit=None, r_max=None, backend='auto')

Compute detachment efficiency from openPMD particle data.

Reads initial and final particle snapshots, classifies particles by exit location, and computes all three η_d definitions.

Parameters:

Name Type Description Default
output_dir path

WarpX output directory.

required
species_name str

Particle species to analyze.

'D_plus'
species_mass float

Species mass [kg].

3.3435837724e-27
z_inject float

Injection plane z-coordinate [m].

None
z_exit float

Exit plane z-coordinate [m].

None
r_max float

Radial domain boundary [m].

None
backend ``"auto"`` | ``"mlx"`` | ``"numpy"``

Compute backend for particle classification and efficiency sums.

'auto'
Source code in src/helicon/postprocess/detachment.py
def compute_detachment(
    output_dir: str | Path,
    *,
    species_name: str = "D_plus",
    species_mass: float = 3.3435837724e-27,
    z_inject: float | None = None,
    z_exit: float | None = None,
    r_max: float | None = None,
    backend: str = "auto",
) -> DetachmentResult:
    """Compute detachment efficiency from openPMD particle data.

    Reads initial and final particle snapshots, classifies particles
    by exit location, and computes all three η_d definitions.

    Parameters
    ----------
    output_dir : path
        WarpX output directory.
    species_name : str
        Particle species to analyze.
    species_mass : float
        Species mass [kg].
    z_inject : float, optional
        Injection plane z-coordinate [m].
    z_exit : float, optional
        Exit plane z-coordinate [m].
    r_max : float, optional
        Radial domain boundary [m].
    backend : ``"auto"`` | ``"mlx"`` | ``"numpy"``
        Compute backend for particle classification and efficiency sums.

    Returns
    -------
    DetachmentResult
        Momentum-, particle-, and energy-based efficiencies (each clipped
        to [0, 1]) plus the raw classification counts.

    Raises
    ------
    FileNotFoundError
        If fewer than two snapshots are found under ``output_dir``.
    """
    use_mlx = resolve_backend(backend) == "mlx"
    output_dir = Path(output_dir)
    h5_files = sorted(output_dir.glob("**/*.h5"))
    if len(h5_files) < 2:
        msg = "Need at least 2 snapshots (initial + final) for detachment analysis"
        raise FileNotFoundError(msg)

    # Initial snapshot: aggregate momentum, kinetic energy, and count of
    # the injected population (denominators of the three efficiencies).
    pz_inject_total, ke_inject_total, n_inject = _read_snapshot_stats(
        h5_files[0], species_name, species_mass
    )

    # Final snapshot: per-particle state used for exit classification.
    _, _, positions_final, momenta_final, weights_final = _read_final_snapshot(
        h5_files[-1], species_name, species_mass
    )

    if n_inject == 0:
        # Nothing was injected: every efficiency is trivially zero.
        return DetachmentResult(
            momentum_based=0.0,
            particle_based=0.0,
            energy_based=0.0,
            n_injected=0,
            n_exited_downstream=0,
            n_lost_radial=0,
            n_reflected=0,
        )

    # Classify particles by exit location.  positions_final is treated as
    # (N, 2) with columns (r, z); a 1-D array is interpreted as z only,
    # with r assumed zero — confirm against _read_final_snapshot.
    z_pos = positions_final[:, 1] if positions_final.ndim > 1 else positions_final
    r_pos = positions_final[:, 0] if positions_final.ndim > 1 else np.zeros_like(z_pos)

    # Infer planes from the data when not supplied.
    # NOTE(review): these scalings assume z >= 0 and r >= 0; for domains
    # spanning negative z, min()*1.05 moves the wrong way — confirm.
    if z_exit is None:
        z_exit = z_pos.max() * 0.95
    if z_inject is None:
        z_inject = z_pos.min() * 1.05
    if r_max is None:
        r_max = r_pos.max() * 0.95

    if use_mlx:
        pz_exit, ke_directed, n_downstream, n_radial, n_reflected_count = _classify_reduce_mlx(
            z_pos,
            r_pos,
            momenta_final,
            weights_final,
            species_mass,
            z_inject,
            z_exit,
            r_max,
        )
    else:
        # Classification priority: downstream > radial loss > reflected,
        # enforced by masking out higher-priority categories below.
        downstream = z_pos >= z_exit
        radial_loss = r_pos >= r_max
        reflected = z_pos <= z_inject

        n_downstream = int(np.sum(weights_final[downstream]))
        n_radial = int(np.sum(weights_final[radial_loss & ~downstream]))
        n_reflected_count = int(np.sum(weights_final[reflected & ~downstream & ~radial_loss]))

        # Weighted axial momentum and directed (axial) kinetic energy
        # carried out through the exit plane.
        pz_exit = float(np.sum(weights_final[downstream] * momenta_final[downstream]))
        vz_down = momenta_final[downstream] / species_mass
        ke_directed = float(
            0.5 * species_mass * np.sum(weights_final[downstream] * vz_down**2)
        )

    # 1. Momentum-based: p_z,exit / p_z,injected
    eta_momentum = float(pz_exit / pz_inject_total) if pz_inject_total > 0 else 0.0

    # 2. Particle-based: N_downstream / N_injected
    eta_particle = float(n_downstream / n_inject) if n_inject > 0 else 0.0

    # 3. Energy-based: KE_directed_exit / KE_injected
    eta_energy = float(ke_directed / ke_inject_total) if ke_inject_total > 0 else 0.0

    # Cast the clipped values back to plain float: np.clip returns a numpy
    # scalar, while every other field is an explicit Python float/int.
    return DetachmentResult(
        momentum_based=float(np.clip(eta_momentum, 0.0, 1.0)),
        particle_based=float(np.clip(eta_particle, 0.0, 1.0)),
        energy_based=float(np.clip(eta_energy, 0.0, 1.0)),
        n_injected=int(n_inject),
        n_exited_downstream=n_downstream,
        n_lost_radial=n_radial,
        n_reflected=n_reflected_count,
    )

Plume

helicon.postprocess.plume.compute_plume_metrics(output_dir, *, species_name='D_plus', species_mass=3.3435837724e-27, T_e_eV=2000.0, z_exit=None, r_max=None, backend='auto')

Compute plume divergence and beam efficiency from particle data.

Parameters:

Name Type Description Default
output_dir path

WarpX output directory.

required
species_name str

Species to analyze.

'D_plus'
species_mass float

Species mass [kg].

3.3435837724e-27
T_e_eV float

Electron temperature at throat [eV], for C_T calculation.

2000.0
z_exit float

Exit plane z [m].

None
r_max float

Radial domain boundary [m].

None
backend ``"auto"`` | ``"mlx"`` | ``"numpy"``

Compute backend for force and kinetic-energy reductions.

'auto'
Source code in src/helicon/postprocess/plume.py
def compute_plume_metrics(
    output_dir: str | Path,
    *,
    species_name: str = "D_plus",
    species_mass: float = 3.3435837724e-27,
    T_e_eV: float = 2000.0,
    z_exit: float | None = None,
    r_max: float | None = None,
    backend: str = "auto",
) -> PlumeResult:
    """Compute plume divergence and beam efficiency from particle data.

    Parameters
    ----------
    output_dir : path
        WarpX output directory.
    species_name : str
        Species to analyze.
    species_mass : float
        Species mass [kg].
    T_e_eV : float
        Electron temperature at throat [eV], for C_T calculation.
    z_exit : float, optional
        Exit plane z [m].
    r_max : float, optional
        Radial domain boundary [m].
    backend : ``"auto"`` | ``"mlx"`` | ``"numpy"``
        Compute backend for force and kinetic-energy reductions.
    """
    use_mlx = resolve_backend(backend) == "mlx"
    import h5py

    output_dir = Path(output_dir)
    snapshots = sorted(output_dir.glob("**/*.h5"))
    if not snapshots:
        msg = f"No HDF5 files found in {output_dir}"
        raise FileNotFoundError(msg)

    # Zero result returned when the species is absent or no particles
    # sit near the exit plane.
    empty = PlumeResult(
        divergence_half_angle_deg=0.0,
        beam_efficiency=0.0,
        thrust_coefficient=0.0,
        radial_loss_fraction=0.0,
    )

    # Plume metrics are a final-state quantity: read the last snapshot.
    with h5py.File(snapshots[-1], "r") as f:
        base = _navigate_openpmd(f)
        if "particles" not in base or species_name not in base["particles"]:
            return empty

        sp = base["particles"][species_name]
        z_all = sp["position"]["z"][:]
        z_record = sp["position"]
        r_all = z_record["r"][:] if "r" in z_record else np.zeros_like(z_all)
        pz_all = sp["momentum"]["z"][:]
        p_record = sp["momentum"]
        pr_all = p_record["r"][:] if "r" in p_record else np.zeros_like(pz_all)
        weights = sp["weighting"][:] if "weighting" in sp else np.ones_like(z_all)

    # Default planes inferred from the particle extents.
    if z_exit is None:
        z_exit = z_all.max() * 0.9
    if r_max is None:
        r_max = r_all.max() * 0.95

    n_total = np.sum(weights)

    # Select particles within 1% of the axial extent around the exit plane.
    tol = (z_all.max() - z_all.min()) / 100.0
    sel = np.abs(z_all - z_exit) < tol

    if np.sum(sel) == 0:
        return empty

    # Momentum -> velocity (non-relativistic) for the selected particles.
    vz = pz_all[sel] / species_mass
    vr = pr_all[sel] / species_mass
    wt = weights[sel]

    # Ion sound speed at the throat for the thrust coefficient.
    eV_to_J = 1.602176634e-19
    c_s = math.sqrt(T_e_eV * eV_to_J / species_mass)

    if use_mlx and len(wt) > 0:
        F_axial, F_total, ke_axial, ke_total, mdot = _plume_reduce_mlx(
            wt, species_mass, vz, vr, c_s
        )
    else:
        v_mag = np.sqrt(vr**2 + vz**2)
        F_axial = float(np.sum(wt * species_mass * vz**2))
        F_total = float(np.sum(wt * species_mass * v_mag * np.abs(vz)))
        ke_axial = float(np.sum(wt * vz**2))
        ke_total = float(np.sum(wt * v_mag**2))
        mdot = float(np.sum(wt * species_mass * np.abs(vz)))

    # Divergence half-angle from the axial/total force ratio.
    theta_deg = 0.0
    if F_total > 0:
        ratio = float(np.clip(F_axial / F_total, -1.0, 1.0))
        theta_deg = float(np.degrees(np.arccos(ratio)))

    # Beam efficiency: η_b = (½ṁv_z²) / (½ṁ|v|²)
    eta_beam = float(ke_axial / ke_total) if ke_total > 0 else 0.0

    # Thrust coefficient: C_T = F / (ṁ c_s)
    C_T = float(F_axial / (mdot * c_s)) if mdot > 0 and c_s > 0 else 0.0

    # Fraction of total weight that reached the radial boundary.
    beyond_r = r_all >= r_max
    radial_fraction = float(np.sum(weights[beyond_r]) / n_total) if n_total > 0 else 0.0

    return PlumeResult(
        divergence_half_angle_deg=theta_deg,
        beam_efficiency=eta_beam,
        thrust_coefficient=C_T,
        radial_loss_fraction=radial_fraction,
    )

Moments

helicon.postprocess.moments.compute_moments(output_dir, *, nz=128, nr=64, species_name='D_plus', species_mass=3.3435837724e-27, backend='auto')

Compute velocity-space moments on a uniform grid.

Reads particle data from the latest openPMD snapshot, bins particles onto a (nr, nz) grid, and computes density, mean velocity, and pressure tensor components.

Parameters:

Name Type Description Default
output_dir path

WarpX output directory.

required
nz int

Grid dimensions for moment computation.

128
nr int

Grid dimensions for moment computation.

64
species_name str

Name of species to process.

'D_plus'
species_mass float

Particle mass [kg].

3.3435837724e-27
backend ``"auto"`` | ``"mlx"`` | ``"numpy"``

Compute backend for bin index computation. Scatter-add always runs on CPU regardless.

'auto'
Source code in src/helicon/postprocess/moments.py
def compute_moments(
    output_dir: str | Path,
    *,
    nz: int = 128,
    nr: int = 64,
    species_name: str = "D_plus",
    species_mass: float = 3.3435837724e-27,
    backend: str = "auto",
) -> MomentData:
    """Compute velocity-space moments on a uniform grid.

    Reads particle data from the latest openPMD snapshot, bins particles
    onto a (nr, nz) grid, and computes density, mean velocity, and
    pressure tensor components.

    Parameters
    ----------
    output_dir : path
        WarpX output directory.
    nz, nr : int
        Grid dimensions for moment computation.
    species_name : str
        Name of species to process.
    species_mass : float
        Particle mass [kg].
    backend : ``"auto"`` | ``"mlx"`` | ``"numpy"``
        Compute backend for bin index computation.  Scatter-add always runs
        on CPU regardless.

    Returns
    -------
    MomentData
        Density [m^-3], mean velocities [m/s], and the rr/zz pressure
        components on a uniform (nr, nz) cylindrical grid.

    Raises
    ------
    FileNotFoundError
        If no ``.h5`` files exist under ``output_dir``.
    ValueError
        If ``species_name`` is absent from the latest snapshot.
    """
    use_mlx = resolve_backend(backend) == "mlx"
    import h5py

    output_dir = Path(output_dir)

    # Find latest openPMD file (lexicographic sort — assumes zero-padded
    # iteration numbers in the filenames).
    h5_files = sorted(output_dir.glob("**/*.h5"))
    if not h5_files:
        msg = f"No HDF5 files found in {output_dir}"
        raise FileNotFoundError(msg)

    latest = h5_files[-1]

    with h5py.File(latest, "r") as f:
        # Navigate openPMD structure: "/data/<iteration>/" when present,
        # otherwise use the file root.
        if "data" in f:
            iterations = sorted(f["data"].keys(), key=int)
            base = f["data"][iterations[-1]]
        else:
            base = f

        if "particles" not in base or species_name not in base["particles"]:
            msg = f"Species {species_name!r} not found in {latest}"
            raise ValueError(msg)

        sp = base["particles"][species_name]

        # Radial position: prefer a native "r" record, else derive it from
        # Cartesian x/y, else assume on-axis (r = 0).
        z_pos = sp["position"]["z"][:]
        r_pos = (
            sp["position"]["r"][:]
            if "r" in sp["position"]
            else (
                np.sqrt(sp["position"]["x"][:] ** 2 + sp["position"]["y"][:] ** 2)
                if "x" in sp["position"]
                else np.zeros_like(z_pos)
            )
        )

        pz = sp["momentum"]["z"][:]
        pr = sp["momentum"]["r"][:] if "r" in sp["momentum"] else (np.zeros_like(pz))

        # Macro-particle weights; default to unit weight when absent.
        w = sp["weighting"][:] if "weighting" in sp else np.ones_like(z_pos)

    # Convert momentum to velocity (non-relativistic: v = p / m).
    vz = pz / species_mass
    vr = pr / species_mass

    # Create grid spanning the particle extents; r_max floored at 1e-6 m
    # to avoid a degenerate radial axis when all particles sit on-axis.
    z_min, z_max = z_pos.min(), z_pos.max()
    r_max = max(r_pos.max(), 1e-6)
    z_edges = np.linspace(z_min, z_max, nz + 1)
    r_edges = np.linspace(0.0, r_max, nr + 1)
    z_grid = 0.5 * (z_edges[:-1] + z_edges[1:])
    r_grid = 0.5 * (r_edges[:-1] + r_edges[1:])

    dz = z_edges[1] - z_edges[0]
    dr = r_edges[1] - r_edges[0]

    # Bin particles — MLX path uses (pos-min)/dz cast-to-int (faster on Metal)
    if use_mlx:
        iz = _compute_bin_indices_mlx(z_pos, z_edges, nz)
        ir = _compute_bin_indices_mlx(r_pos, r_edges, nr)
    else:
        # searchsorted returns the insertion point, hence the -1; clipping
        # pins out-of-range particles (including z == z_min, which yields
        # index -1) into the boundary cells.
        iz = np.clip(np.searchsorted(z_edges, z_pos) - 1, 0, nz - 1)
        ir = np.clip(np.searchsorted(r_edges, r_pos) - 1, 0, nr - 1)

    # Raw weighted sums per cell; normalized into moments below.
    density = np.zeros((nr, nz))
    vr_sum = np.zeros((nr, nz))
    vz_sum = np.zeros((nr, nz))
    vr2_sum = np.zeros((nr, nz))
    vz2_sum = np.zeros((nr, nz))

    # Unbuffered scatter-add: correctly accumulates repeated (ir, iz) pairs.
    np.add.at(density, (ir, iz), w)
    np.add.at(vr_sum, (ir, iz), w * vr)
    np.add.at(vz_sum, (ir, iz), w * vz)
    np.add.at(vr2_sum, (ir, iz), w * vr**2)
    np.add.at(vz2_sum, (ir, iz), w * vz**2)

    # Cell volume for density: 2π r dr dz (cylindrical); the 1e-10 floor
    # avoids division by zero in the innermost (r ≈ 0) ring.
    r_centers = r_grid[:, np.newaxis]
    cell_volume = 2 * np.pi * np.maximum(r_centers, 1e-10) * dr * dz

    mask = density > 0

    if use_mlx:
        import mlx.core as mx

        # MLX branch mirrors the numpy normalization below: empty cells
        # are forced to zero via mx.where, with safe_dens avoiding 0/0.
        dens_mx = to_mx(density)
        cell_mx = to_mx(cell_volume)
        vrs_mx = to_mx(vr_sum)
        vzs_mx = to_mx(vz_sum)
        vr2s_mx = to_mx(vr2_sum)
        vz2s_mx = to_mx(vz2_sum)
        mask_mx = dens_mx > 0
        safe_dens = mx.where(mask_mx, dens_mx, 1.0)

        n_mx = mx.where(mask_mx, dens_mx / cell_mx, 0.0)
        vr_mean_mx = mx.where(mask_mx, vrs_mx / safe_dens, 0.0)
        vz_mean_mx = mx.where(mask_mx, vzs_mx / safe_dens, 0.0)
        vr2_mean_mx = mx.where(mask_mx, vr2s_mx / safe_dens, 0.0)
        vz2_mean_mx = mx.where(mask_mx, vz2s_mx / safe_dens, 0.0)

        # Pressure = n * m * (<v^2> - <v>^2), per component.
        p_rr_mx = mx.where(
            mask_mx,
            n_mx * float(species_mass) * (vr2_mean_mx - vr_mean_mx * vr_mean_mx),
            0.0,
        )
        p_zz_mx = mx.where(
            mask_mx,
            n_mx * float(species_mass) * (vz2_mean_mx - vz_mean_mx * vz_mean_mx),
            0.0,
        )

        n = to_np(n_mx)
        vr_mean = to_np(vr_mean_mx)
        vz_mean = to_np(vz_mean_mx)
        p_rr = to_np(p_rr_mx)
        p_zz = to_np(p_zz_mx)
    else:
        # Normalize: number density = summed weight / cell volume.
        n = np.zeros((nr, nz))
        n[mask] = density[mask] / cell_volume[mask]

        # Weighted mean velocities; empty cells stay zero.
        vr_mean = np.zeros((nr, nz))
        vz_mean = np.zeros((nr, nz))
        vr_mean[mask] = vr_sum[mask] / density[mask]
        vz_mean[mask] = vz_sum[mask] / density[mask]

        # Pressure = n * m * (<v^2> - <v>^2)
        p_rr = np.zeros((nr, nz))
        p_zz = np.zeros((nr, nz))
        if np.any(mask):
            vr2_mean = np.zeros((nr, nz))
            vz2_mean = np.zeros((nr, nz))
            vr2_mean[mask] = vr2_sum[mask] / density[mask]
            vz2_mean[mask] = vz2_sum[mask] / density[mask]
            p_rr[mask] = n[mask] * species_mass * (vr2_mean[mask] - vr_mean[mask] ** 2)
            p_zz[mask] = n[mask] * species_mass * (vz2_mean[mask] - vz_mean[mask] ** 2)

    return MomentData(
        species=species_name,
        density=n,
        vr_mean=vr_mean,
        vz_mean=vz_mean,
        pressure_rr=p_rr,
        pressure_zz=p_zz,
        r_grid=r_grid,
        z_grid=z_grid,
    )

Pulsed Engines

helicon.postprocess.pulsed.compute_pulsed_metrics(output_dir, *, species_name='D_plus', species_mass=3.3435837724e-27, pulse_period_s=None, n_pulses=None, bfield_path=None)

Compute per-pulse propulsion metrics from time-resolved output.

Analyzes time-series of particle snapshots to identify individual pulses and compute metrics for each.

Parameters:

Name Type Description Default
output_dir path

WarpX output directory with multiple time snapshots.

required
species_name str

Species to analyze.

'D_plus'
species_mass float

Species mass [kg].

3.3435837724e-27
pulse_period_s float

Expected pulse repetition period [s]. If None, auto-detected.

None
n_pulses int

Expected number of pulses. If None, auto-detected.

None
bfield_path path

Path to a pre-computed B-field HDF5 file (written by BField.save()). When provided, per-pulse detachment efficiency is computed via field-line classification. If None, detachment_efficiency is set to None for every pulse.

None
Source code in src/helicon/postprocess/pulsed.py
def compute_pulsed_metrics(
    output_dir: str | Path,
    *,
    species_name: str = "D_plus",
    species_mass: float = 3.3435837724e-27,
    pulse_period_s: float | None = None,
    n_pulses: int | None = None,
    bfield_path: str | Path | None = None,
) -> PulsedResult:
    """Compute per-pulse propulsion metrics from time-resolved output.

    Analyzes time-series of particle snapshots to identify individual
    pulses and compute metrics for each.

    Parameters
    ----------
    output_dir : path
        WarpX output directory with multiple time snapshots.
    species_name : str
        Species to analyze.
    species_mass : float
        Species mass [kg].
    pulse_period_s : float, optional
        Expected pulse repetition period [s]. If None, auto-detected.
    n_pulses : int, optional
        Expected number of pulses. If None, auto-detected.
    bfield_path : path, optional
        Path to a pre-computed B-field HDF5 file (written by
        ``BField.save()``).  When provided, per-pulse detachment
        efficiency is computed via field-line classification.  If None,
        ``detachment_efficiency`` is set to ``None`` for every pulse.

    Returns
    -------
    PulsedResult
        Per-pulse metrics plus aggregate impulse and repetition rate.

    Raises
    ------
    FileNotFoundError
        If fewer than two snapshots are found under ``output_dir``.
    """
    import h5py

    output_dir = Path(output_dir)
    h5_files = sorted(output_dir.glob("**/*.h5"))

    if len(h5_files) < 2:
        msg = "Need multiple time snapshots for pulsed analysis"
        raise FileNotFoundError(msg)

    # Read time series of total weighted axial momentum, one sample per
    # snapshot containing the requested species.
    times = []
    momentum_z = []

    for h5_path in h5_files:
        with h5py.File(h5_path, "r") as f:
            base = _navigate_openpmd(f)
            if "particles" not in base or species_name not in base["particles"]:
                continue

            sp = base["particles"][species_name]
            pz = sp["momentum"]["z"][:]
            w = sp["weighting"][:] if "weighting" in sp else np.ones_like(pz)

            # Timestamp: prefer the explicit openPMD "time" attribute;
            # otherwise reconstruct it from the timestep "dt" and the
            # snapshot index.  (In the elif branch "time" is known to be
            # absent, so the reconstruction is used directly.)
            t = 0.0
            if hasattr(base, "attrs") and "time" in base.attrs:
                t = float(base.attrs["time"])
            elif hasattr(base, "attrs") and "dt" in base.attrs:
                t = float(len(times) * base.attrs["dt"])

            times.append(t)
            momentum_z.append(float(np.sum(w * pz)))

    if not times:
        # No snapshot contained the species: return an all-zero result.
        return PulsedResult(
            pulses=[],
            total_impulse_Ns=0.0,
            mean_detachment_efficiency=0.0,
            mean_impulse_bit_Ns=0.0,
            n_pulses=0,
            repetition_rate_Hz=None,
            impulse_Ns=0.0,
            energy_J=None,
            isp_s=None,
        )

    times_arr = np.array(times)
    pz_arr = np.array(momentum_z)

    # Detect pulses from momentum signal
    if n_pulses is None:
        # Simple peak detection: normalize the signal and count rising
        # edges through a fixed 30% threshold (1e-30 avoids 0/0 on an
        # all-zero signal).
        pz_norm = pz_arr / (np.max(np.abs(pz_arr)) + 1e-30)
        threshold = 0.3
        above = pz_norm > threshold
        # Count rising edges
        edges = np.diff(above.astype(int))
        n_pulses = max(1, int(np.sum(edges > 0)))

    # Divide time series into equal pulse windows.
    dt_total = times_arr[-1] - times_arr[0] if len(times_arr) > 1 else 1.0
    pulse_duration = dt_total / n_pulses if n_pulses > 0 else dt_total

    pulses = []
    for i in range(n_pulses):
        t_start = times_arr[0] + i * pulse_duration
        t_end = t_start + pulse_duration
        mask = (times_arr >= t_start) & (times_arr < t_end)

        if np.sum(mask) == 0:
            continue

        pz_pulse = pz_arr[mask]
        # Impulse bit via trapezoidal integration over the window.
        # NOTE: np.trapezoid requires NumPy >= 2.0 (formerly np.trapz).
        impulse = float(np.trapezoid(pz_pulse, times_arr[mask])) if np.sum(mask) > 1 else 0.0
        peak = float(np.max(np.abs(pz_pulse)))

        pulses.append(
            PulseMetrics(
                pulse_index=i,
                impulse_bit_Ns=abs(impulse),
                detachment_efficiency=None,  # filled below if bfield_path given
                peak_thrust_N=peak,
                pulse_duration_s=float(pulse_duration),
                particle_count=int(np.sum(mask)),
            )
        )

    # Compute per-pulse detachment efficiency via field-line classification
    if bfield_path is not None and pulses:
        try:
            from helicon.fields.biot_savart import BField
            from helicon.postprocess.fieldline_classify import classify_particles

            bfield = BField.load(str(bfield_path))
            classification = classify_particles(output_dir, bfield, species_name=species_name)
            # Detachment efficiency = fraction of particles on open field lines
            eta = (
                classification.n_open / classification.n_total
                if classification.n_total > 0
                else None
            )
            # The classification is run once on the final state, so every
            # pulse receives the same efficiency value.
            for pulse in pulses:
                pulse.detachment_efficiency = eta
        except Exception:
            pass  # leave detachment_efficiency as None if classification fails

    total_impulse = sum(p.impulse_bit_Ns for p in pulses)
    mean_impulse = total_impulse / len(pulses) if pulses else 0.0
    eta_values = [
        p.detachment_efficiency for p in pulses if p.detachment_efficiency is not None
    ]
    mean_eta = float(np.mean(eta_values)) if eta_values else 0.0

    rep_rate = 1.0 / pulse_duration if pulse_duration > 0 else None

    return PulsedResult(
        pulses=pulses,
        total_impulse_Ns=total_impulse,
        mean_detachment_efficiency=float(mean_eta),
        mean_impulse_bit_Ns=mean_impulse,
        n_pulses=len(pulses),
        repetition_rate_Hz=rep_rate,
        impulse_Ns=total_impulse,
        energy_J=None,
        isp_s=None,
    )

Run Report

helicon.postprocess.report.RunReport(helicon_version, config_hash, thrust_N, isp_s, exhaust_velocity_ms, mass_flow_rate_kgs, detachment_momentum, detachment_particle, detachment_energy, plume_half_angle_deg, beam_efficiency, thrust_coefficient, radial_loss_fraction, nozzle_type=None, plasma_source=None, thrust_relative_change_last_10pct=None, particle_count_exit=None, mass_ratio_reduced=False, validation_proximity=None, energy_conservation_relative_error=None) dataclass

Complete report of a simulation run.

Functions

to_spec_dict(*, config=None, timestamp=None)

Return a spec §6.3 compliant nested dictionary.

Parameters:

Name Type Description Default
config SimConfig

Original simulation configuration; used to populate nozzle_type and plasma_source when not already set.

None
timestamp str

ISO-8601 timestamp string. Defaults to the current UTC time.

None

Returns:

Type Description
dict
Source code in src/helicon/postprocess/report.py
def to_spec_dict(
    self,
    *,
    config: Any = None,
    timestamp: str | None = None,
) -> dict:
    """Return a spec §6.3 compliant nested dictionary.

    Parameters
    ----------
    config : SimConfig, optional
        Original simulation configuration; used to populate
        ``nozzle_type`` and ``plasma_source`` when not already set.
    timestamp : str, optional
        ISO-8601 timestamp string.  Defaults to the current UTC time.

    Returns
    -------
    dict
    """
    import datetime

    if timestamp is None:
        now = datetime.datetime.now(datetime.UTC)
        timestamp = now.strftime("%Y-%m-%dT%H:%M:%SZ")

    # Fill nozzle/plasma metadata from the config when not set on self.
    nozzle: str | None = self.nozzle_type
    source: dict | None = self.plasma_source
    if config is not None:
        if nozzle is None and hasattr(config, "nozzle"):
            nozzle = getattr(config.nozzle, "type", None)
        if source is None and hasattr(config, "plasma"):
            try:
                source = config.plasma.model_dump(mode="python")
            except AttributeError:
                source = None

    # Lazily compute validation proximity if a config is available and it
    # was not pre-computed; failures are tolerated (best-effort metadata).
    proximity = self.validation_proximity
    if proximity is None and config is not None:
        try:
            from helicon.validate.proximity import config_proximity

            prox = config_proximity(config)
            proximity = {
                "nearest_case": prox.nearest_case,
                "distance": prox.distance,
                "in_validated_region": prox.in_validated_region,
                "parameter_distances": prox.parameter_distances,
                "warning": prox.warning,
            }
        except Exception:
            pass

    # Assemble the nested sections bottom-up for readability.
    detachment = {
        "momentum_based": self.detachment_momentum,
        "particle_based": self.detachment_particle,
        "energy_based": self.detachment_energy,
    }
    convergence = {
        "thrust_relative_change_last_10pct": (
            self.thrust_relative_change_last_10pct
        ),
        "particle_count_exit": self.particle_count_exit,
    }
    results = {
        "thrust_N": self.thrust_N,
        "isp_s": self.isp_s,
        "exhaust_velocity_ms": self.exhaust_velocity_ms,
        "mass_flow_rate_kgs": self.mass_flow_rate_kgs,
        "detachment_efficiency": detachment,
        "plume_half_angle_deg": self.plume_half_angle_deg,
        "beam_efficiency": self.beam_efficiency,
        "radial_loss_fraction": self.radial_loss_fraction,
        "convergence": convergence,
    }

    return {
        "helicon_version": self.helicon_version,
        "config_hash": self.config_hash,
        "timestamp": timestamp,
        "nozzle_type": nozzle,
        "plasma_source": source,
        "mass_ratio_reduced": self.mass_ratio_reduced,
        "results": results,
        "validation_flags": self._compute_validation_flags(),
        "validation_proximity": proximity,
    }

helicon.postprocess.report.generate_report(output_dir, *, config_hash=None, config=None)

Generate a summary report from all available postprocessing results.

Attempts to compute each metric; missing data results in None values.

Source code in src/helicon/postprocess/report.py
def generate_report(
    output_dir: str | Path,
    *,
    config_hash: str | None = None,
    config: Any = None,
) -> RunReport:
    """Assemble a RunReport from every postprocessing metric available.

    Each metric is computed on a best-effort basis: when the required
    output files are missing or unusable, the corresponding report
    fields simply stay None.
    """
    out_path = Path(output_dir)

    # Flag runs that used an artificially reduced ion/electron mass ratio.
    reduced_mass_ratio = False
    if config is not None and hasattr(config, "plasma"):
        ratio = getattr(config.plasma, "mass_ratio", None)
        reduced_mass_ratio = ratio is not None and ratio < 1836.0

    report = RunReport(
        helicon_version=helicon.__version__,
        config_hash=config_hash,
        thrust_N=None,
        isp_s=None,
        exhaust_velocity_ms=None,
        mass_flow_rate_kgs=None,
        detachment_momentum=None,
        detachment_particle=None,
        detachment_energy=None,
        plume_half_angle_deg=None,
        beam_efficiency=None,
        thrust_coefficient=None,
        radial_loss_fraction=None,
        mass_ratio_reduced=reduced_mass_ratio,
    )

    # Thrust metrics (exit-plane momentum flux integration).
    try:
        from helicon.postprocess.thrust import compute_thrust

        thrust_result = compute_thrust(out_path)
    except (FileNotFoundError, ValueError):
        pass
    else:
        report.thrust_N = thrust_result.thrust_N
        report.isp_s = thrust_result.isp_s
        report.exhaust_velocity_ms = thrust_result.exhaust_velocity_ms
        report.mass_flow_rate_kgs = thrust_result.mass_flow_rate_kgs

    # Detachment efficiencies (momentum / particle / energy based).
    try:
        from helicon.postprocess.detachment import compute_detachment

        det_result = compute_detachment(out_path)
    except (FileNotFoundError, ValueError):
        pass
    else:
        report.detachment_momentum = det_result.momentum_based
        report.detachment_particle = det_result.particle_based
        report.detachment_energy = det_result.energy_based

    # Plume shape and efficiency metrics.
    try:
        from helicon.postprocess.plume import compute_plume_metrics

        plume_result = compute_plume_metrics(out_path)
    except (FileNotFoundError, ValueError):
        pass
    else:
        report.plume_half_angle_deg = plume_result.divergence_half_angle_deg
        report.beam_efficiency = plume_result.beam_efficiency
        report.thrust_coefficient = plume_result.thrust_coefficient
        report.radial_loss_fraction = plume_result.radial_loss_fraction

    # Convergence diagnostics derived from time-series snapshots.
    rel_change, exit_count = _compute_convergence_diagnostics(out_path)
    report.thrust_relative_change_last_10pct = rel_change
    report.particle_count_exit = exit_count

    # Energy conservation error from WarpX reduced diagnostics (§7.1)
    report.energy_conservation_relative_error = _compute_energy_conservation(out_path)

    return report

helicon.postprocess.report.save_report(report, path, *, config=None)

Save a report to JSON using the spec §6.3 nested format.

Source code in src/helicon/postprocess/report.py
def save_report(report: RunReport, path: str | Path, *, config: Any = None) -> None:
    """Write *report* as JSON in the spec §6.3 nested layout.

    Parent directories are created if they do not already exist.
    """
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    payload = report.to_spec_dict(config=config)
    # default=str keeps serialization best-effort for non-JSON-native values.
    target.write_text(json.dumps(payload, indent=2, default=str))

helicon.postprocess.report.load_report(path)

Load a report from JSON.

Source code in src/helicon/postprocess/report.py
def load_report(path: str | Path) -> dict:
    """Load a report from JSON."""
    return json.loads(Path(path).read_text())

PropBench

helicon.postprocess.propbench.PropBenchResult(propbench_version='0.1', code_name='Helicon', code_version='', timestamp='', config_hash=None, nozzle_type=None, species=None, n0_m3=None, T_i_eV=None, T_e_eV=None, thrust_N=None, isp_s=None, exhaust_velocity_ms=None, mass_flow_rate_kgs=None, detachment_efficiency_momentum=None, detachment_efficiency_particle=None, detachment_efficiency_energy=None, plume_half_angle_deg=None, beam_efficiency=None, mass_ratio_reduced=False, electron_model='kinetic') dataclass

PropBench-compatible result record.

helicon.postprocess.propbench.to_propbench(run_report, config=None, *, mass_ratio_reduced=False, electron_model='kinetic')

Convert a RunReport to a PropBenchResult.

Parameters:

Name Type Description Default
run_report RunReport

Simulation results from postprocess/report.py.

required
config SimConfig or None

Optional simulation configuration to extract plasma parameters.

None
mass_ratio_reduced bool

Whether a reduced mass ratio was used.

False
electron_model str

Electron treatment model used (e.g., "kinetic", "fluid").

'kinetic'

Returns:

Type Description
PropBenchResult
Source code in src/helicon/postprocess/propbench.py
def to_propbench(
    run_report: RunReport,
    config: object | None = None,
    *,
    mass_ratio_reduced: bool = False,
    electron_model: str = "kinetic",
) -> PropBenchResult:
    """Translate a RunReport into a PropBench-compatible record.

    Parameters
    ----------
    run_report : RunReport
        Simulation results produced by postprocess/report.py.
    config : SimConfig or None
        Optional simulation configuration; when given, nozzle type and
        plasma parameters are copied into the record.
    mass_ratio_reduced : bool
        Whether a reduced ion/electron mass ratio was used.
    electron_model : str
        Electron treatment model used (e.g., "kinetic", "fluid").

    Returns
    -------
    PropBenchResult
    """
    record = PropBenchResult(
        code_version=run_report.helicon_version,
        timestamp=datetime.now(tz=UTC).isoformat(),
        config_hash=run_report.config_hash,
        thrust_N=run_report.thrust_N,
        isp_s=run_report.isp_s,
        exhaust_velocity_ms=run_report.exhaust_velocity_ms,
        mass_flow_rate_kgs=run_report.mass_flow_rate_kgs,
        detachment_efficiency_momentum=run_report.detachment_momentum,
        detachment_efficiency_particle=run_report.detachment_particle,
        detachment_efficiency_energy=run_report.detachment_energy,
        plume_half_angle_deg=run_report.plume_half_angle_deg,
        beam_efficiency=run_report.beam_efficiency,
        mass_ratio_reduced=mass_ratio_reduced,
        electron_model=electron_model,
    )

    if config is None:
        return record

    # Copy optional fields from a SimConfig-like object, tolerating
    # missing attributes at every level.
    if hasattr(config, "nozzle"):
        record.nozzle_type = getattr(config.nozzle, "type", None)
    if hasattr(config, "plasma"):
        plasma_cfg = config.plasma
        record.species = list(getattr(plasma_cfg, "species", []))
        record.n0_m3 = getattr(plasma_cfg, "n0", None)
        record.T_i_eV = getattr(plasma_cfg, "T_i_eV", None)
        record.T_e_eV = getattr(plasma_cfg, "T_e_eV", None)

    return record

helicon.postprocess.propbench.save_propbench(result, path)

Write a PropBenchResult to a JSON file.

Source code in src/helicon/postprocess/propbench.py
def save_propbench(result: PropBenchResult, path: str | Path) -> None:
    """Serialize *result* to a JSON file, creating parent directories."""
    target = Path(path)
    target.parent.mkdir(parents=True, exist_ok=True)
    # default=str keeps serialization best-effort for non-JSON-native values.
    payload = json.dumps(asdict(result), indent=2, default=str)
    target.write_text(payload)

helicon.postprocess.propbench.load_propbench(path)

Load a PropBenchResult from a JSON file.

Source code in src/helicon/postprocess/propbench.py
def load_propbench(path: str | Path) -> PropBenchResult:
    """Reconstruct a PropBenchResult from a JSON file written by save_propbench."""
    raw = Path(path).read_text()
    return PropBenchResult(**json.loads(raw))