78 lines
2.8 KiB
Python
78 lines
2.8 KiB
Python
import time
|
|
try:
|
|
from core.models import LocalRobotPlan, SharedSignals
|
|
from core.registry import GraphOfContracts
|
|
from core.ledger import AuditLog
|
|
from core.solver import coordinate_fleets
|
|
except ModuleNotFoundError:
|
|
# Fallback for environments where the repo root isn't on PYTHONPATH.
|
|
import importlib.util
|
|
import os
|
|
import sys
|
|
|
|
base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
|
core_dir = os.path.join(base_dir, "core")
|
|
|
|
def _load_module(name, path):
|
|
spec = importlib.util.spec_from_file_location(name, path)
|
|
assert spec is not None
|
|
m = importlib.util.module_from_spec(spec)
|
|
# Ensure the module is discoverable in sys.modules prior to execution
|
|
sys.modules[name] = m
|
|
spec.loader.exec_module(m) # type: ignore
|
|
return m
|
|
|
|
core_models = _load_module("core.models", os.path.join(core_dir, "models.py"))
|
|
LocalRobotPlan = core_models.LocalRobotPlan
|
|
SharedSignals = core_models.SharedSignals
|
|
|
|
core_registry = _load_module("core.registry", os.path.join(core_dir, "registry.py"))
|
|
GraphOfContracts = core_registry.GraphOfContracts
|
|
|
|
core_ledger = _load_module("core.ledger", os.path.join(core_dir, "ledger.py"))
|
|
AuditLog = core_ledger.AuditLog
|
|
|
|
core_solver = _load_module("core.solver", os.path.join(core_dir, "solver.py"))
|
|
coordinate_fleets = core_solver.coordinate_fleets
|
|
|
|
|
|
def test_two_fleets_cross_exchange_basic():
|
|
# Create two local plans representing two fleets with two robots
|
|
plan_a = LocalRobotPlan(
|
|
fleet_id="fleet-A",
|
|
robot_id="robot-1",
|
|
tasks=["pickup", "deliver"],
|
|
path=[{"x": 0.0, "y": 0.0}, {"x": 1.0, "y": 1.0}],
|
|
objectives={"energy": 0.5},
|
|
)
|
|
plan_b = LocalRobotPlan(
|
|
fleet_id="fleet-B",
|
|
robot_id="robot-2",
|
|
tasks=["inspect"],
|
|
path=[{"x": 0.0, "y": 0.0}, {"x": -1.0, "y": 2.0}],
|
|
objectives={"energy": 0.3},
|
|
)
|
|
|
|
registry = GraphOfContracts()
|
|
signals = {"energy": 0.4}
|
|
registry.register_signal("contract-1", SharedSignals(signals=signals))
|
|
|
|
# Use solver to coordinate; should return a PlanDelta-like object via the helper
|
|
# Since coordinate_fleets is async, run it in event loop
|
|
import asyncio
|
|
|
|
async def run():
|
|
from core.models import SharedSignals
|
|
shared = SharedSignals(signals={"energy": 0.4})
|
|
delta = await coordinate_fleets(plan_a, plan_b, shared)
|
|
return delta
|
|
|
|
delta = asyncio.get_event_loop().run_until_complete(run())
|
|
assert delta.fleet_id == plan_a.fleet_id
|
|
assert isinstance(delta.changes, dict)
|
|
# New metadata should be present and well-formed
|
|
assert isinstance(delta.version, int)
|
|
assert delta.version >= 1
|
|
assert isinstance(delta.nonce, str) and len(delta.nonce) > 0
|
|
assert isinstance(delta.timestamp, float)
|