28 lines
1.0 KiB
Python
28 lines
1.0 KiB
Python
"""Tiny rover adapter stub for NovaPlan MVP.
|
|
|
|
Demonstrates how a rover planner could expose a LocalProblem and consume
|
|
shared signals via delta-sync, returning a PlanDelta for governance.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from nova_plan.planner import LocalProblem, simple_admm_step, delta_sync
|
|
from nova_plan.contracts import PlanDelta
|
|
|
|
|
|
def make_local_problem() -> LocalProblem:
    """Build the demo ``LocalProblem`` used by the rover adapter stub.

    The problem is created with the id ``"rover-1"``, two starting variables
    (``x = 1.0`` and ``y = 0.5``), and a toy objective defined inline below.

    Returns:
        A freshly constructed ``LocalProblem``.
    """

    def objective(local_vars, shared_vars):
        # Toy cost: the sum of the local values, reduced by half the sum of
        # the shared values, so lower local values score better.
        local_total = sum(local_vars.values())
        shared_total = sum(shared_vars.values())
        return local_total - 0.5 * shared_total

    initial_variables = {"x": 1.0, "y": 0.5}
    return LocalProblem(
        id="rover-1",
        objective=objective,
        variables=initial_variables,
    )
|
|
|
|
|
|
class RoverAdapter:
    """Minimal adapter that drives the rover's ``LocalProblem`` one step at a time.

    Attributes:
        local: The ``LocalProblem`` returned by ``make_local_problem()``.
    """

    def __init__(self):
        self.local = make_local_problem()

    def step(self, shared_vars: dict[str, float], *, rho: float = 1.0) -> PlanDelta:
        """Run one ADMM-like update and return the resulting delta.

        Args:
            shared_vars: Shared variable values, keyed by name, passed through
                unchanged to both planner calls.
            rho: Value forwarded as ``rho`` to ``simple_admm_step`` and
                ``delta_sync``. It is a single parameter so the two calls
                always agree; the default of 1.0 matches the value that was
                previously hard-coded in both places.

        Returns:
            The ``PlanDelta`` produced by ``delta_sync`` for this agent.
        """
        # The return value of simple_admm_step is discarded; presumably it
        # updates self.local in place -- confirm against nova_plan.planner.
        simple_admm_step(self.local, shared_vars, rho=rho)
        return delta_sync(self.local, shared_vars, agent_id=self.local.id, rho=rho)
|