Source code for simulator.tools.tool_bsfc_map_epa

# simulator/tools/tool_bsfc_map_epa.py

from __future__ import annotations
from pathlib import Path
from typing import Dict, Any, List
import json
import csv
import math
import argparse

import numpy as np
import plotly.graph_objects as go

from ..core import EngineSimulator
from .. import io

ROOT = Path(__file__).resolve().parents[1]
IN_DIR = ROOT / "in"
OUT_DIR = ROOT / "out"


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _deep_copy_cfg(cfg: Dict[str, Any]) -> Dict[str, Any]:
    """Cheap deep copy via JSON round-trip."""
    return json.loads(json.dumps(cfg))


def _run_grid_point(cfg_base: Dict[str, Any], speed_rpm: float, pint_bar: float) -> Dict[str, Any]:
    """Run a single operating point and return BSFC / η_th / BMEP / torque / power.

    Axes:
      - speed_rpm: engine speed [rpm]
      - pint_bar:  intake absolute pressure [bar]
    """
    cfg = _deep_copy_cfg(cfg_base)
    op = cfg.setdefault("operating", {})
    op["engine_speed_rpm"] = float(speed_rpm)
    op["intake_pressure_Pa"] = float(pint_bar * 1e5)

    sim = EngineSimulator.from_dict(cfg)
    res = sim.run(cycles=1)

    return {
        "speed_rpm": float(speed_rpm),
        "pint_bar": float(pint_bar),
        "bmep_bar": res.bmep_bar,
        "torque_Nm": res.brake_torque_Nm,
        "power_kW": res.brake_power_kW,
        "bsfc_g_per_kWh": res.bsfc_g_per_kWh,
        "eta_b_th": res.brake_thermal_efficiency,
    }


def _ensure_dir(path: Path) -> None:
    """Ensure parent directory for a file path exists."""
    if path.suffix:
        path.parent.mkdir(parents=True, exist_ok=True)
    else:
        path.mkdir(parents=True, exist_ok=True)


