# idea131-fleetopt-verifiable.../tests/test_fleetopt.py
#
# 73 lines
# 2.6 KiB
# Python

import time
try:
    from core.models import LocalRobotPlan, SharedSignals
    from core.registry import GraphOfContracts
    from core.ledger import AuditLog
    from core.solver import coordinate_fleets
except ModuleNotFoundError:
    # Fallback for environments where the repo root isn't on PYTHONPATH:
    # load each ``core`` module directly from its source file, relative to
    # the directory that contains this test file.
    import importlib.util
    import os
    import sys

    base_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    core_dir = os.path.join(base_dir, "core")

    def _load_module(name, path):
        """Load the source file at *path* and register it as module *name*.

        Args:
            name: Fully qualified name to register in ``sys.modules``.
            path: Filesystem path of the source file to execute.

        Returns:
            The executed module object.

        Raises:
            ImportError: If no import spec or loader can be built for *path*.
        """
        spec = importlib.util.spec_from_file_location(name, path)
        # Raise explicitly instead of asserting: asserts vanish under ``-O``
        # and the loader may legitimately be missing on a spec.
        if spec is None or spec.loader is None:
            raise ImportError(
                f"cannot load module {name!r} from {path!r}",
                name=name,
                path=path,
            )
        module = importlib.util.module_from_spec(spec)
        # Ensure the module is discoverable in sys.modules prior to execution
        sys.modules[name] = module
        try:
            spec.loader.exec_module(module)
        except BaseException:
            # Do not leave a half-initialised module behind for later imports.
            sys.modules.pop(name, None)
            raise
        return module

    # Load order is kept as-is.
    # NOTE(review): later modules presumably import the earlier ones through
    # sys.modules -- confirm before reordering.
    core_models = _load_module("core.models", os.path.join(core_dir, "models.py"))
    LocalRobotPlan = core_models.LocalRobotPlan
    SharedSignals = core_models.SharedSignals
    core_registry = _load_module(
        "core.registry", os.path.join(core_dir, "registry.py")
    )
    GraphOfContracts = core_registry.GraphOfContracts
    core_ledger = _load_module("core.ledger", os.path.join(core_dir, "ledger.py"))
    AuditLog = core_ledger.AuditLog
    core_solver = _load_module("core.solver", os.path.join(core_dir, "solver.py"))
    coordinate_fleets = core_solver.coordinate_fleets
def test_two_fleets_cross_exchange_basic():
    """Coordinate two single-robot fleet plans and check the returned delta.

    Builds one ``LocalRobotPlan`` per fleet, registers a ``SharedSignals``
    payload under ``"contract-1"`` in a ``GraphOfContracts``, awaits
    ``coordinate_fleets`` with both plans, and asserts that the result carries
    fleet A's id and a ``dict`` of changes.
    """
    import asyncio

    # Create two local plans representing two fleets with one robot each
    plan_a = LocalRobotPlan(
        fleet_id="fleet-A",
        robot_id="robot-1",
        tasks=["pickup", "deliver"],
        path=[{"x": 0.0, "y": 0.0}, {"x": 1.0, "y": 1.0}],
        objectives={"energy": 0.5},
    )
    plan_b = LocalRobotPlan(
        fleet_id="fleet-B",
        robot_id="robot-2",
        tasks=["inspect"],
        path=[{"x": 0.0, "y": 0.0}, {"x": -1.0, "y": 2.0}],
        objectives={"energy": 0.3},
    )

    registry = GraphOfContracts()
    signals = {"energy": 0.4}
    registry.register_signal("contract-1", SharedSignals(signals=signals))

    async def run():
        # The solver gets its own SharedSignals instance (and its own dict),
        # separate from the one stored in the registry.
        shared = SharedSignals(signals=dict(signals))
        return await coordinate_fleets(plan_a, plan_b, shared)

    # coordinate_fleets is async, so drive it on a dedicated event loop.
    # A private loop avoids the deprecated implicit loop creation of
    # asyncio.get_event_loop() and leaves the thread's current loop untouched.
    loop = asyncio.new_event_loop()
    try:
        delta = loop.run_until_complete(run())
    finally:
        loop.close()

    assert delta.fleet_id == plan_a.fleet_id
    assert isinstance(delta.changes, dict)