novaplan-decentralized-priv.../nova_plan/planner.py

65 lines
2.7 KiB
Python

"""Minimal local planning core for NovaPlan MVP.
This module provides a tiny LocalProblem definition and a naive ADMM-like
heartbeat to combine local objectives with a shared variable. The implementation
is intentionally small and deterministic to enable unit tests and demonstrations.
"""
from __future__ import annotations
from typing import Dict, Any
import time
from nova_plan.contracts import PlanDelta
class LocalProblem:
"""A tiny local optimization problem.
Attributes:
id: Unique identifier for the agent.
objective: A callable that computes a scalar objective given a dict of variables.
variables: Local decision variables for this agent.
constraints: Optional dict describing simple bound constraints.
"""
def __init__(self, id: str, objective, variables: Dict[str, float], constraints: Dict[str, Any] | None = None):
self.id = id
self.objective = objective
self.variables = variables
self.constraints = constraints or {}
def evaluate(self, shared_vars: Dict[str, float]) -> float:
"""Evaluate local objective using local variables and shared_vars."""
return float(self.objective(self.variables, shared_vars))
def simple_admm_step(local: LocalProblem, shared_vars: Dict[str, float], rho: float = 1.0) -> Dict[str, float]:
"""Perform a toy ADMM-like update step and return updated local variables.
This is intentionally simple: we adjust each local variable toward the corresponding
shared variable with a step proportional to the difference times rho.
"""
new_vars = {}
for k, v in local.variables.items():
s = shared_vars.get(k, 0.0)
new_vars[k] = v + rho * (s - v)
local.variables = new_vars
return new_vars
def delta_sync(local: LocalProblem, shared_vars: Dict[str, float], agent_id: str | None = None, rho: float = 1.0) -> PlanDelta:
"""Create a PlanDelta representing the current local delta with shared_vars.
This is a lightweight helper to bootstrap delta exchange between agents
in an ADMM-like loop. It mirrors the minimal API used by tests and MVP
demos, returning a serializable PlanDelta.
"""
delta = {}
for k, v in local.variables.items():
s = shared_vars.get(k, 0.0)
delta[k] = v - s
# Optionally, we could apply a small adjustment here using rho if desired.
local_val = local.variables.get(k, 0.0)
shared_val = shared_vars.get(k, 0.0)
# Apply a tiny step toward the shared value to reflect a one-pass update.
local.variables[k] = local_val + rho * (shared_val - local_val)
# Build PlanDelta for governance/traceability
ts = time.time()
return PlanDelta(agent_id=agent_id or getattr(local, "id", "unknown"), delta=delta, timestamp=ts)