# equations/optimizer.py
from __future__ import annotations
"""
equations.optimizer
SciPy-based optimization backend for TDPy.
This module is intentionally *small* and *duck-typed*:
- It consumes the "opt_like" mapping built by equations/api.py (solve_optimize()).
- It reuses the same safe expression compiler used by the equation solver:
- Objective: safe_eval.compile_expression()
- Constraints: safe_eval.compile_residual() (treated as equality constraints: residual == 0)
- It evaluates expressions via safe_eval.eval_compiled() with a restricted scope:
SAFE_FUNCS/SAFE_CONSTS + injected callables/constants + current variable values.
Design goals:
- Do not break existing equation-solving behavior (this file is only used when
api.py routes a spec with problem_type == "optimize").
- Be robust to thermo/property-call evaluation errors by applying a penalty
(avoids SciPy blowing up mid-iteration).
- Keep the returned type compatible with solver.py results (SolveResult).
Important limitation (first iteration):
- Inequality constraints like "g(x) <= 0" are NOT supported as raw DSL because
safe_eval disallows comparisons. Use:
- bounds: (preferred) provided by build_spec.py
- OR encode inequality as a smooth penalty in the objective yourself.
"""
from dataclasses import dataclass
import math
from typing import Any, Callable, Dict, List, Mapping, Optional, Sequence, Tuple
from .safe_eval import (
CompiledExpr,
ParseError,
UnsafeExpressionError,
compile_expression,
compile_residual,
eval_compiled,
)
from .solver import SolveResult
def _compile_expression_compat(
expr: str,
*,
extra_funcs: Optional[Mapping[str, Callable[..., Any]]] = None,
extra_consts: Optional[Mapping[str, Any]] = None,
) -> CompiledExpr:
"""
Compatibility wrapper: older safe_eval.compile_expression may not accept
extra_funcs/extra_consts. Try the richest signature first.
"""
try:
return compile_expression(expr, extra_funcs=extra_funcs, extra_consts=extra_consts) # type: ignore[call-arg]
except TypeError:
try:
return compile_expression(expr, extra_funcs=extra_funcs) # type: ignore[call-arg]
except TypeError:
return compile_expression(expr) # type: ignore[call-arg]
def _compile_residual_compat(
expr: str,
*,
extra_funcs: Optional[Mapping[str, Callable[..., Any]]] = None,
extra_consts: Optional[Mapping[str, Any]] = None,
) -> CompiledExpr:
"""
Compatibility wrapper: older safe_eval.compile_residual may not accept
extra_funcs/extra_consts.
"""
try:
return compile_residual(expr, extra_funcs=extra_funcs, extra_consts=extra_consts) # type: ignore[call-arg]
except TypeError:
try:
return compile_residual(expr, extra_funcs=extra_funcs) # type: ignore[call-arg]
except TypeError:
return compile_residual(expr) # type: ignore[call-arg]
def _eval_compiled_compat(
c: CompiledExpr,
*,
values: Optional[Mapping[str, Any]] = None,
params: Optional[Mapping[str, Any]] = None,
extra_funcs: Optional[Mapping[str, Callable[..., Any]]] = None,
extra_consts: Optional[Mapping[str, Any]] = None,
) -> float:
"""
Compatibility wrapper: older safe_eval.eval_compiled may not accept
extra_funcs/extra_consts.
"""
try:
return float(
eval_compiled(
c,
values=values,
params=params,
extra_funcs=extra_funcs,
extra_consts=extra_consts,
)
)
except TypeError:
try:
return float(eval_compiled(c, values=values, params=params, extra_funcs=extra_funcs)) # type: ignore[call-arg]
except TypeError:
return float(eval_compiled(c, values=values, params=params)) # type: ignore[call-arg]
def _is_mapping(x: Any) -> bool:
try:
from collections.abc import Mapping as _Mapping
return isinstance(x, _Mapping)
except Exception: # pragma: no cover
return hasattr(x, "keys") and hasattr(x, "__getitem__")
def _var_field(v: Any, name: str, default: Any = None) -> Any:
"""Duck-typed accessor for VarLike dataclass or dict."""
if v is None:
return default
if hasattr(v, name):
try:
return getattr(v, name)
except Exception:
return default
if isinstance(v, dict):
return v.get(name, default)
return default
def _as_float(x: Any) -> Optional[float]:
if x is None:
return None
try:
if isinstance(x, bool):
return None
return float(x)
except Exception:
return None
def _l2_norm(vals: Sequence[float]) -> float:
return float(math.sqrt(sum(float(v) * float(v) for v in vals)))
def _extract_callable_params(params: Mapping[str, Any]) -> Dict[str, Callable[..., Any]]:
"""
Anything callable in params can be treated as extra_funcs for compilation
and evaluation. This lets safe_eval validate function names that are only
provided at runtime (e.g., PropsSI wrappers), without expanding the global
allowlist.
"""
out: Dict[str, Callable[..., Any]] = {}
for k, v in params.items():
if callable(v):
out[str(k)] = v # type: ignore[assignment]
return out
def _extract_extra_consts(params: Mapping[str, Any]) -> Dict[str, Any]:
"""
Non-callable injected names (strings, numbers) can be passed as extra_consts
to help name extraction. For evaluation, we also pass params directly.
"""
out: Dict[str, Any] = {}
for k, v in params.items():
if callable(v):
continue
out[str(k)] = v
return out
@dataclass(frozen=True)
class _CompiledOpt:
objective: CompiledExpr
sense: str
constraints: List[CompiledExpr]
unknown_names: List[str]
bounds: List[Tuple[Optional[float], Optional[float]]]
x0: List[float]
def _compile_problem(problem: Mapping[str, Any]) -> _CompiledOpt:
objective = str(problem.get("objective", "") or "").strip()
if not objective:
raise ValueError("Optimization spec is missing 'objective'.")
sense = str(problem.get("sense", "min") or "min").strip().lower()
if sense not in {"min", "max"}:
raise ValueError(f"Invalid optimization sense: {sense!r}")
params = problem.get("params", {})
if not _is_mapping(params):
params = {}
params = dict(params) # type: ignore[arg-type]
variables = problem.get("variables", [])
if not isinstance(variables, Sequence) or isinstance(variables, (str, bytes)):
raise TypeError("Optimization spec 'variables' must be a list of var-like objects.")
unknowns: List[Any] = []
for v in variables:
unk = _var_field(v, "unknown", None)
kind = str(_var_field(v, "kind", "") or "").lower()
val = _var_field(v, "value", None)
is_unknown = False
if isinstance(unk, bool):
is_unknown = bool(unk)
elif kind:
is_unknown = kind == "unknown"
else:
is_unknown = val is None
if is_unknown:
unknowns.append(v)
if not unknowns:
unknowns = list(variables)
unknown_names = [str(_var_field(v, "name", "") or "").strip() for v in unknowns]
if any(not n for n in unknown_names):
raise ValueError("One or more optimization variables are missing a valid 'name'.")
bounds: List[Tuple[Optional[float], Optional[float]]] = []
x0: List[float] = []
for v in unknowns:
lo = _as_float(_var_field(v, "lower", None))
hi = _as_float(_var_field(v, "upper", None))
bounds.append((lo, hi))
g = _as_float(_var_field(v, "guess", None))
if g is None:
g = _as_float(_var_field(v, "value", None))
if g is None:
g = 1.0
if lo is not None and g < lo:
g = lo
if hi is not None and g > hi:
g = hi
x0.append(float(g))
constraints_in = problem.get("constraints", [])
if constraints_in is None:
constraints_in = []
if not isinstance(constraints_in, Sequence) or isinstance(constraints_in, (str, bytes)):
raise TypeError("Optimization spec 'constraints' must be a list of strings.")
constraints_str = [str(s) for s in constraints_in if str(s).strip()]
extra_funcs = _extract_callable_params(params)
extra_consts = _extract_extra_consts(params)
try:
c_obj = _compile_expression_compat(objective, extra_funcs=extra_funcs, extra_consts=extra_consts)
except (UnsafeExpressionError, ParseError) as e:
raise ParseError(f"Objective expression could not be compiled: {objective!r}: {e}") from e
c_cons: List[CompiledExpr] = []
for s in constraints_str:
try:
c = _compile_residual_compat(s, extra_funcs=extra_funcs, extra_consts=extra_consts)
except (UnsafeExpressionError, ParseError) as e:
raise ParseError(f"Constraint could not be compiled: {s!r}: {e}") from e
c_cons.append(c)
return _CompiledOpt(
objective=c_obj,
sense=sense,
constraints=c_cons,
unknown_names=unknown_names,
bounds=bounds,
x0=x0,
)
@dataclass
class _PenaltyCounter:
obj_penalty: int = 0
cons_penalty: int = 0
def _make_eval_context(
params: Mapping[str, Any],
unknown_names: Sequence[str],
x: Sequence[float],
) -> Dict[str, Any]:
values = {str(k): float(v) for k, v in zip(unknown_names, x)}
return {"values": values, "params": dict(params)}
def _eval_objective(
compiled: CompiledExpr,
*,
sense: str,
params: Mapping[str, Any],
unknown_names: Sequence[str],
x: Sequence[float],
penalty_value: float,
counter: _PenaltyCounter,
) -> float:
ctx = _make_eval_context(params, unknown_names, x)
try:
y = float(_eval_compiled_compat(compiled, values=ctx["values"], params=ctx["params"]))
except Exception:
counter.obj_penalty += 1
y = float(penalty_value)
if sense == "max":
return -y
return y
def _eval_constraints(
compiled_list: Sequence[CompiledExpr],
*,
params: Mapping[str, Any],
unknown_names: Sequence[str],
x: Sequence[float],
penalty_value: float,
counter: _PenaltyCounter,
) -> List[float]:
if not compiled_list:
return []
ctx = _make_eval_context(params, unknown_names, x)
out: List[float] = []
for c in compiled_list:
try:
r = float(_eval_compiled_compat(c, values=ctx["values"], params=ctx["params"]))
except Exception:
counter.cons_penalty += 1
r = float(penalty_value)
out.append(r)
return out
[docs]
def solve_optimize(
problem: Mapping[str, Any],
*,
backend: Optional[str] = None,
method: Optional[str] = None,
tol: Optional[float] = None,
max_iter: Optional[int] = None,
) -> SolveResult:
"""
Solve an optimization problem.
Parameters are passed through by equations/api.py, but this function also
reads defaults from the `problem` mapping itself.
Expected keys in `problem` (duck-typed; see equations/api.py solve_optimize):
- objective: str
- sense: "min" | "max"
- constraints: list[str] # treated as equality residuals == 0
- variables: list[var-like] # each has name/kind/value/guess/lower/upper
- params: dict[str, Any] # numeric constants, symbols, and property callables
- tol, max_iter, method (optional)
Returns:
SolveResult (same shape as equations/solver.py).
"""
try:
import numpy as np # type: ignore
from scipy.optimize import Bounds, minimize # type: ignore
except Exception as e:
raise RuntimeError(
"SciPy is required for optimization (scipy.optimize.minimize). "
"Install scipy to use equations.optimizer."
) from e
method_eff = str(method or problem.get("method", "") or "SLSQP").strip()
tol_eff = float(tol if tol is not None else problem.get("tol", 1e-8))
max_iter_eff = int(max_iter if max_iter is not None else problem.get("max_iter", 200))
penalty_value = float(problem.get("thermo_penalty", 1e20))
compiled = _compile_problem(problem)
params = problem.get("params", {})
params = dict(params) if _is_mapping(params) else {}
lo = [(-np.inf if b[0] is None else float(b[0])) for b in compiled.bounds]
hi = [(np.inf if b[1] is None else float(b[1])) for b in compiled.bounds]
bounds = Bounds(lo, hi)
counter = _PenaltyCounter()
def fun(x: "np.ndarray") -> float:
return _eval_objective(
compiled.objective,
sense=compiled.sense,
params=params,
unknown_names=compiled.unknown_names,
x=list(map(float, x)),
penalty_value=penalty_value,
counter=counter,
)
scipy_constraints: List[Dict[str, Any]] = []
if compiled.constraints:
for c in compiled.constraints:
def _make_fun(ci: CompiledExpr) -> Callable[["np.ndarray"], float]:
def _f(x: "np.ndarray") -> float:
rr = _eval_constraints(
[ci],
params=params,
unknown_names=compiled.unknown_names,
x=list(map(float, x)),
penalty_value=penalty_value,
counter=counter,
)
return float(rr[0])
return _f
scipy_constraints.append({"type": "eq", "fun": _make_fun(c)})
method_supports_constraints = {"SLSQP", "trust-constr", "COBYLA"}
method_supports_bounds = {"SLSQP", "trust-constr", "L-BFGS-B", "TNC", "Powell"}
method_used = method_eff
forced_fallback_note = None
if scipy_constraints and method_used not in method_supports_constraints:
forced_fallback_note = f"Requested method {method_used!r} does not support equality constraints; using 'SLSQP'."
method_used = "SLSQP"
if (any(b[0] is not None or b[1] is not None for b in compiled.bounds)) and method_used not in method_supports_bounds:
forced_fallback_note = (forced_fallback_note or "") + f" Requested method {method_eff!r} does not support bounds; using 'SLSQP'."
method_used = "SLSQP"
max_restarts_eff = int(problem.get("max_restarts", 0) or 0)
x0_base = np.array(compiled.x0, dtype=float)
best = None
best_meta = None
def _run_one(x0: "np.ndarray", run_ix: int) -> Tuple[Any, Dict[str, Any]]:
res = minimize(
fun,
x0,
method=method_used,
bounds=bounds,
constraints=scipy_constraints,
tol=tol_eff,
options={"maxiter": max_iter_eff},
)
x_star = np.array(res.x, dtype=float)
cons_vals = _eval_constraints(
compiled.constraints,
params=params,
unknown_names=compiled.unknown_names,
x=list(map(float, x_star)),
penalty_value=penalty_value,
counter=counter,
)
cons_norm = _l2_norm(cons_vals)
cons_maxabs = max((abs(v) for v in cons_vals), default=0.0)
meta = {
"scipy": {
"success": bool(getattr(res, "success", False)),
"status": int(getattr(res, "status", -1)),
"message": str(getattr(res, "message", "")),
"nit": int(getattr(res, "nit", 0) or 0),
"nfev": int(getattr(res, "nfev", 0) or 0),
"njev": int(getattr(res, "njev", 0) or 0),
"method": str(method_used),
"fun": float(getattr(res, "fun", float("nan"))),
},
"run_ix": int(run_ix),
"constraint_norm": float(cons_norm),
"constraint_maxabs": float(cons_maxabs),
"penalties": {
"objective": int(counter.obj_penalty),
"constraints": int(counter.cons_penalty),
},
}
if forced_fallback_note:
meta["note"] = forced_fallback_note
return res, meta
res0, meta0 = _run_one(x0_base, 0)
best = res0
best_meta = meta0
for k in range(1, max_restarts_eff + 1):
sgn = -1.0 if (k % 2 == 0) else 1.0
scale = 1.0 + sgn * min(0.10 * k, 0.5)
x0 = x0_base * scale
x0 = np.where(np.abs(x0) < 1e-12, x0_base + sgn * (0.1 * k), x0)
x0 = np.minimum(np.maximum(x0, bounds.lb), bounds.ub)
res_k, meta_k = _run_one(x0, k)
choose_new = False
if bool(getattr(res_k, "success", False)) and not bool(getattr(best, "success", False)):
choose_new = True
elif bool(getattr(res_k, "success", False)) == bool(getattr(best, "success", False)):
fk = float(getattr(res_k, "fun", float("inf")))
fb = float(getattr(best, "fun", float("inf")))
ck = float(meta_k.get("constraint_norm", float("inf")))
cb = float(best_meta.get("constraint_norm", float("inf"))) if isinstance(best_meta, dict) else float("inf")
if ck < cb - 1e-12:
choose_new = True
elif abs(ck - cb) <= 1e-12 and fk < fb:
choose_new = True
if choose_new:
best = res_k
best_meta = meta_k
x_best = list(map(float, getattr(best, "x", compiled.x0)))
vars_out = {name: float(val) for name, val in zip(compiled.unknown_names, x_best)}
residuals = _eval_constraints(
compiled.constraints,
params=params,
unknown_names=compiled.unknown_names,
x=x_best,
penalty_value=penalty_value,
counter=counter,
)
residual_norm = _l2_norm(residuals)
msg = str(getattr(best, "message", ""))
ok = bool(getattr(best, "success", False))
nfev = int(getattr(best, "nfev", 0) or 0)
meta: Dict[str, Any] = {
"objective_value": float(getattr(best, "fun", float("nan"))),
"sense": str(compiled.sense),
"unknown_names": list(compiled.unknown_names),
"bounds": [
[None if lo is None else float(lo), None if hi is None else float(hi)]
for lo, hi in compiled.bounds
],
"requested_backend": str(backend or problem.get("backend", "scipy") or "scipy"),
"requested_method": str(method or problem.get("method", "") or method_eff),
"backend": "scipy",
"method": str(method_used),
"penalty_value": float(penalty_value),
}
if isinstance(best_meta, dict):
meta.update(best_meta)
return SolveResult(
ok=ok,
backend="scipy",
method=str(method_used),
message=msg,
nfev=nfev,
variables=vars_out,
residuals=[float(r) for r in residuals],
residual_norm=float(residual_norm),
meta=meta,
)
__all__ = ["solve_optimize"]