Source code for simulator.pumps.combined

from __future__ import annotations

"""Series and parallel centrifugal-pump combinations.

The physical rules are intentionally simple and transparent:

* Parallel pumps add flow at the same head. For N identical pumps, the
  combined head at total flow Q is H_single(Q / N).
* Series pumps add head at the same flow. For N identical pumps, the
  combined head at flow Q is N * H_single(Q).

These rules are the direct computational version of Frank White's pump
combination discussion and are useful for exploratory vICE cooling-system
studies before supplier-grade pump maps are available.
"""

from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Any, Mapping
import csv
import json

from .cavitation import SuctionState, npsh_margin_ft
from .power import (
    brake_horsepower_from_efficiency,
    flow_to_gpm,
    hp_to_kw,
    water_horsepower_from_curve_flow,
)
from .system_curve import QuadraticSystemCurve
from .water_pump import CentrifugalWaterPump, _find_bracket, _bisect


[docs] @dataclass(frozen=True) class CombinedPumpOperatingPoint: """Operating point for an identical-pump series/parallel arrangement.""" pump_name: str arrangement: str number_of_pumps: int pump_speed_rpm: float engine_speed_rpm: float | None flow_total: float flow_total_gpm: float flow_per_pump: float flow_per_pump_gpm: float head_combined_ft: float head_system_ft: float head_per_pump_ft: float efficiency_per_pump: float | None water_hp_total: float brake_hp_total: float | None brake_kw_total: float | None brake_hp_per_pump: float | None brake_kw_per_pump: float | None npsha_ft: float | None npshr_per_pump_ft: float | None npsh_margin_ft: float | None bep_flow_per_pump: float | None bep_ratio_per_pump: float | None status: str
[docs] def to_dict(self) -> dict[str, Any]: return asdict(self)
[docs] @dataclass(frozen=True) class BepSpeedResult: """Result for the speed needed to put a pump on its scaled BEP.""" pump_name: str possible: bool reason: str reference_speed_rpm: float target_speed_rpm: float | None speed_ratio: float | None speed_ratio_squared: float | None bep_flow_reference: float | None bep_head_reference_ft: float | None target_bep_flow: float | None target_bep_flow_gpm: float | None target_bep_head_ft: float | None system_head_at_target_bep_ft: float | None residual_ft: float | None equation: str
[docs] def to_dict(self) -> dict[str, Any]: return asdict(self)
[docs] def match_combined_system( pump: CentrifugalWaterPump, system: QuadraticSystemCurve, pump_speed_rpm: float, *, arrangement: str, number_of_pumps: int = 2, suction: SuctionState | None = None, engine_speed_rpm: float | None = None, npsh_margin_required_ft: float = 3.0, preferred_bep_min: float = 0.85, preferred_bep_max: float = 1.10, acceptable_bep_min: float = 0.70, acceptable_bep_max: float = 1.25, ) -> CombinedPumpOperatingPoint: """Match identical pumps in series or parallel against a system curve.""" n = int(number_of_pumps) if n < 1: raise ValueError("number_of_pumps must be >= 1") mode = str(arrangement).strip().lower() if mode not in {"parallel", "series"}: raise ValueError("arrangement must be 'parallel' or 'series'") q_lo_single, q_hi_single = pump.flow_bounds_at_speed(float(pump_speed_rpm)) q_lo_single = max(q_lo_single, 0.0) if mode == "parallel": q_lo_total = q_lo_single * n q_hi_total = q_hi_single * n else: q_lo_total = q_lo_single q_hi_total = q_hi_single if q_hi_total <= q_lo_total: raise ValueError("Invalid combined-pump flow bounds") def per_pump_flow(q_total: float) -> float: return float(q_total) / n if mode == "parallel" else float(q_total) def combined_head(q_total: float) -> float: q_each = per_pump_flow(q_total) h_each = pump.head_ft(q_each, pump_speed_rpm) return h_each if mode == "parallel" else n * h_each def residual(q_total: float) -> float: return combined_head(q_total) - system.head_ft(q_total) bracket = _find_bracket(residual, q_lo_total, q_hi_total, n=400) if bracket is None: samples = [q_lo_total + (q_hi_total - q_lo_total) * i / 400.0 for i in range(401)] q_best = min(samples, key=lambda q: abs(residual(q))) raise ValueError( "Could not bracket a combined pump/system intersection. " f"Closest point: Q_total={q_best:.6g}, residual={residual(q_best):.6g} ft" ) q_total = _bisect(residual, bracket[0], bracket[1], tol=1e-9, max_iter=120) q_each = per_pump_flow(q_total) h_each = pump.head_ft(q_each, pump_speed_rpm) h_comb = combined_head(q_total) h_sys = system.head_ft(q_total) eta_each = pump.efficiency(q_each, pump_speed_rpm) q_total_gpm = flow_to_gpm(q_total, pump.flow_unit) q_each_gpm = flow_to_gpm(q_each, pump.flow_unit) # Hydraulic power is based on total flow through the combined head. # This is equivalent to summing the per-pump hydraulic powers for identical pumps. whp_total = water_horsepower_from_curve_flow( q_total, pump.flow_unit, h_comb, specific_gravity=pump.specific_gravity, ) bhp_total = brake_horsepower_from_efficiency(whp_total, eta_each) bkw_total = hp_to_kw(bhp_total) bhp_each = (bhp_total / n) if bhp_total is not None else None bkw_each = hp_to_kw(bhp_each) # Shared suction piping often sees total flow. Use total flow for NPSHA, # but each pump only requires NPSHR at the flow it sees. npsha = suction.npsha_ft(q_total) if suction is not None else None npshr_each = pump.npshr_ft(q_each, pump_speed_rpm) margin = npsh_margin_ft(npsha, npshr_each) bep_each = pump.bep_flow_at_speed(pump_speed_rpm) bep_ratio_each = (q_each / bep_each) if bep_each and bep_each > 0.0 else None status = _combined_status( bep_ratio_each, margin, npsh_margin_required_ft, preferred_bep_min, preferred_bep_max, acceptable_bep_min, acceptable_bep_max, ) return CombinedPumpOperatingPoint( pump_name=pump.name, arrangement=mode, number_of_pumps=n, pump_speed_rpm=float(pump_speed_rpm), engine_speed_rpm=float(engine_speed_rpm) if engine_speed_rpm is not None else None, flow_total=q_total, flow_total_gpm=q_total_gpm, flow_per_pump=q_each, flow_per_pump_gpm=q_each_gpm, head_combined_ft=h_comb, head_system_ft=h_sys, head_per_pump_ft=h_each, efficiency_per_pump=eta_each, water_hp_total=whp_total, brake_hp_total=bhp_total, brake_kw_total=bkw_total, brake_hp_per_pump=bhp_each, brake_kw_per_pump=bkw_each, npsha_ft=npsha, npshr_per_pump_ft=npshr_each, npsh_margin_ft=margin, bep_flow_per_pump=bep_each, bep_ratio_per_pump=bep_ratio_each, status=status, )
[docs] def bep_speed_to_match_system( pump: CentrifugalWaterPump, system: QuadraticSystemCurve, ) -> BepSpeedResult: """Return the pump speed needed for the scaled BEP to lie on the system curve. This is the programmatic form of Frank White Example 11.6 part (b). The closed-form solution is exact for a quadratic system curve with the same flow unit as the pump curve: H_bep*r^2 = H_static + K*(Q_bep*r)^2 where r = n_target / n_reference. """ if pump.bep_flow is None or pump.bep_head is None: return BepSpeedResult( pump_name=pump.name, possible=False, reason="Pump JSON must define bep_flow and bep_head.", reference_speed_rpm=pump.reference_speed_rpm, target_speed_rpm=None, speed_ratio=None, speed_ratio_squared=None, bep_flow_reference=pump.bep_flow, bep_head_reference_ft=pump.bep_head, target_bep_flow=None, target_bep_flow_gpm=None, target_bep_head_ft=None, system_head_at_target_bep_ft=None, residual_ft=None, equation="H_bep*r^2 = H_static + K*(Q_bep*r)^2", ) if abs(float(system.exponent) - 2.0) > 1e-12: return BepSpeedResult( pump_name=pump.name, possible=False, reason="Closed-form BEP-speed check currently requires system exponent = 2.", reference_speed_rpm=pump.reference_speed_rpm, target_speed_rpm=None, speed_ratio=None, speed_ratio_squared=None, bep_flow_reference=pump.bep_flow, bep_head_reference_ft=pump.bep_head, target_bep_flow=None, target_bep_flow_gpm=None, target_bep_head_ft=None, system_head_at_target_bep_ft=None, residual_ft=None, equation="H_bep*r^2 = H_static + K*(Q_bep*r)^2", ) q0 = float(pump.bep_flow) h0 = float(pump.bep_head) h_static = float(system.static_head_ft) k = float(system.k) denom = h0 - k * q0 * q0 eqn = f"{h0:g}*r^2 = {h_static:g} + {k:g}*({q0:g}*r)^2" if abs(denom) < 1e-15: reason = "No finite nonzero speed satisfies the scaled BEP/system equation; denominator is zero." r2 = None else: r2 = h_static / denom reason = "OK" if r2 > 0.0 else "No real pump speed can place the scaled BEP point on this system curve." if r2 is None or r2 <= 0.0: return BepSpeedResult( pump_name=pump.name, possible=False, reason=reason, reference_speed_rpm=pump.reference_speed_rpm, target_speed_rpm=None, speed_ratio=None, speed_ratio_squared=r2, bep_flow_reference=q0, bep_head_reference_ft=h0, target_bep_flow=None, target_bep_flow_gpm=None, target_bep_head_ft=None, system_head_at_target_bep_ft=None, residual_ft=None, equation=eqn, ) r = r2 ** 0.5 q_target = q0 * r h_target = h0 * r2 h_system = system.head_ft(q_target) residual = h_target - h_system return BepSpeedResult( pump_name=pump.name, possible=True, reason="Scaled BEP point intersects the system curve.", reference_speed_rpm=pump.reference_speed_rpm, target_speed_rpm=pump.reference_speed_rpm * r, speed_ratio=r, speed_ratio_squared=r2, bep_flow_reference=q0, bep_head_reference_ft=h0, target_bep_flow=q_target, target_bep_flow_gpm=flow_to_gpm(q_target, pump.flow_unit), target_bep_head_ft=h_target, system_head_at_target_bep_ft=h_system, residual_ft=residual, equation=eqn, )
[docs] def write_combined_json(path: str | Path, payload: Mapping[str, Any]) -> None: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as f: json.dump(payload, f, indent=2)
[docs] def write_combined_csv(path: str | Path, points: list[CombinedPumpOperatingPoint]) -> None: path = Path(path) path.parent.mkdir(parents=True, exist_ok=True) rows = [p.to_dict() for p in points] if not rows: path.write_text("", encoding="utf-8") return with path.open("w", encoding="utf-8", newline="") as f: writer = csv.DictWriter(f, fieldnames=list(rows[0].keys())) writer.writeheader() writer.writerows(rows)
def _combined_status( bep_ratio: float | None, npsh_margin: float | None, npsh_margin_required_ft: float, preferred_bep_min: float, preferred_bep_max: float, acceptable_bep_min: float, acceptable_bep_max: float, ) -> str: flags: list[str] = [] if bep_ratio is not None: r = float(bep_ratio) if preferred_bep_min <= r <= preferred_bep_max: pass elif acceptable_bep_min <= r < preferred_bep_min: flags.append("ACCEPTABLE_LEFT_OF_BEP") elif preferred_bep_max < r <= acceptable_bep_max: flags.append("ACCEPTABLE_RIGHT_OF_BEP") elif r < acceptable_bep_min: flags.append("OFF_DESIGN_LOW_FLOW_BEP") else: flags.append("OFF_DESIGN_HIGH_FLOW_BEP") if npsh_margin is not None: margin = float(npsh_margin) required = max(float(npsh_margin_required_ft), 0.0) if margin < required: flags.append("CAVITATION_MARGIN_LOW") elif required > 0.0 and margin < 2.0 * required: flags.append("NPSH_MARGIN_WATCH") return "OK" if not flags else ";".join(flags)