65 lines
2.7 KiB
Python
65 lines
2.7 KiB
Python
"""Minimal local planning core for NovaPlan MVP.
|
|
|
|
This module provides a tiny LocalProblem definition and a naive ADMM-like
|
|
heartbeat to combine local objectives with a shared variable. The implementation
|
|
is intentionally small and deterministic to enable unit tests and demonstrations.
|
|
"""
|
|
from __future__ import annotations
|
|
from typing import Dict, Any
|
|
import time
|
|
from nova_plan.contracts import PlanDelta
|
|
|
|
class LocalProblem:
|
|
"""A tiny local optimization problem.
|
|
|
|
Attributes:
|
|
id: Unique identifier for the agent.
|
|
objective: A callable that computes a scalar objective given a dict of variables.
|
|
variables: Local decision variables for this agent.
|
|
constraints: Optional dict describing simple bound constraints.
|
|
"""
|
|
|
|
def __init__(self, id: str, objective, variables: Dict[str, float], constraints: Dict[str, Any] | None = None):
|
|
self.id = id
|
|
self.objective = objective
|
|
self.variables = variables
|
|
self.constraints = constraints or {}
|
|
|
|
def evaluate(self, shared_vars: Dict[str, float]) -> float:
|
|
"""Evaluate local objective using local variables and shared_vars."""
|
|
return float(self.objective(self.variables, shared_vars))
|
|
|
|
def simple_admm_step(local: LocalProblem, shared_vars: Dict[str, float], rho: float = 1.0) -> Dict[str, float]:
|
|
"""Perform a toy ADMM-like update step and return updated local variables.
|
|
|
|
This is intentionally simple: we adjust each local variable toward the corresponding
|
|
shared variable with a step proportional to the difference times rho.
|
|
"""
|
|
new_vars = {}
|
|
for k, v in local.variables.items():
|
|
s = shared_vars.get(k, 0.0)
|
|
new_vars[k] = v + rho * (s - v)
|
|
local.variables = new_vars
|
|
return new_vars
|
|
|
|
def delta_sync(local: LocalProblem, shared_vars: Dict[str, float], agent_id: str | None = None, rho: float = 1.0) -> PlanDelta:
|
|
"""Create a PlanDelta representing the current local delta with shared_vars.
|
|
|
|
This is a lightweight helper to bootstrap delta exchange between agents
|
|
in an ADMM-like loop. It mirrors the minimal API used by tests and MVP
|
|
demos, returning a serializable PlanDelta.
|
|
"""
|
|
delta = {}
|
|
for k, v in local.variables.items():
|
|
s = shared_vars.get(k, 0.0)
|
|
delta[k] = v - s
|
|
# Optionally, we could apply a small adjustment here using rho if desired.
|
|
local_val = local.variables.get(k, 0.0)
|
|
shared_val = shared_vars.get(k, 0.0)
|
|
# Apply a tiny step toward the shared value to reflect a one-pass update.
|
|
local.variables[k] = local_val + rho * (shared_val - local_val)
|
|
|
|
# Build PlanDelta for governance/traceability
|
|
ts = time.time()
|
|
return PlanDelta(agent_id=agent_id or getattr(local, "id", "unknown"), delta=delta, timestamp=ts)
|