"""Minimal local planning core for NovaPlan MVP. This module provides a tiny LocalProblem definition and a naive ADMM-like heartbeat to combine local objectives with a shared variable. The implementation is intentionally small and deterministic to enable unit tests and demonstrations. """ from __future__ import annotations from typing import Dict, Any import time from nova_plan.contracts import PlanDelta class LocalProblem: """A tiny local optimization problem. Attributes: id: Unique identifier for the agent. objective: A callable that computes a scalar objective given a dict of variables. variables: Local decision variables for this agent. constraints: Optional dict describing simple bound constraints. """ def __init__(self, id: str, objective, variables: Dict[str, float], constraints: Dict[str, Any] | None = None): self.id = id self.objective = objective self.variables = variables self.constraints = constraints or {} def evaluate(self, shared_vars: Dict[str, float]) -> float: """Evaluate local objective using local variables and shared_vars.""" return float(self.objective(self.variables, shared_vars)) def simple_admm_step(local: LocalProblem, shared_vars: Dict[str, float], rho: float = 1.0) -> Dict[str, float]: """Perform a toy ADMM-like update step and return updated local variables. This is intentionally simple: we adjust each local variable toward the corresponding shared variable with a step proportional to the difference times rho. """ new_vars = {} for k, v in local.variables.items(): s = shared_vars.get(k, 0.0) new_vars[k] = v + rho * (s - v) local.variables = new_vars return new_vars def delta_sync(local: LocalProblem, shared_vars: Dict[str, float], agent_id: str | None = None, rho: float = 1.0) -> PlanDelta: """Create a PlanDelta representing the current local delta with shared_vars. This is a lightweight helper to bootstrap delta exchange between agents in an ADMM-like loop. It mirrors the minimal API used by tests and MVP demos, returning a serializable PlanDelta. """ delta = {} for k, v in local.variables.items(): s = shared_vars.get(k, 0.0) delta[k] = v - s # Optionally, we could apply a small adjustment here using rho if desired. local_val = local.variables.get(k, 0.0) shared_val = shared_vars.get(k, 0.0) # Apply a tiny step toward the shared value to reflect a one-pass update. local.variables[k] = local_val + rho * (shared_val - local_val) # Build PlanDelta for governance/traceability ts = time.time() return PlanDelta(agent_id=agent_id or getattr(local, "id", "unknown"), delta=delta, timestamp=ts)