44 lines
1.4 KiB
Python
44 lines
1.4 KiB
Python
import time
|
|
from src.core import LocalProblem, SharedSignals, PlanDelta, GraphOfContracts
|
|
|
|
|
|
def test_local_problem_dataclass():
    """Check that a LocalProblem exposes its constructor values as attributes.

    Only ``location``, ``need_type`` and ``quantity`` are verified;
    ``priority`` and ``time_window`` are supplied but not asserted on.
    """
    init_kwargs = {
        "location": "Site-A",
        "need_type": "food_rations",
        "priority": 4,
        "quantity": 1000,
        "time_window": ("2026-04-21T00:00:00Z", "2026-04-22T00:00:00Z"),
    }
    problem = LocalProblem(**init_kwargs)

    # Checked in a fixed order so the first failing attribute is reported.
    for attr_name in ("location", "need_type", "quantity"):
        assert getattr(problem, attr_name) == init_kwargs[attr_name]
|
|
|
|
|
|
def test_shared_signals_and_plan_delta_basics():
    """Check basic attribute access on SharedSignals and PlanDelta.

    Verifies that the stock level passed in can be read back, that the
    plan id is preserved, and that ``actions`` is exposed as a list.
    """
    signals = SharedSignals(
        stock_levels={"itemA": 500},
        urgency_scores={"Site-A": 0.8},
    )
    assert signals.stock_levels["itemA"] == 500

    allocation = {"allocate": {"itemA": 100}}
    delta = PlanDelta(
        plan_id="P1",
        actions=[allocation],
        timestamp=time.time(),
    )
    assert delta.plan_id == "P1"
    assert isinstance(delta.actions, list)
|
|
|
|
|
|
def test_graph_of_contracts_basic_flow():
    """Check that a GraphOfContracts records what is registered with it.

    Registers one local problem, one shared-signals object and one plan
    delta, then verifies the graph holds exactly one problem, the very
    same signals object (identity, not equality), and one plan delta.
    """
    graph = GraphOfContracts()

    # Register a single local problem.
    water_need = LocalProblem(
        location="Site-B",
        need_type="water",
        priority=2,
        quantity=200,
        time_window=("2026-04-25T00:00:00Z", "2026-04-26T00:00:00Z"),
    )
    graph.add_local_problem(water_need)

    # Attach the shared signals.
    signals = SharedSignals(
        stock_levels={"water": 1000},
        urgency_scores={"Site-B": 0.5},
    )
    graph.set_shared_signals(signals)

    # Record a single plan delta.
    delivery = PlanDelta(
        plan_id="P2",
        actions=[{"deliver": {"water": 200}}],
        timestamp=time.time(),
    )
    graph.add_plan_delta(delivery)

    assert len(graph.local_problems) == 1
    assert graph.shared_signals is signals
    assert len(graph.plan_deltas) == 1
|