42 lines
1.7 KiB
Python
42 lines
1.7 KiB
Python
"""Minimal local planning core for NovaPlan MVP.
|
|
|
|
This module provides a tiny LocalProblem definition and a naive ADMM-like
heartbeat to combine local objectives with a shared variable. The implementation
is intentionally small and deterministic to enable unit tests and demonstrations.
|
|
"""
|
|
from __future__ import annotations
|
|
from typing import Dict, Any
|
|
|
|
class LocalProblem:
    """A tiny local optimization problem owned by a single agent.

    Attributes:
        id: Unique identifier for the agent.
        objective: A callable invoked as ``objective(variables, shared_vars)``
            whose result must be convertible to ``float``.
        variables: Local decision variables for this agent, keyed by name.
        constraints: Optional dict describing simple bound constraints; an
            empty dict when none were supplied.
    """

    def __init__(
        self,
        id: str,
        objective,
        variables: Dict[str, float],
        constraints: Dict[str, Any] | None = None,
    ):
        """Store the problem definition.

        Args:
            id: Unique identifier for the agent.
            objective: Callable taking ``(variables, shared_vars)``.
            variables: Local decision variables; stored by reference, not
                copied.
            constraints: Optional constraint description. ``None`` (or any
                other falsy value) is replaced by a new empty dict.
        """
        self.id = id
        self.objective = objective
        self.variables = variables
        # `or` normalizes a missing/empty argument so the attribute is
        # always a dict and callers never need a None check.
        self.constraints = constraints or {}

    def evaluate(self, shared_vars: Dict[str, float]) -> float:
        """Evaluate the local objective on the current local variables.

        Args:
            shared_vars: Shared variables, passed through to the objective as
                its second argument.

        Returns:
            The objective's result coerced to ``float``.
        """
        return float(self.objective(self.variables, shared_vars))
|
|
|
|
def simple_admm_step(local: LocalProblem, shared_vars: Dict[str, float], rho: float = 1.0) -> Dict[str, float]:
    """Perform a toy ADMM-like update step and return the updated local variables.

    Each local variable ``v`` is moved toward the shared variable ``s`` of the
    same name: ``v + rho * (s - v)``. A local variable with no entry in
    ``shared_vars`` is pulled toward ``0.0``.

    Args:
        local: Problem whose ``variables`` mapping is read and then replaced.
        shared_vars: Shared variable values keyed by variable name.
        rho: Step factor applied to the difference ``s - v``.

    Returns:
        A new dict of updated values. ``local.variables`` is rebound to this
        same dict; the previous mapping object is not modified in place.
    """
    updated = {
        name: value + rho * (shared_vars.get(name, 0.0) - value)
        for name, value in local.variables.items()
    }
    local.variables = updated
    return updated
|