def _write_csv(points: List[Dict[str, Any]], path: Path) -> None:
    if not points:
        print(f"[BSFC-MAP-EPA] No points to write for {path.name}, skipping CSV.")
        return

    _ensure_dir(path)
    fieldnames = [
        "speed_rpm",
        "pint_bar",
        "bmep_bar",
        "torque_Nm",
        "power_kW",
        "bsfc_g_per_kWh",
        "eta_b_th",
    ]
    with path.open("w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=fieldnames)
        w.writeheader()
        for r in sorted(points, key=lambda r: (r["speed_rpm"], r["pint_bar"])):
            w.writerow(r)

    print(f"[BSFC-MAP-EPA] Wrote CSV grid: {path}")


def _bmep_to_torque_Nm(bmep_bar: float, Vd_total_m3: float, stroke_type: str) -> float:
    """Convert BMEP [bar] to torque [Nm] for the full engine.

    For 4-stroke:   BMEP = 4πT / Vd  →  T = BMEP * Vd / (4π)
    For 2-stroke:   BMEP = 2πT / Vd  →  T = BMEP * Vd / (2π)
    """
    if Vd_total_m3 <= 0.0:
        return 0.0

    stroke = (stroke_type or "").lower()
    k = 2.0 if ("two" in stroke or "2-" in stroke) else 4.0
    bmep_Pa = float(bmep_bar) * 1e5
    return bmep_Pa * Vd_total_m3 / (k * math.pi)


# ---------------------------------------------------------------------------
# Build regular N–BMEP grids from N–p_int samples
# ---------------------------------------------------------------------------

def _build_bmep_grids(records: List[Dict[str, Any]], n_bmep: int = 25) -> Dict[str, Any]:
    """Convert flat list of (N, p_int) records into regular (N, BMEP) grids.

    Returns:
      - speeds: sorted unique speeds [rpm]
      - bmep:   regular BMEP levels [bar]
      - z_bsfc: BSFC [g/kWh]      shape = (len(bmep), len(speeds))
      - z_eta:  brake η_th [%]    same shape
      - z_pwr:  brake power [kW]  same shape
      - z_trq:  brake torque [Nm] same shape
      - bmep_min / bmep_max
    """
    if not records:
        raise ValueError("No records provided to build N–BMEP grids")

    speeds = sorted({float(r["speed_rpm"]) for r in records})
    by_speed: Dict[float, List[Dict[str, Any]]] = {N: [] for N in speeds}
    for r in records:
        by_speed[float(r["speed_rpm"])].append(r)

    bmep_vals = [
        float(r["bmep_bar"])
        for r in records
        if r.get("bmep_bar") is not None
    ]
    if not bmep_vals:
        raise ValueError("No BMEP data found in records")

    bmep_min = max(0.0, min(bmep_vals))
    bmep_max = max(bmep_vals)
    if bmep_max <= bmep_min:
        bmep_max = bmep_min + 0.1

    bmep_levels = np.linspace(bmep_min, bmep_max, int(max(n_bmep, 5)))

    nx = len(speeds)
    ny = len(bmep_levels)

    def _nan_grid() -> List[List[float]]:
        return [[float("nan")] * nx for _ in range(ny)]

    z_bsfc = _nan_grid()
    z_eta = _nan_grid()
    z_pwr = _nan_grid()
    z_trq = _nan_grid()

    for ix, N in enumerate(speeds):
        pts = by_speed[N]
        pts_valid = [p for p in pts if p.get("bmep_bar") is not None]
        if not pts_valid:
            continue

        pts_valid.sort(key=lambda r: r["bmep_bar"])
        bmep_arr = np.array([float(p["bmep_bar"]) for p in pts_valid], dtype=float)

        def _to_array(key: str, scale: float = 1.0) -> np.ndarray:
            vals: List[float] = []
            for p in pts_valid:
                v = p.get(key)
                if v is None:
                    vals.append(float("nan"))
                else:
                    vals.append(float(v) * scale)
            return np.array(vals, dtype=float)

        bsfc_arr = _to_array("bsfc_g_per_kWh", scale=1.0)
        eta_arr = _to_array("eta_b_th", scale=100.0)  # → %
        pwr_arr = _to_array("power_kW", scale=1.0)
        trq_arr = _to_array("torque_Nm", scale=1.0)

        for iy, target in enumerate(bmep_levels):
            idx = int(np.argmin(np.abs(bmep_arr - target)))
            if 0 <= idx < bmep_arr.size:
                if np.isfinite(bsfc_arr[idx]):
                    z_bsfc[iy][ix] = float(bsfc_arr[idx])
                if np.isfinite(eta_arr[idx]):
                    z_eta[iy][ix] = float(eta_arr[idx])
                if np.isfinite(pwr_arr[idx]):
                    z_pwr[iy][ix] = float(pwr_arr[idx])
                if np.isfinite(trq_arr[idx]):
                    z_trq[iy][ix] = float(trq_arr[idx])

    return {
        "speeds": [float(s) for s in speeds],
        "bmep": [float(b) for b in bmep_levels],
        "z_bsfc": z_bsfc,
        "z_eta": z_eta,
        "z_pwr": z_pwr,
        "z_trq": z_trq,
        "bmep_min": float(bmep_min),
        "bmep_max": float(bmep_max),
    }


# ---------------------------------------------------------------------------
# Plotting – EPA style
# ---------------------------------------------------------------------------

def _plot_bsfc_map_epa(html_path: Path, grid: Dict[str, Any],
                       Vd_total_m3: float, stroke_type: str) -> None:
    speeds: List[float] = grid["speeds"]
    bmep: List[float] = grid["bmep"]
    z_bsfc: List[List[float]] = grid["z_bsfc"]
    z_pwr: List[List[float]] = grid["z_pwr"]
    z_trq: List[List[float]] = grid["z_trq"]
    bmep_min: float = grid["bmep_min"]
    bmep_max: float = grid["bmep_max"]

    z_bsfc_arr = np.array(z_bsfc, dtype=float)
    z_pwr_arr = np.array(z_pwr, dtype=float)
    z_trq_arr = np.array(z_trq, dtype=float)

    # Best BSFC point
    best_speed = best_bmep = best_bsfc = None
    if np.isfinite(z_bsfc_arr).any():
        flat_idx = int(np.nanargmin(z_bsfc_arr))
        iy, ix = np.unravel_index(flat_idx, z_bsfc_arr.shape)
        best_speed = speeds[ix]
        best_bmep = bmep[iy]
        best_bsfc = float(z_bsfc_arr[iy, ix])

    # Power contour levels
    pwr_min = np.nanmin(z_pwr_arr) if np.isfinite(z_pwr_arr).any() else 0.0
    pwr_max = np.nanmax(z_pwr_arr) if np.isfinite(z_pwr_arr).any() else 0.0
    if pwr_max <= 0.0:
        pwr_min, pwr_max = 0.0, 0.0

    # More power traces: 20 kW step (40 if huge range)
    step = 20.0
    if pwr_max - pwr_min > 400.0:
        step = 40.0
    if pwr_max > 0.0:
        start = step * math.ceil(max(pwr_min, 10.0) / step)
        end = step * math.floor(pwr_max / step)
        if end <= start:
            start, end = max(step, pwr_min), pwr_max
    else:
        start, end = 0.0, 0.0

    fig = go.Figure()

    # customdata for richer hover: torque, power
    custom_bsfc = np.dstack([z_trq_arr, z_pwr_arr])

    # BSFC filled contours
    fig.add_trace(
        go.Contour(
            x=speeds,
            y=bmep,
            z=z_bsfc,
            colorbar_title="BSFC [g/kWh]",
            colorscale="Plasma",
            contours=dict(
                showlabels=True,
                labelfont=dict(size=10, color="white"),
            ),
            line=dict(width=0.6, color="rgba(0,0,0,0.3)"),
            name="BSFC",
            customdata=custom_bsfc,
            hovertemplate=(
                "N = %{x:.0f} rpm<br>"
                "BMEP = %{y:.1f} bar<br>"
                "T = %{customdata[0]:.1f} Nm<br>"
                "P_b = %{customdata[1]:.1f} kW<br>"
                "BSFC = %{z:.1f} g/kWh"
                "<extra></extra>"
            ),
        )
    )

    # Constant brake power lines (dashed lime, fluorescent text)
    if pwr_max > 0.0:
        fig.add_trace(
            go.Contour(
                x=speeds,
                y=bmep,
                z=z_pwr,
                contours=dict(
                    showlines=True,
                    showlabels=True,
                    labelfont=dict(size=9, color="lime"),
                    coloring="none",
                    start=start,
                    end=end,
                    size=step,
                ),
                line=dict(width=1.2, color="lime", dash="dash"),
                showscale=False,
                name="Brake power [kW]",
                hovertemplate=(
                    "N = %{x:.0f} rpm<br>"
                    "BMEP = %{y:.1f} bar<br>"
                    "P_b = %{z:.1f} kW"
                    "<extra></extra>"
                ),
            )
        )

    # Best BSFC marker – red text
    if best_speed is not None and best_bmep is not None and best_bsfc is not None:
        fig.add_trace(
            go.Scatter(
                x=[best_speed],
                y=[best_bmep],
                mode="markers+text",
                text=[f"{best_bsfc:.1f}"],
                textposition="top center",
                textfont=dict(color="red", size=12),
                marker=dict(symbol="star", size=12, line=dict(width=1), color="lime"),
                name="Best BSFC",
            )
        )

    # Torque ticks mapped onto BMEP axis
    bmep_ticks = np.linspace(bmep_min, bmep_max, 6)
    torque_ticks = [_bmep_to_torque_Nm(b, Vd_total_m3, stroke_type) for b in bmep_ticks]

    fig.update_layout(
        title="Baby F1 Toy Engine – EPA-style BSFC Map",
        xaxis=dict(title="Speed [rpm]"),
        yaxis=dict(title="BMEP [bar]", range=[bmep_min, bmep_max]),
        # Right side shows torque numbers at same vertical positions
        yaxis2=dict(
            title="Brake torque [Nm]",
            overlaying="y",
            side="right",
            showgrid=False,
            tickvals=list(bmep_ticks),
            ticktext=[f"{t:.0f}" for t in torque_ticks],
        ),
        legend=dict(x=0.02, y=0.98, bgcolor="rgba(255,255,255,0.6)"),
    )

    fig.add_annotation(
        xref="paper",
        yref="paper",
        x=0.01,
        y=1.08,
        showarrow=False,
        text="Filled contours: BSFC [g/kWh]; dashed lime lines: Brake power [kW]",
    )

    _ensure_dir(html_path)
    fig.write_html(str(html_path))
    print(f"[BSFC-MAP-EPA] Wrote EPA-style BSFC map: {html_path}")


# ---------------------------------------------------------------------------
# Main / CLI
# ---------------------------------------------------------------------------

def _parse_args(argv: List[str] | None = None) -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Generate an EPA-style BSFC map (N vs BMEP) for any engine JSON config.",
    )
    parser.add_argument(
        "--config",
        type=str,
        default=str(IN_DIR / "sample_si_engine.json"),
        help="Path to engine configuration JSON (default: simulator/in/sample_si_engine.json)",
    )
    parser.add_argument(
        "--html",
        type=str,
        default=str(OUT_DIR / "bsfc_map_epa.html"),
        help="Output HTML path for the BSFC map (default: simulator/out/bsfc_map_epa.html)",
    )
    parser.add_argument(
        "--csv",
        type=str,
        default=str(OUT_DIR / "bsfc_map_epa_points.csv"),
        help="Output CSV path for raw grid points (default: simulator/out/bsfc_map_epa_points.csv)",
    )
    return parser.parse_args(argv)


