Source code for simulator.pumps.water_pump

from __future__ import annotations

"""Centrifugal water-pump models for vICE.

This module is the first production-style pumps package component. It focuses
on the coolant pump use case: curve-based centrifugal pump matching, affinity
law speed scaling, power absorption, BEP ratio, and NPSH/cavitation margin.
"""

from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Mapping
import csv
import json

from .affinity import AffinityScale
from .cavitation import SuctionState, npsh_margin_ft
from .curves import Curve1D
from .power import brake_horsepower_from_efficiency, flow_to_gpm, hp_to_kw, water_horsepower_from_curve_flow
from .system_curve import QuadraticSystemCurve


[docs] @dataclass(frozen=True) class CentrifugalWaterPump: name: str reference_speed_rpm: float flow_unit: str head_unit: str head_curve: Curve1D efficiency_curve: Curve1D | None = None npshr_curve: Curve1D | None = None bep_flow: float | None = None bep_head: float | None = None specific_gravity: float = 1.0 valid_flow_min: float | None = None valid_flow_max: float | None = None
[docs] @classmethod def from_dict(cls, data: Mapping[str, Any]) -> "CentrifugalWaterPump": curves = dict(data.get("curves", {})) if "head" not in curves: raise ValueError("Pump JSON must contain curves.head") head_curve = Curve1D.from_dict("head", curves["head"]) efficiency_curve = Curve1D.from_dict("efficiency", curves["efficiency"]) if "efficiency" in curves else None npshr_curve = Curve1D.from_dict("npshr", curves["npshr"]) if "npshr" in curves else None flow_min = data.get("valid_flow_min") flow_max = data.get("valid_flow_max") if flow_min is None: flow_min = head_curve.x_min() if flow_max is None: flow_max = head_curve.x_max() return cls( name=str(data.get("name", "Unnamed centrifugal water pump")), reference_speed_rpm=float(data.get("reference_speed_rpm", data.get("speed_rpm", 1.0))), flow_unit=str(data.get("flow_unit", "gpm")), head_unit=str(data.get("head_unit", "ft")), head_curve=head_curve, efficiency_curve=efficiency_curve, npshr_curve=npshr_curve, bep_flow=_optional_float(data.get("bep_flow")), bep_head=_optional_float(data.get("bep_head")), specific_gravity=float(data.get("specific_gravity", 1.0)), valid_flow_min=_optional_float(flow_min), valid_flow_max=_optional_float(flow_max), )
[docs] @classmethod def from_json(cls, path: str | Path) -> "CentrifugalWaterPump": with Path(path).open("r", encoding="utf-8") as f: return cls.from_dict(json.load(f))
[docs] def speed_scale(self, pump_speed_rpm: float) -> AffinityScale: return AffinityScale(reference_speed_rpm=self.reference_speed_rpm, target_speed_rpm=float(pump_speed_rpm))
[docs] def head_ft(self, flow: float, pump_speed_rpm: float | None = None) -> float: if pump_speed_rpm is None: pump_speed_rpm = self.reference_speed_rpm scale = self.speed_scale(float(pump_speed_rpm)) q_ref = scale.flow_to_reference(float(flow)) return scale.head_from_reference(self.head_curve.y(q_ref))
[docs] def efficiency(self, flow: float, pump_speed_rpm: float | None = None) -> float | None: if self.efficiency_curve is None: return None if pump_speed_rpm is None: pump_speed_rpm = self.reference_speed_rpm scale = self.speed_scale(float(pump_speed_rpm)) q_ref = scale.flow_to_reference(float(flow)) eta = self.efficiency_curve.y(q_ref) if eta > 1.0: eta = eta / 100.0 return max(0.0, min(1.0, eta))
[docs] def npshr_ft(self, flow: float, pump_speed_rpm: float | None = None) -> float | None: if self.npshr_curve is None: return None if pump_speed_rpm is None: pump_speed_rpm = self.reference_speed_rpm scale = self.speed_scale(float(pump_speed_rpm)) q_ref = scale.flow_to_reference(float(flow)) # Approximate scaling, useful for mock/preliminary analysis. Real supplier # NPSHR data at multiple speeds should replace this when available. return scale.head_from_reference(self.npshr_curve.y(q_ref))
[docs] def bep_flow_at_speed(self, pump_speed_rpm: float) -> float | None: if self.bep_flow is None: return None return self.speed_scale(float(pump_speed_rpm)).flow_from_reference(self.bep_flow)
[docs] def flow_bounds_at_speed(self, pump_speed_rpm: float) -> tuple[float, float]: scale = self.speed_scale(float(pump_speed_rpm)) qmin = self.valid_flow_min if self.valid_flow_min is not None else 0.0 qmax = self.valid_flow_max if self.valid_flow_max is not None else max(self.bep_flow or 1.0, 1.0) * 2.0 return scale.flow_from_reference(float(qmin)), scale.flow_from_reference(float(qmax))
[docs] def to_summary_dict(self) -> dict[str, Any]: return { "name": self.name, "reference_speed_rpm": self.reference_speed_rpm, "flow_unit": self.flow_unit, "head_unit": self.head_unit, "bep_flow": self.bep_flow, "bep_head": self.bep_head, "specific_gravity": self.specific_gravity, "valid_flow_min": self.valid_flow_min, "valid_flow_max": self.valid_flow_max, }
[docs] @dataclass(frozen=True) class PumpOperatingPoint: pump_name: str pump_speed_rpm: float engine_speed_rpm: float | None flow: float flow_gpm: float head_pump_ft: float head_system_ft: float efficiency: float | None water_hp: float brake_hp: float | None brake_kw: float | None npsha_ft: float | None npshr_ft: float | None npsh_margin_ft: float | None bep_flow: float | None bep_ratio: float | None status: str
[docs] def to_dict(self) -> dict[str, Any]: return asdict(self)
[docs] def load_system_json(path: str | Path) -> dict[str, Any]: with Path(path).open("r", encoding="utf-8") as f: return json.load(f)
[docs] def match_system( pump: CentrifugalWaterPump, system: QuadraticSystemCurve, pump_speed_rpm: float, *, suction: SuctionState | None = None, engine_speed_rpm: float | None = None, npsh_margin_required_ft: float = 3.0, preferred_bep_min: float = 0.85, preferred_bep_max: float = 1.10, acceptable_bep_min: float = 0.70, acceptable_bep_max: float = 1.25, ) -> PumpOperatingPoint: q_lo, q_hi = pump.flow_bounds_at_speed(float(pump_speed_rpm)) q_lo = max(q_lo, 0.0) if q_hi <= q_lo: raise ValueError("Invalid pump flow bounds") def residual(q: float) -> float: return pump.head_ft(q, pump_speed_rpm) - system.head_ft(q) bracket = _find_bracket(residual, q_lo, q_hi, n=300) if bracket is None: # Pick the closest residual as a diagnostic fallback, but fail loudly. samples = [(q_lo + (q_hi - q_lo) * i / 300.0) for i in range(301)] q_best = min(samples, key=lambda q: abs(residual(q))) raise ValueError( "Could not bracket a pump/system intersection. " f"Closest point: Q={q_best:.6g}, residual={residual(q_best):.6g} ft" ) q = _bisect(residual, bracket[0], bracket[1], tol=1e-9, max_iter=100) hpump = pump.head_ft(q, pump_speed_rpm) hsys = system.head_ft(q) eta = pump.efficiency(q, pump_speed_rpm) q_gpm = flow_to_gpm(q, pump.flow_unit) whp = water_horsepower_from_curve_flow(q, pump.flow_unit, hpump, specific_gravity=pump.specific_gravity) bhp = brake_horsepower_from_efficiency(whp, eta) bkw = hp_to_kw(bhp) npsha = suction.npsha_ft(q) if suction is not None else None npshr = pump.npshr_ft(q, pump_speed_rpm) margin = npsh_margin_ft(npsha, npshr) bep_flow = pump.bep_flow_at_speed(pump_speed_rpm) bep_ratio = (q / bep_flow) if bep_flow and bep_flow > 0.0 else None status = _status( bep_ratio, margin, npsh_margin_required_ft, preferred_bep_min, preferred_bep_max, acceptable_bep_min, acceptable_bep_max, ) return PumpOperatingPoint( pump_name=pump.name, pump_speed_rpm=float(pump_speed_rpm), engine_speed_rpm=float(engine_speed_rpm) if engine_speed_rpm is not None else None, flow=q, flow_gpm=q_gpm, head_pump_ft=hpump, head_system_ft=hsys, efficiency=eta, water_hp=whp, brake_hp=bhp, brake_kw=bkw, npsha_ft=npsha, npshr_ft=npshr, npsh_margin_ft=margin, bep_flow=bep_flow, bep_ratio=bep_ratio, status=status, )
[docs] def rpm_sweep( pump: CentrifugalWaterPump, system: QuadraticSystemCurve, *, engine_rpm_min: float, engine_rpm_max: float, engine_rpm_step: float, pulley_ratio: float = 1.0, suction: SuctionState | None = None, npsh_margin_required_ft: float = 3.0, preferred_bep_min: float = 0.85, preferred_bep_max: float = 1.10, acceptable_bep_min: float = 0.70, acceptable_bep_max: float = 1.25, ) -> list[PumpOperatingPoint]: if engine_rpm_step <= 0.0: raise ValueError("engine_rpm_step must be positive") points: list[PumpOperatingPoint] = [] N = float(engine_rpm_min) while N <= float(engine_rpm_max) + 1e-9: pump_rpm = N * float(pulley_ratio) points.append( match_system( pump, system, pump_rpm, suction=suction, engine_speed_rpm=N, npsh_margin_required_ft=npsh_margin_required_ft, preferred_bep_min=preferred_bep_min, preferred_bep_max=preferred_bep_max, acceptable_bep_min=acceptable_bep_min, acceptable_bep_max=acceptable_bep_max, ) ) N += float(engine_rpm_step) return points
[docs] def write_points_json(path: str | Path, payload: Mapping[str, Any]) -> None: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as f: json.dump(payload, f, indent=2)
[docs] def write_points_csv(path: str | Path, points: list[PumpOperatingPoint]) -> None: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) rows = [p.to_dict() for p in points] if not rows: path.write_text("", encoding="utf-8") return with path.open("w", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=list(rows[0].keys())) writer.writeheader() writer.writerows(rows)
def _find_bracket(func: Any, lo: float, hi: float, *, n: int) -> tuple[float, float] | None: q_prev = lo f_prev = func(q_prev) if abs(f_prev) < 1e-12: return q_prev, q_prev for i in range(1, n + 1): q = lo + (hi - lo) * i / n f = func(q) if abs(f) < 1e-12: return q, q if f_prev * f < 0.0: return q_prev, q q_prev, f_prev = q, f return None def _bisect(func: Any, lo: float, hi: float, *, tol: float, max_iter: int) -> float: if lo == hi: return lo f_lo = func(lo) f_hi = func(hi) if f_lo * f_hi > 0.0: raise ValueError("Bisection requires a sign-changing bracket") for _ in range(max_iter): mid = 0.5 * (lo + hi) f_mid = func(mid) if abs(f_mid) < tol or abs(hi - lo) < tol: return mid if f_lo * f_mid <= 0.0: hi = mid f_hi = f_mid else: lo = mid f_lo = f_mid return 0.5 * (lo + hi) def _status( bep_ratio: float | None, npsh_margin: float | None, npsh_margin_required_ft: float, preferred_bep_min: float, preferred_bep_max: float, acceptable_bep_min: float, acceptable_bep_max: float, ) -> str: """Return a graded engineering status for an operating point. The preferred BEP window marks the best operating region. The wider acceptable window is intentionally less alarming and is useful for textbook or preliminary supplier-curve studies where the pump is not exactly at BEP but is still operating in a defensible region. """ flags: list[str] = [] bep_flag = _bep_status( bep_ratio, preferred_bep_min=preferred_bep_min, preferred_bep_max=preferred_bep_max, acceptable_bep_min=acceptable_bep_min, acceptable_bep_max=acceptable_bep_max, ) if bep_flag != "OK": flags.append(bep_flag) if npsh_margin is not None: margin = float(npsh_margin) required = max(float(npsh_margin_required_ft), 0.0) if margin < required: flags.append("CAVITATION_MARGIN_LOW") elif required > 0.0 and margin < 2.0 * required: flags.append("NPSH_MARGIN_WATCH") return "OK" if not flags else ";".join(flags) def _bep_status( bep_ratio: float | None, *, preferred_bep_min: float, preferred_bep_max: float, acceptable_bep_min: float, acceptable_bep_max: float, ) -> str: """Classify an operating point relative to the best-efficiency point.""" if bep_ratio is None: return "OK" ratio = float(bep_ratio) if preferred_bep_min <= ratio <= preferred_bep_max: return "OK" if acceptable_bep_min <= ratio < preferred_bep_min: return "ACCEPTABLE_LEFT_OF_BEP" if preferred_bep_max < ratio <= acceptable_bep_max: return "ACCEPTABLE_RIGHT_OF_BEP" if ratio < acceptable_bep_min: return "OFF_DESIGN_LOW_FLOW_BEP" return "OFF_DESIGN_HIGH_FLOW_BEP" def _optional_float(value: Any) -> float | None: if value is None: return None return float(value)