44 lines
1.7 KiB
Python
44 lines
1.7 KiB
Python
import time
|
|
from datetime import datetime
|
|
|
|
from robotproof_studio.contracts import LocalProblem, SharedSignals, PlanDelta, DualVariables
|
|
from robotproof_studio.admm import AdmmCoordinator
|
|
from robotproof_studio.go_registry import GraphOfContractsRegistry
|
|
|
|
|
|
def test_contracts_basic():
    """Build one instance of each contract type and check the fields this test relies on.

    Covers ``LocalProblem``, ``SharedSignals``, ``PlanDelta`` (including
    ``sign``) and ``DualVariables``.
    """
    # Function-scope import: the module header only imports ``datetime`` itself.
    from datetime import timezone

    # ``datetime.utcnow()`` is deprecated since Python 3.12 and yields a naive
    # value; take an aware UTC "now" and keep the same trailing-"Z" text form.
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    lp = LocalProblem(
        robot_id="robot-1",
        problem_type="path/planning",
        payload={"energy_budget": 10.0, "time_window": [0, 100]},
    )
    sv = SharedSignals(version=1, signals={"obs": []})
    delta = PlanDelta(delta_id="d1", timestamp=timestamp, data={"step": 1})
    delta.sign("tester")
    duals = DualVariables(coupling={"energy": 1.0, "time": 1.0})

    assert lp.robot_id == "robot-1"
    # No version was passed to LocalProblem above; the test expects it to be 1.
    assert lp.version == 1
    assert sv.version == 1
    # After sign(), the proof is expected to carry the "signed-by:" prefix.
    assert delta.proof.startswith("signed-by:")
    assert duals.coupling["energy"] == 1.0
|
|
|
|
|
|
def test_admm_coordinator_basic():
    """Run a single ``AdmmCoordinator.iterate()`` call and check what it returns.

    The coordinator is built from two ``LocalProblem`` instances and one
    ``DualVariables`` instance; ``iterate()`` is expected to return a
    ``(PlanDelta, SharedSignals)`` pair.
    """
    lps = [
        LocalProblem(robot_id="r1", problem_type="path", payload={"energy_budget": 5.0}),
        LocalProblem(robot_id="r2", problem_type="path", payload={"energy_budget": 7.0}),
    ]
    dual = DualVariables(coupling={"energy": 0.5, "time": 0.5})
    coord = AdmmCoordinator(lps, dual)

    delta, sigs = coord.iterate()

    assert isinstance(delta, PlanDelta)
    assert delta.proof.startswith("signed-by:")
    # Check against the class itself rather than constructing a throwaway
    # instance only to take its type.
    assert isinstance(sigs, SharedSignals)
    # dual should have updated at least once
    # NOTE(review): if DualVariables.version starts at 1, this passes even
    # without an update — confirm the initial value and tighten if needed.
    assert dual.version >= 1
|
|
|
|
|
|
def test_registry_basic():
    """Register two entries in a fresh registry and check that both are listed."""
    registry = GraphOfContractsRegistry()

    # (name, keyword arguments) for each registration, in call order.
    registrations = (
        ("ros2-navigator", {"kind": "adapter", "metadata": {"binds_to": "ROS2"}}),
        ("local-problem-contract", {"kind": "contract"}),
    )
    for entry_name, options in registrations:
        registry.register(entry_name, **options)

    assert len(registry.list()) == 2
|