[docs] def main(argv: List[str] | None = None) -> None: args = _parse_args(argv) base_cfg_path = Path(args.config) if not base_cfg_path.exists(): raise SystemExit(f"Base config not found: {base_cfg_path}") cfg_base = io.load_json(base_cfg_path) # Get displacement and stroke type from a "dummy" simulator instance sim0 = EngineSimulator.from_dict(cfg_base) Vd_cyl = sim0.geometry.displacement_volume() n_cyl = max(int(sim0.operating.num_cylinders), 1) Vd_total_m3 = Vd_cyl * n_cyl stroke_type = sim0.operating.stroke_type # N / p_int grid speeds_rpm = [ 500, 1000, 1500, 2000, 2500, 3000, 3500, 4000, 4500, 5000, 5500, 6000, 6500, 7000, 7500, 8000, ] pint_bar_list = [ 0.8, 0.90, 1.013, 1.20, 1.40, 1.60, 1.80, 2.0, 2.20, 2.40, 2.60, 2.80, 3.0, 3.20, 3.40, 3.60, 3.80, 4.0, ] print("[BSFC-MAP-EPA] Building N–p_int grid ...") points: List[Dict[str, Any]] = [] for N in speeds_rpm: for pint_bar in pint_bar_list: pt = _run_grid_point(cfg_base, N, pint_bar) points.append(pt) print( f" N={N:6.1f} rpm p_int={pint_bar:5.3f} bar " f"BMEP={pt.get('bmep_bar')!r} T={pt.get('torque_Nm')!r} Nm " f"P_b={pt.get('power_kW')!r} kW BSFC={pt.get('bsfc_g_per_kWh')!r}" ) csv_path = Path(args.csv) _write_csv(points, csv_path) print("[BSFC-MAP-EPA] Re-gridding to N–BMEP space ...") grid = _build_bmep_grids(points, n_bmep=25) html_path = Path(args.html) _plot_bsfc_map_epa(html_path, grid, Vd_total_m3, stroke_type)
if __name__ == "__main__": # pragma: no cover main()