catopt-grid-category-theore.../catopt_grid/admm_lite.py

105 lines
3.8 KiB
Python

import numpy as _np
from typing import List, Tuple, Optional
class LocalProblem:
"""A minimal local optimization problem of the form:
min_x 0.5 x^T Q x + b^T x subject to lb <= x <= ub
which is convex if Q is positive semidefinite.
"""
def __init__(
self,
n: int,
Q: _np.ndarray,
b: _np.ndarray,
lb: Optional[_np.ndarray] = None,
ub: Optional[_np.ndarray] = None,
name: Optional[str] = None,
) -> None:
self.n = int(n)
self.Q = _np.asarray(Q).reshape((self.n, self.n))
self.b = _np.asarray(b).reshape((self.n,))
self.lb = _np.asarray(lb).reshape((self.n,)) if lb is not None else _np.full((self.n,), -_np.inf)
self.ub = _np.asarray(ub).reshape((self.n,)) if ub is not None else _np.full((self.n,), _np.inf)
self.name = name or f"LocalProblem_{self.n}d"
def __repr__(self) -> str:
return f"LocalProblem(name={self.name}, n={self.n})"
class SharedVariable:
"""Represents a global/shared variable (consensus variable) across agents."""
def __init__(self, dim: int):
self.dim = int(dim)
self.value = _np.zeros(self.dim)
def __repr__(self) -> str:
return f"SharedVariable(dim={self.dim})"
class ADMMLiteSolver:
"""A lightweight, ADMM-like solver for separable quadratic problems.
Assumes all LocalProblem instances share the same x dimension. Each agent
solves its own quadratic subproblem with a quadratic penalty coupling term
to the global consensus z. The updates are:
x_i = (Q_i + rho I)^{-1} [ rho*(z - u_i) - b_i ]
z = average_i ( x_i + u_i )
u_i = u_i + x_i - z
and then x_i is clipped to [lb_i, ub_i].
"""
def __init__(self, problems: List[LocalProblem], rho: float = 1.0, max_iter: int = 100, tol: float = 1e-4):
if not problems:
raise ValueError("ADMMLiteSolver requires at least one LocalProblem")
self.problems = problems
self.rho = float(rho)
self.max_iter = int(max_iter)
self.tol = float(tol)
# Validate dimensions and prepare internal structures
self._validate_dimensions()
def _validate_dimensions(self) -> None:
n0 = self.problems[0].n
for p in self.problems:
if p.n != n0:
raise ValueError("All LocalProblem instances must have the same dimension n")
self.n = n0
def solve(self, initial_z: _np.ndarray | None = None) -> Tuple[_np.ndarray, List[_np.ndarray], List[_np.ndarray]]:
N = len(self.problems)
if initial_z is None:
z = _np.zeros(self.n)
else:
z = _np.asarray(initial_z).reshape((self.n,))
us: List[_np.ndarray] = [_np.zeros(self.n) for _ in range(N)]
xs: List[_np.ndarray] = [_np.zeros(self.n) for _ in range(N)]
# Precompute cholesky-like solve factors if Q is diagonalizable; for general Q we'll use numpy.solve
for it in range(self.max_iter):
max_diff = 0.0
# x-update per agent
for i, lp in enumerate(self.problems):
M = lp.Q + self.rho * _np.eye(self.n)
rhs = self.rho * (z - us[i]) - lp.b
xi = _np.linalg.solve(M, rhs)
# apply simple bound projection
xi = _np.minimum(_np.maximum(xi, lp.lb), lp.ub)
xs[i] = xi
diff = _np.linalg.norm(xi - z)
if diff > max_diff:
max_diff = diff
# z-update (consensus)
z_new = sum((xs[i] + us[i]) for i in range(N)) / N
# dual update
for i in range(N):
us[i] = us[i] + xs[i] - z_new
z = z_new
if max_diff < self.tol:
break
return z, xs, us