# equations/api.py
from __future__ import annotations
"""
equations.api
Public, stable facade for the EES-ish nonlinear equation solving layer.
Role of this file:
- Accept either an EquationSystemSpec or a JSON/YAML mapping
- Validate/normalize the spec
- Adapt rich spec -> solver.py's duck-typed shape
- Keep imports light; heavy deps remain optional and are imported lazily
Upgrades in this version:
- Works with the *new* spec.py model (VarSpec/ParamSpec/EquationSpec/EquationSystemSpec)
- Supports spec.solve (raw solve-block dict) and forwards it to solver.py
- Preserves non-numeric params (e.g., fluid="Nitrogen") for PropsSI(..., fluid)
- API args override spec/solve-block ONLY when explicitly provided
(backend/method/tol/max_iter/max_restarts/use_units)
- Best-effort unit conversion support via units if present
- Explicit backend availability checks (helpful errors) when user requests scipy/gekko
- Registers property functions into the evaluation context when equations reference them:
* PropsSI / PhaseSI
* HAPropsSI
* CTPropsSI / CTPropsMulti / CTBatchProps (Cantera)
* LiBrPropsSI / LiBrH2OPropsSI (LiBr–H2O engine)
* LiBr helper calls (EES-style convenience):
- LiBrX_TP, LiBrH_TX, LiBrRho_TXP, LiBrT_HX
* NH3–H2O (ammonia-water) property engine + helpers (native backend):
- NH3H2O / NH3H2O_STATE / NH3H2O_TPX / NH3H2O_STATE_TPX
- optional aliases: prop_tpx / state_tpx / props_multi_tpx / batch_prop_tpx
- optional PropsSI-like aliases if you expose them:
NH3H2OPropsSI / NH3H2OPropsMulti / NH3H2OBatchProps
Notes:
- If equations contain Python-callable property-function calls (PropsSI/PhaseSI/HAPropsSI/CTPropsSI/ASPropsSI/LiBr*/NH3H2O*),
GEKKO backends cannot handle them. In that case we require SciPy (or auto -> SciPy).
Contract note (Feb 2026):
- `thermo_props.coolprop_backend` is the *central contract/registration point* for thermo callables.
Cantera is implemented in `thermo_props.cantera_backend`, but we prefer to access CTPropsSI
via coolprop_backend's delegation wrappers to keep a single import surface.
"""
from dataclasses import dataclass
from importlib import import_module
from typing import Any, Dict, List, Literal, Mapping, Optional, Sequence, Tuple, Union, TYPE_CHECKING
import re
from .spec import (
EquationSpec,
EquationSystemSpec,
ParamSpec,
VarSpec,
system_from_mapping,
)
if TYPE_CHECKING: # pragma: no cover
from .solver import SolveResult as Solution # noqa: F401
else:
Solution = Any # runtime-friendly alias
BackendKind = Literal["auto", "gekko", "scipy"]
[docs]
class BackendUnavailableError(RuntimeError):
"""Raised when a requested backend isn't installed/available."""
# Public aliases expected by equations/__init__.py
EquationSystem = EquationSystemSpec
Var = VarSpec
Param = ParamSpec
# ------------------------------ backend availability ------------------------------
def _has_gekko() -> bool:
try:
import gekko # type: ignore # noqa: F401
except Exception:
return False
return True
def _has_scipy() -> bool:
try:
import scipy # type: ignore # noqa: F401
except Exception:
return False
return True
def _normalize_backend_name(x: Any) -> str:
s = str(x or "").strip().lower()
if s in {"", "none"}:
return "auto"
if s in {"root", "scipy-root", "optimize"}:
return "scipy"
if s in {"ipopt", "apopt"}:
return "gekko"
if s in {"scipyoptimize"}:
return "scipy"
return s
def _ensure_backend_available(backend: str) -> None:
b = _normalize_backend_name(backend)
if b in {"", "none", "auto"}:
return
if b == "gekko" and not _has_gekko():
raise BackendUnavailableError("Backend 'gekko' requested but GEKKO is not installed.")
if b == "scipy" and not _has_scipy():
raise BackendUnavailableError("Backend 'scipy' requested but SciPy is not installed.")
# ------------------------------ optional units integration ------------------------------
@dataclass(frozen=True)
class _UnitAdapter:
"""
Tiny wrapper for optional unit conversion.
Expected capability (best-effort):
- convert(value, from_units, to_units) -> float
"""
convert: Any # Callable[[float, str, str], float]
def _try_units_adapter() -> Optional[_UnitAdapter]:
"""
Attempt to import units lazily.
Supported minimal APIs (best-effort):
- convert(value: float, from_units: str, to_units: str) -> float
OR
- DEFAULT_REGISTRY with .convert(value, from_units, to_units)
"""
try:
import units as _units
except Exception:
return None
conv = getattr(_units, "convert", None)
if callable(conv):
return _UnitAdapter(convert=conv)
reg = getattr(_units, "DEFAULT_REGISTRY", None)
if reg is not None and callable(getattr(reg, "convert", None)):
return _UnitAdapter(convert=reg.convert)
return None
def _norm_units(u: Optional[str]) -> Optional[str]:
if u is None:
return None
s = str(u).strip()
return s if s else None
# ------------------------------ adapter types ------------------------------
@dataclass
class _VarLike:
"""
Internal variable object matching what solver.py expects via duck-typing.
solver.py checks unknowns using (in order):
- v.unknown (bool) if present
- v.kind == "unknown" if present
- v.value is None
"""
name: str
kind: str
value: float | None
guess: float | None = None
lower: float | None = None
upper: float | None = None
@property
def unknown(self) -> bool:
return str(self.kind).lower() == "unknown"
@dataclass
class _SpecLike:
"""
Internal spec object matching what solver.py expects via duck-typing.
"""
equations: List[str]
variables: List[_VarLike]
params: Dict[str, Any]
solve: Dict[str, Any] | None = None
backend: str | None = None
solver: str | None = None
method: str | None = None
tol: float | None = None
max_iter: int | None = None
maxiter: int | None = None
max_restarts: int | None = None
# ------------------------------ conversion helpers ------------------------------
def _equations_to_strings(eqs: Sequence[EquationSpec]) -> List[str]:
out: List[str] = []
for e in eqs:
if e.kind == "expr":
out.append(str(e.expr))
continue
if e.kind == "residual":
raise NotImplementedError(
"EquationSpec(kind='residual') is not implemented yet in the backend adapter. "
"For now use kind='expr' with an explicit equation string."
)
raise ValueError(f"Unknown EquationSpec.kind: {e.kind!r}")
return out
def _extract_base_units(spec: EquationSystemSpec) -> Tuple[Optional[Dict[str, str]], Optional[Dict[str, str]]]:
meta = getattr(spec, "meta", None)
if not isinstance(meta, Mapping):
return None, None
u = meta.get("units")
if not isinstance(u, Mapping):
return None, None
vars_u = u.get("vars")
params_u = u.get("params")
vars_base = dict(vars_u) if isinstance(vars_u, Mapping) else None
params_base = dict(params_u) if isinstance(params_u, Mapping) else None
return vars_base, params_base
def _params_to_values(
params_map: Mapping[str, ParamSpec],
*,
units: Optional[_UnitAdapter] = None,
base_units: Optional[Mapping[str, str]] = None,
) -> Dict[str, Any]:
out: Dict[str, Any] = {}
for p in params_map.values():
name = str(p.name)
val: Any = p.value
if isinstance(val, (int, float)) and not isinstance(val, bool):
x = float(val)
if units is not None and base_units is not None:
u_from = _norm_units(getattr(p, "unit", None))
u_to = _norm_units(base_units.get(name))
if u_from and u_to and u_from != u_to:
try:
x = float(units.convert(x, u_from, u_to))
except Exception:
pass
out[name] = x
else:
out[name] = val
return out
def _vars_to_varlikes(
vars_map: Mapping[str, VarSpec],
*,
units: Optional[_UnitAdapter] = None,
base_units: Optional[Mapping[str, str]] = None,
) -> List[_VarLike]:
out: List[_VarLike] = []
for v in vars_map.values():
name = str(v.name)
u_from = _norm_units(getattr(v, "unit", None))
u_to = _norm_units(base_units.get(name)) if base_units is not None else None
def _conv(x: float | None) -> float | None:
if x is None:
return None
if units is None or base_units is None:
return float(x)
if not u_from or not u_to or u_from == u_to:
return float(x)
try:
return float(units.convert(float(x), u_from, u_to))
except Exception:
return float(x)
lo2 = _conv(v.lower)
hi2 = _conv(v.upper)
if bool(v.fixed):
if v.value is None:
raise ValueError(f"Fixed variable {name!r} is missing a value.")
value = _conv(float(v.value))
out.append(
_VarLike(
name=name,
kind="fixed",
value=float(value) if value is not None else float(v.value),
guess=None,
lower=lo2,
upper=hi2,
)
)
else:
guess = _conv(float(v.guess_value(default=1.0)))
out.append(
_VarLike(
name=name,
kind="unknown",
value=None,
guess=float(guess) if guess is not None else float(v.guess_value(default=1.0)),
lower=lo2,
upper=hi2,
)
)
return out
def _strip_solve_keys(solve: Dict[str, Any], keys: Sequence[str]) -> None:
for k in keys:
if k in solve:
solve.pop(k, None)
def _import_first(candidates: Sequence[str]) -> Any | None:
for mod in candidates:
try:
return import_module(f"thermo_props.{mod}")
except Exception:
continue
return None
# ------------------------------ property-function detection + injection ------------------------------
_RE_PROPS = re.compile(r"\bPropsSI\s*\(")
_RE_PHASE = re.compile(r"\bPhaseSI\s*\(")
_RE_HA = re.compile(r"\bHAPropsSI\s*\(")
# Cantera (CTPropsSI family) + optional cache helpers
_RE_CT = re.compile(
r"\b(?:"
r"CTPropsSI|CTPropsMulti|CTBatchProps|"
r"ctprops_si|ctprops_multi|batch_ctprops|"
r"cantera_available|"
r"ctprops_cache_info|clear_ctprops_caches"
r")\s*\("
)
# AbstractState family (CoolProp AbstractState wrappers)
_RE_AS_PROPS = re.compile(r"\bASPropsSI\s*\(")
_RE_AS_MULTI = re.compile(r"\bASPropsMulti\s*\(")
_RE_AS_BATCH = re.compile(r"\bASBatchProps\s*\(")
_RE_FUG = re.compile(r"\bFugacitySI\s*\(")
_RE_FUGCOEFF = re.compile(r"\bFugacityCoeffSI\s*\(")
_RE_LNPHI = re.compile(r"\bLnFugacityCoeffSI\s*\(")
_RE_CHEMPOT = re.compile(r"\bChemicalPotentialSI\s*\(")
# AbstractState internal aliases (if users call the backend helpers directly)
_RE_AS_SI = re.compile(r"\bas_props_si\s*\(")
_RE_AS_MULTI2 = re.compile(r"\bas_props_multi\s*\(")
_RE_AS_BATCH2 = re.compile(r"\bbatch_as_props\s*\(")
# LiBr family (PropsSI-like + helpers + internal aliases)
_RE_LIBR = re.compile(
r"\b(?:"
r"LiBrPropsSI|LiBrH2OPropsSI|"
r"LiBrX_TP|LiBrH_TX|LiBrRho_TXP|LiBrT_HX|"
r"LiBrPropsMulti|LiBrBatchProps|"
r"librh2o_props_si|librh2o_props_multi|batch_librh2o_props"
r")\s*\("
)
_RE_LIBR_X_TP = re.compile(r"\bLiBrX_TP\s*\(")
_RE_LIBR_H_TX = re.compile(r"\bLiBrH_TX\s*\(")
_RE_LIBR_RHO = re.compile(r"\bLiBrRho_TXP\s*\(")
_RE_LIBR_T_HX = re.compile(r"\bLiBrT_HX\s*\(")
# NH3–H2O family (native backend + optional aliases + optional PropsSI-like)
_RE_NH3H2O = re.compile(
r"\b(?:"
r"NH3H2O|NH3H2O_STATE|NH3H2O_TPX|NH3H2O_STATE_TPX|"
r"NH3H2OPropsSI|NH3H2OPropsMulti|NH3H2OBatchProps|"
r"prop_tpx|state_tpx|props_multi_tpx|batch_prop_tpx"
r")\s*\("
)
def _needs_python_property_funcs(eq_strings: Sequence[str]) -> bool:
"""
True if equations contain Python-callable property functions.
NOTE: we do NOT include the cache helper names here because they are not numeric
thermo-property calls and typically shouldn't appear in equations.
"""
text = "\n".join(eq_strings)
return bool(
_RE_PROPS.search(text)
or _RE_PHASE.search(text)
or _RE_HA.search(text)
or _RE_AS_PROPS.search(text)
or _RE_AS_MULTI.search(text)
or _RE_AS_BATCH.search(text)
or _RE_FUG.search(text)
or _RE_FUGCOEFF.search(text)
or _RE_LNPHI.search(text)
or _RE_CHEMPOT.search(text)
or _RE_AS_SI.search(text)
or _RE_AS_MULTI2.search(text)
or _RE_AS_BATCH2.search(text)
or _RE_LIBR.search(text)
or _RE_NH3H2O.search(text)
or re.search(r"\b(?:CTPropsSI|CTPropsMulti|CTBatchProps|ctprops_si|ctprops_multi|batch_ctprops)\s*\(", text)
)
def _inject_property_functions(eq_strings: Sequence[str], params: Dict[str, Any]) -> None:
text = "\n".join(eq_strings)
# -------------------- CoolProp contract surface --------------------
# We import this only if needed.
needs_cp = bool(
_RE_PROPS.search(text)
or _RE_PHASE.search(text)
or _RE_HA.search(text)
or _RE_AS_PROPS.search(text)
or _RE_AS_MULTI.search(text)
or _RE_AS_BATCH.search(text)
or _RE_FUG.search(text)
or _RE_FUGCOEFF.search(text)
or _RE_LNPHI.search(text)
or _RE_CHEMPOT.search(text)
or _RE_AS_SI.search(text)
or _RE_AS_MULTI2.search(text)
or _RE_AS_BATCH2.search(text)
or _RE_CT.search(text)
)
_cp = None
if needs_cp:
try:
from thermo_props import coolprop_backend as _cp # local import
except Exception:
_cp = None
if _cp is None:
missing: List[str] = []
if _RE_PROPS.search(text):
missing.append("PropsSI")
if _RE_PHASE.search(text):
missing.append("PhaseSI")
if _RE_HA.search(text):
missing.append("HAPropsSI")
if _RE_AS_PROPS.search(text) or _RE_AS_SI.search(text):
missing.append("ASPropsSI")
if _RE_AS_MULTI.search(text) or _RE_AS_MULTI2.search(text):
missing.append("ASPropsMulti")
if _RE_AS_BATCH.search(text) or _RE_AS_BATCH2.search(text):
missing.append("ASBatchProps")
if _RE_FUG.search(text):
missing.append("FugacitySI")
if _RE_FUGCOEFF.search(text):
missing.append("FugacityCoeffSI")
if _RE_LNPHI.search(text):
missing.append("LnFugacityCoeffSI")
if _RE_CHEMPOT.search(text):
missing.append("ChemicalPotentialSI")
if _RE_CT.search(text):
missing.append("CTPropsSI")
raise BackendUnavailableError(
"Equation system references thermo property functions "
f"({', '.join(missing)}), but thermo_props.coolprop_backend could not be imported."
)
# register CoolProp functions
if _cp is not None:
if _RE_PROPS.search(text):
params.setdefault("PropsSI", _cp.props_si)
if _RE_PHASE.search(text):
params.setdefault("PhaseSI", _cp.phase_si)
if _RE_HA.search(text):
params.setdefault("HAPropsSI", _cp.haprops_si)
params.setdefault("ha_props_si", _cp.ha_props_si)
params.setdefault("ha_props_multi", _cp.ha_props_multi)
params.setdefault("batch_ha_props", _cp.batch_ha_props)
# AbstractState wrappers
if (
_RE_AS_PROPS.search(text)
or _RE_AS_MULTI.search(text)
or _RE_AS_BATCH.search(text)
or _RE_FUG.search(text)
or _RE_FUGCOEFF.search(text)
or _RE_LNPHI.search(text)
or _RE_CHEMPOT.search(text)
or _RE_AS_SI.search(text)
or _RE_AS_MULTI2.search(text)
or _RE_AS_BATCH2.search(text)
):
fn_avail = getattr(_cp, "abstractstate_available", None)
if callable(fn_avail) and not bool(fn_avail()):
raise BackendUnavailableError(
"Equation system references CoolProp AbstractState functions (ASPropsSI/Fugacity*), "
"but CoolProp AbstractState is unavailable in this environment."
)
fn_as = getattr(_cp, "as_props_si", None) or getattr(_cp, "ASPropsSI", None)
if not callable(fn_as):
raise BackendUnavailableError(
"Equation system references ASPropsSI/Fugacity* calls, but coolprop_backend "
"does not expose as_props_si (expected callable)."
)
params.setdefault("ASPropsSI", fn_as)
params.setdefault("as_props_si", fn_as)
fn_multi = getattr(_cp, "as_props_multi", None) or getattr(_cp, "ASPropsMulti", None)
if callable(fn_multi):
params.setdefault("ASPropsMulti", fn_multi)
params.setdefault("as_props_multi", fn_multi)
elif _RE_AS_MULTI.search(text) or _RE_AS_MULTI2.search(text):
raise BackendUnavailableError(
"Equation system references ASPropsMulti/as_props_multi, but coolprop_backend does not provide it."
)
fn_batch = getattr(_cp, "batch_as_props", None) or getattr(_cp, "ASBatchProps", None)
if callable(fn_batch):
params.setdefault("ASBatchProps", fn_batch)
params.setdefault("batch_as_props", fn_batch)
elif _RE_AS_BATCH.search(text) or _RE_AS_BATCH2.search(text):
raise BackendUnavailableError(
"Equation system references ASBatchProps/batch_as_props, but coolprop_backend does not provide it."
)
for _nm, _re_pat in (
("FugacitySI", _RE_FUG),
("FugacityCoeffSI", _RE_FUGCOEFF),
("LnFugacityCoeffSI", _RE_LNPHI),
("ChemicalPotentialSI", _RE_CHEMPOT),
):
_fn = getattr(_cp, _nm, None)
if callable(_fn):
params.setdefault(_nm, _fn)
elif _re_pat.search(text):
raise BackendUnavailableError(
f"Equation system references {_nm}, but coolprop_backend does not provide it."
)
# Cantera wrappers are delegated via coolprop_backend contract
if _RE_CT.search(text):
fn_avail = getattr(_cp, "cantera_available", None)
if callable(fn_avail):
params.setdefault("cantera_available", fn_avail)
needs_ct_calls = bool(
re.search(r"\b(?:CTPropsSI|CTPropsMulti|CTBatchProps|ctprops_si|ctprops_multi|batch_ctprops)\s*\(", text)
)
if needs_ct_calls and callable(fn_avail) and not bool(fn_avail()):
raise BackendUnavailableError(
"Equation system references Cantera property functions (CTPropsSI/CTPropsMulti/CTBatchProps), "
"but Cantera is unavailable in this environment. Install with: pip install cantera"
)
fn_si = getattr(_cp, "ctprops_si", None) or getattr(_cp, "CTPropsSI", None)
fn_multi = getattr(_cp, "ctprops_multi", None) or getattr(_cp, "CTPropsMulti", None)
fn_batch = getattr(_cp, "batch_ctprops", None) or getattr(_cp, "CTBatchProps", None)
if callable(fn_si):
params.setdefault("CTPropsSI", fn_si)
params.setdefault("ctprops_si", fn_si)
elif re.search(r"\b(?:CTPropsSI|ctprops_si)\s*\(", text):
raise BackendUnavailableError(
"Equation system references CTPropsSI/ctprops_si, but coolprop_backend does not provide it."
)
if callable(fn_multi):
params.setdefault("CTPropsMulti", fn_multi)
params.setdefault("ctprops_multi", fn_multi)
elif re.search(r"\b(?:CTPropsMulti|ctprops_multi)\s*\(", text):
raise BackendUnavailableError(
"Equation system references CTPropsMulti/ctprops_multi, but coolprop_backend does not provide it."
)
if callable(fn_batch):
params.setdefault("CTBatchProps", fn_batch)
params.setdefault("batch_ctprops", fn_batch)
elif re.search(r"\b(?:CTBatchProps|batch_ctprops)\s*\(", text):
raise BackendUnavailableError(
"Equation system references CTBatchProps/batch_ctprops, but coolprop_backend does not provide it."
)
# Optional cache helpers (safe to inject if referenced)
fn_ci = getattr(_cp, "ctprops_cache_info", None)
fn_cc = getattr(_cp, "clear_ctprops_caches", None)
if callable(fn_ci):
params.setdefault("ctprops_cache_info", fn_ci)
if callable(fn_cc):
params.setdefault("clear_ctprops_caches", fn_cc)
# -------------------- LiBr–H2O engine + helpers --------------------
if _RE_LIBR.search(text):
_lb = _import_first(("librh2o_ashrae_backend", "librh2o_backend"))
if _lb is None:
raise BackendUnavailableError(
"Equation system references LiBr–H2O property functions, but no LiBr backend module "
"could be imported from thermo_props."
)
lb_si = getattr(_lb, "librh2o_props_si", None) or getattr(_lb, "LiBrPropsSI", None)
lb_multi = getattr(_lb, "librh2o_props_multi", None) or getattr(_lb, "LiBrPropsMulti", None)
lb_batch = getattr(_lb, "batch_librh2o_props", None) or getattr(_lb, "LiBrBatchProps", None)
if not callable(lb_si):
raise BackendUnavailableError("LiBr backend imported but does not expose librh2o_props_si.")
params.setdefault("LiBrPropsSI", lb_si)
params.setdefault("LiBrH2OPropsSI", lb_si)
params.setdefault("librh2o_props_si", lb_si)
if callable(lb_multi):
params.setdefault("LiBrPropsMulti", lb_multi)
params.setdefault("librh2o_props_multi", lb_multi)
if callable(lb_batch):
params.setdefault("LiBrBatchProps", lb_batch)
params.setdefault("batch_librh2o_props", lb_batch)
def _lb_try(out_codes: Sequence[str], pairs: Sequence[Tuple[str, Any]]) -> Any:
norm_pairs: List[Tuple[str, Any]] = []
for k, v in pairs:
kk = str(k)
vv: Any = float(v) if isinstance(v, (int, float)) and not isinstance(v, bool) else v
norm_pairs.append((kk, vv))
def _call(out_key: str, use_pairs: Sequence[Tuple[str, Any]]) -> Any:
args: List[Any] = [out_key]
for kk, vv in use_pairs:
args.extend([kk, vv])
return lb_si(*args) # type: ignore[misc]
last_exc: Exception | None = None
for out_key in out_codes:
try:
return _call(out_key, norm_pairs)
except Exception as e:
last_exc = e
if len(norm_pairs) >= 3:
try:
return _call(out_key, norm_pairs[:2])
except Exception as e:
last_exc = e
alt_pairs: List[Tuple[str, Any]] = []
for kk, vv in norm_pairs:
if kk == "X":
alt_pairs.append(("x", vv))
elif kk == "x":
alt_pairs.append(("X", vv))
else:
alt_pairs.append((kk, vv))
if alt_pairs != norm_pairs:
try:
return _call(out_key, alt_pairs)
except Exception as e:
last_exc = e
if len(alt_pairs) >= 3:
try:
return _call(out_key, alt_pairs[:2])
except Exception as e:
last_exc = e
if last_exc is not None:
raise last_exc
raise RuntimeError("LiBr property call failed (no attempts executed).")
if _RE_LIBR_X_TP.search(text):
def LiBrX_TP(T: Any, P: Any) -> Any: # noqa: N802
return _lb_try(out_codes=("X", "x"), pairs=(("T", T), ("P", P)))
params.setdefault("LiBrX_TP", LiBrX_TP)
if _RE_LIBR_H_TX.search(text):
def LiBrH_TX(T: Any, X: Any) -> Any: # noqa: N802
return _lb_try(out_codes=("H", "h"), pairs=(("T", T), ("X", X)))
params.setdefault("LiBrH_TX", LiBrH_TX)
if _RE_LIBR_RHO.search(text):
def LiBrRho_TXP(T: Any, X: Any, P: Any) -> Any: # noqa: N802
return _lb_try(out_codes=("D", "rho", "RHO"), pairs=(("T", T), ("X", X), ("P", P)))
params.setdefault("LiBrRho_TXP", LiBrRho_TXP)
if _RE_LIBR_T_HX.search(text):
def LiBrT_HX(H: Any, X: Any) -> Any: # noqa: N802
return _lb_try(out_codes=("T",), pairs=(("H", H), ("X", X)))
params.setdefault("LiBrT_HX", LiBrT_HX)
# -------------------- NH3–H2O native backend + helpers --------------------
if _RE_NH3H2O.search(text):
ref_names = (
"NH3H2O",
"NH3H2O_STATE",
"NH3H2O_TPX",
"NH3H2O_STATE_TPX",
"NH3H2OPropsSI",
"NH3H2OPropsMulti",
"NH3H2OBatchProps",
"prop_tpx",
"state_tpx",
"props_multi_tpx",
"batch_prop_tpx",
)
has_user_funcs = any(k in params for k in ref_names)
_aw = None
if not has_user_funcs:
_aw = _import_first(("nh3h2o_backend", "nh3h2o_native_backend", "nh3h2o_ik_backend", "nh3h2o_ik93_backend"))
if _aw is None and not has_user_funcs:
raise BackendUnavailableError(
"Equation system references NH3–H2O property calls (NH3H2O*/prop_tpx/state_tpx), "
"but no NH3–H2O backend module could be imported from thermo_props."
)
if _aw is not None:
fn_prop = getattr(_aw, "NH3H2O_TPX", None) or getattr(_aw, "prop_tpx", None) or getattr(_aw, "NH3H2O", None)
fn_state = getattr(_aw, "NH3H2O_STATE_TPX", None) or getattr(_aw, "state_tpx", None) or getattr(_aw, "NH3H2O_STATE", None)
fn_multi = getattr(_aw, "props_multi_tpx", None) or getattr(_aw, "NH3H2OPropsMulti", None)
fn_batch = getattr(_aw, "batch_prop_tpx", None) or getattr(_aw, "NH3H2OBatchProps", None)
fn_propssi = getattr(_aw, "NH3H2OPropsSI", None)
if callable(fn_prop):
params.setdefault("NH3H2O_TPX", fn_prop)
params.setdefault("prop_tpx", fn_prop)
params.setdefault("NH3H2O", fn_prop)
if callable(fn_state):
params.setdefault("NH3H2O_STATE_TPX", fn_state)
params.setdefault("state_tpx", fn_state)
params.setdefault("NH3H2O_STATE", fn_state)
if callable(fn_multi):
params.setdefault("props_multi_tpx", fn_multi)
params.setdefault("NH3H2OPropsMulti", fn_multi)
if callable(fn_batch):
params.setdefault("batch_prop_tpx", fn_batch)
params.setdefault("NH3H2OBatchProps", fn_batch)
if callable(fn_propssi):
params.setdefault("NH3H2OPropsSI", fn_propssi)
missing: List[str] = []
if re.search(r"\bNH3H2O_TPX\s*\(", text) and not callable(params.get("NH3H2O_TPX")):
missing.append("NH3H2O_TPX")
if re.search(r"\bNH3H2O_STATE_TPX\s*\(", text) and not callable(params.get("NH3H2O_STATE_TPX")):
missing.append("NH3H2O_STATE_TPX")
if re.search(r"\bprop_tpx\s*\(", text) and not callable(params.get("prop_tpx")):
missing.append("prop_tpx")
if re.search(r"\bstate_tpx\s*\(", text) and not callable(params.get("state_tpx")):
missing.append("state_tpx")
if re.search(r"\bNH3H2OPropsSI\s*\(", text) and not callable(params.get("NH3H2OPropsSI")):
missing.append("NH3H2OPropsSI")
if missing:
raise BackendUnavailableError(
"NH3–H2O backend imported but missing required callables referenced by equations: "
+ ", ".join(missing)
)
# ------------------------------ public API ------------------------------
def _is_optimize_mapping(m: Mapping[str, Any]) -> bool:
"""Return True if the incoming mapping represents an optimization problem."""
try:
pt = str(m.get("problem_type", "") or "").strip().lower()
except Exception:
pt = ""
if pt == "optimize":
return True
# Back-compat / permissive detection
if "objective" in m or "constraints" in m:
return True
return False
def _mapping_params_to_values(m: Mapping[str, Any]) -> Dict[str, Any]:
"""
Build the evaluation-parameter dict from an interpreter-produced mapping.
The interpreter/build_spec layer typically emits numeric `constants`.
We also accept `params` if the caller provided them directly.
"""
params: Dict[str, Any] = {}
raw_params = m.get("params", None)
if isinstance(raw_params, Mapping):
params.update(dict(raw_params))
raw_constants = m.get("constants", None)
if isinstance(raw_constants, Mapping):
params.update(dict(raw_constants))
return params
def _mapping_vars_to_varlikes(var_items: Sequence[Any]) -> List[_VarLike]:
"""Convert variable entries from interpreter specs into the internal _VarLike shape."""
out: List[_VarLike] = []
# If the mapping already contains VarSpec objects, reuse the existing converter.
if var_items and all(isinstance(v, VarSpec) for v in var_items): # type: ignore[arg-type]
return _vars_to_varlikes(var_items) # type: ignore[arg-type]
def _as_float(x: Any) -> float | None:
if x is None:
return None
try:
return float(x)
except Exception:
return None
for v in var_items:
if isinstance(v, Mapping):
name = str(v.get("name", v.get("var", v.get("id", ""))) or "").strip()
if not name:
continue
kind = str(v.get("kind", "") or ("unknown" if v.get("unknown", False) else "unknown")).strip() or "unknown"
value = _as_float(v.get("value", None))
guess = _as_float(v.get("guess", v.get("init", v.get("x0", None))))
lower = _as_float(v.get("lower", v.get("lb", None)))
upper = _as_float(v.get("upper", v.get("ub", None)))
out.append(_VarLike(name=name, kind=kind, value=value, guess=guess, lower=lower, upper=upper))
elif isinstance(v, VarSpec):
# Single VarSpec among dicts/strings: handle it.
out.extend(_vars_to_varlikes([v]))
elif isinstance(v, str) and v.strip():
out.append(_VarLike(name=v.strip(), kind="unknown", value=None))
else:
# Ignore unknown shapes
continue
return out
[docs]
def solve_optimize(
problem: Mapping[str, Any],
*,
backend: Optional[BackendKind] = None,
method: Optional[str] = None,
tol: Optional[float] = None,
max_iter: Optional[int] = None,
max_restarts: Optional[int] = None,
use_units: Optional[bool] = None,
) -> Solution:
"""
Solve an optimization problem produced by interpreter/build_spec.py.
Expected mapping keys (build_spec output):
- problem_type: "optimize"
- objective: "<expr>"
- sense: "min"|"max"
- constraints: ["<eq/residual>", ...] (residual==0 constraints)
- variables: [...], constants: {...}, solve: {...}
Backend notes:
- Current optimizer implementation is SciPy-based (scipy.optimize.minimize).
- GEKKO optimization is *not* wired here (yet).
"""
if not isinstance(problem, Mapping):
raise TypeError("solve_optimize expects a Mapping[str, Any]")
objective = str(problem.get("objective", "") or "").strip()
if not objective:
raise ValueError("Optimization spec is missing 'objective'.")
sense = str(problem.get("sense", problem.get("objective_sense", "min")) or "min").strip().lower()
if sense not in {"min", "max"}:
raise ValueError(f"Invalid optimization 'sense': {sense!r} (expected 'min' or 'max').")
constraints = problem.get("constraints", None)
if constraints is None:
constraints = problem.get("equations", [])
if not isinstance(constraints, Sequence) or isinstance(constraints, (str, bytes)):
raise TypeError("Optimization spec 'constraints' must be a list of strings.")
constraint_strings = [str(c) for c in constraints if str(c).strip()]
solve_block_raw = problem.get("solve", None)
solve_block_in: Dict[str, Any] = dict(solve_block_raw) if isinstance(solve_block_raw, Mapping) else {}
spec_backend_pref = _normalize_backend_name(
solve_block_in.get("backend", solve_block_in.get("solver", None))
or problem.get("backend", None)
or problem.get("solver", None)
or "auto"
)
spec_method_pref = str(solve_block_in.get("method", None) or problem.get("method", None) or "SLSQP")
spec_tol_pref = float(solve_block_in.get("tol", None) or problem.get("tol", None) or 1e-6)
spec_max_iter_pref = int(
solve_block_in.get("max_iter", solve_block_in.get("maxiter", None))
or problem.get("max_iter", None)
or problem.get("maxiter", None)
or 200
)
spec_max_restarts_pref = int(
solve_block_in.get("max_restarts", solve_block_in.get("restarts", None))
or problem.get("max_restarts", None)
or 0
)
spec_use_units_pref = bool(
solve_block_in.get("use_units", None)
if "use_units" in solve_block_in
else bool(problem.get("use_units", False))
)
eff_backend = _normalize_backend_name(backend) if backend is not None else spec_backend_pref
eff_method = str(method).strip() if method is not None else spec_method_pref
eff_tol = float(tol) if tol is not None else spec_tol_pref
eff_max_iter = int(max_iter) if max_iter is not None else spec_max_iter_pref
eff_max_restarts = int(max_restarts) if max_restarts is not None else spec_max_restarts_pref
eff_use_units = bool(use_units) if use_units is not None else bool(spec_use_units_pref)
# For now, optimization is SciPy-based.
if eff_backend in {"gekko"}:
raise BackendUnavailableError(
"Optimization requested with backend 'gekko', but the optimization API is currently SciPy-based. "
"Use backend='scipy' (or omit backend for auto)."
)
if eff_backend in {"", "none", "auto"}:
eff_backend = "scipy"
# Build evaluation params + inject thermo/property callables as needed.
params = _mapping_params_to_values(problem)
all_exprs = list(constraint_strings) + [objective]
_inject_property_functions(all_exprs, params)
# Optimization always requires SciPy at present.
if not _has_scipy():
raise BackendUnavailableError("Optimization requires SciPy, but SciPy is not installed. Install with: pip install scipy")
# Additional check: if the user used Python-callable thermo functions and explicitly asked for GEKKO, we already rejected.
_ensure_backend_available(eff_backend)
# Forward any remaining solve-block keys to the optimizer implementation.
solve_block_forward = dict(solve_block_in)
_strip_solve_keys(
solve_block_forward,
[
"backend", "solver",
"method",
"tol",
"max_iter", "maxiter",
"max_restarts", "restarts",
"use_units",
],
)
var_items_raw = problem.get("variables", problem.get("vars", []))
# Optimization variables may arrive in two common shapes:
# (A) interpreter/build_spec shape: a list of variable dicts
# e.g. [{"name":"x","guess":0.2,"lower":0,"upper":1}, ...]
# (B) equation-system mapping shape: a mapping name->payload
# e.g. {"x":{"guess":0.2,"lower":0,"upper":1}, "y":{"guess":0.8}}
# Support both for robustness.
if isinstance(var_items_raw, Mapping):
var_items_list: List[Any] = []
for k, v in var_items_raw.items():
nm = str(k).strip()
if not nm:
continue
if isinstance(v, Mapping):
d: Dict[str, Any] = {"name": nm}
d.update(dict(v))
var_items_list.append(d)
else:
var_items_list.append({"name": nm, "guess": v})
var_items: Any = var_items_list
else:
var_items = var_items_raw
if not isinstance(var_items, Sequence) or isinstance(var_items, (str, bytes)):
raise TypeError("Optimization spec 'variables' must be a list or mapping.")
varlikes = _mapping_vars_to_varlikes(list(var_items))
# Lazy import: optimizer module may not exist until you add it (next step).
try:
from .optimizer import solve_optimize as _solve_optimize_impl # type: ignore
except Exception as e:
raise BackendUnavailableError(
"Optimization spec detected, but equations.optimizer is not available yet. "
"Create equations/optimizer.py with a solve_optimize(...) entry point."
) from e
# Duck-typed spec passed downstream (keeps API stable while optimizer evolves)
opt_like = {
"objective": str(objective),
"sense": str(sense),
"constraints": list(constraint_strings),
"variables": varlikes,
"params": params,
"solve": solve_block_forward if solve_block_forward else {},
"backend": str(eff_backend),
"method": str(eff_method),
"tol": float(eff_tol),
"max_iter": int(eff_max_iter),
"max_restarts": int(eff_max_restarts),
"use_units": bool(eff_use_units),
"title": str(problem.get("title", "") or ""),
}
return _solve_optimize_impl(
opt_like,
backend=str(eff_backend),
method=str(eff_method),
tol=float(eff_tol),
max_iter=int(eff_max_iter),
)
[docs]
def solve(
system: Union[EquationSystemSpec, Mapping[str, Any]],
*,
backend: Optional[BackendKind] = None,
method: Optional[str] = None,
tol: Optional[float] = None,
max_iter: Optional[int] = None,
max_restarts: Optional[int] = None,
use_units: Optional[bool] = None,
) -> Solution:
"""Convenience alias that routes equation systems and optimization problems."""
return solve_system(
system,
backend=backend,
method=method,
tol=tol,
max_iter=max_iter,
max_restarts=max_restarts,
use_units=use_units,
)
[docs]
def solve_system(
system: Union[EquationSystemSpec, Mapping[str, Any]],
*,
backend: Optional[BackendKind] = None,
method: Optional[str] = None,
tol: Optional[float] = None,
max_iter: Optional[int] = None,
max_restarts: Optional[int] = None,
use_units: Optional[bool] = None,
) -> Solution:
"""
Solve an EES-ish nonlinear equation system.
Accepts:
- EquationSystemSpec, OR
- a JSON/YAML mapping compatible with spec.system_from_mapping()
Override semantics:
- If an API kwarg is None, we fall back to spec.solve / spec fields / defaults.
- If provided, the API kwarg overrides spec/solve-block.
"""
if isinstance(system, Mapping):
# Optimization routing (build_spec emits problem_type='optimize' + objective/constraints)
if _is_optimize_mapping(system):
return solve_optimize(
system,
backend=backend,
method=method,
tol=tol,
max_iter=max_iter,
max_restarts=max_restarts,
use_units=use_units,
)
spec = system_from_mapping(system)
else:
spec = system
spec.check_square()
solve_block_raw = getattr(spec, "solve", None)
solve_block_in: Dict[str, Any] = dict(solve_block_raw) if isinstance(solve_block_raw, Mapping) else {}
spec_backend_pref = _normalize_backend_name(
solve_block_in.get("backend", solve_block_in.get("solver", None))
or getattr(spec, "backend", None)
or getattr(spec, "solver", None)
or "auto"
)
spec_method_pref = str(solve_block_in.get("method", None) or getattr(spec, "method", None) or "hybr")
spec_tol_pref = float(solve_block_in.get("tol", None) or getattr(spec, "tol", None) or 1e-9)
spec_max_iter_pref = int(
solve_block_in.get("max_iter", solve_block_in.get("maxiter", None))
or getattr(spec, "max_iter", None)
or getattr(spec, "maxiter", None)
or 200
)
spec_max_restarts_pref = int(
solve_block_in.get("max_restarts", solve_block_in.get("restarts", None))
or getattr(spec, "max_restarts", None)
or 2
)
spec_use_units_pref = bool(
solve_block_in.get("use_units", None)
if "use_units" in solve_block_in
else getattr(spec, "use_units", False)
)
eff_backend = _normalize_backend_name(backend) if backend is not None else spec_backend_pref
eff_method = str(method).strip() if method is not None else spec_method_pref
eff_tol = float(tol) if tol is not None else spec_tol_pref
eff_max_iter = int(max_iter) if max_iter is not None else spec_max_iter_pref
eff_max_restarts = int(max_restarts) if max_restarts is not None else spec_max_restarts_pref
eff_use_units = bool(use_units) if use_units is not None else bool(spec_use_units_pref)
units_adapter = _try_units_adapter() if eff_use_units else None
var_base_units, param_base_units = _extract_base_units(spec) if units_adapter is not None else (None, None)
eq_strings = _equations_to_strings(spec.equations)
varlikes = _vars_to_varlikes(spec.vars, units=units_adapter, base_units=var_base_units)
params = _params_to_values(spec.params, units=units_adapter, base_units=param_base_units)
_inject_property_functions(eq_strings, params)
needs_props = _needs_python_property_funcs(eq_strings)
if needs_props:
if eff_backend == "gekko":
raise BackendUnavailableError(
"This equation system uses property-function calls (PropsSI/PhaseSI/HAPropsSI/CTPropsSI/ASPropsSI/LiBr*/NH3H2O*), "
"which require the SciPy backend. GEKKO cannot evaluate these Python-callable thermo functions."
)
if eff_backend == "auto":
eff_backend = "scipy"
if not _has_scipy():
raise BackendUnavailableError(
"This equation system uses property-function calls (PropsSI/PhaseSI/HAPropsSI/CTPropsSI/ASPropsSI/LiBr*/NH3H2O*), "
"but SciPy is not installed. Install with: pip install scipy"
)
_ensure_backend_available(eff_backend)
solve_block_forward = dict(solve_block_in)
_strip_solve_keys(
solve_block_forward,
[
"backend", "solver",
"method",
"tol",
"max_iter", "maxiter",
"max_restarts", "restarts",
"use_units",
],
)
spec_like = _SpecLike(
equations=eq_strings,
variables=varlikes,
params=params,
solve=solve_block_forward if solve_block_forward else {},
backend=eff_backend,
solver=eff_backend,
method=eff_method,
tol=eff_tol,
max_iter=eff_max_iter,
maxiter=eff_max_iter,
max_restarts=eff_max_restarts,
)
from .solver import solve_system as _solve_system_impl # local import
return _solve_system_impl(
spec_like,
backend=str(eff_backend),
method=str(eff_method),
tol=float(eff_tol),
max_iter=int(eff_max_iter),
max_restarts=int(eff_max_restarts),
)
__all__ = [
"BackendUnavailableError",
"BackendKind",
"EquationSystem",
"Param",
"Solution",
"Var",
"solve",
"solve_optimize",
"solve_system",
]