# idea143-crisisguard-federat.../crisisguard/adapters/supply_chain_optimizer.py
# 36 lines, 1.2 KiB, Python

"""Minimal supply-chain adapter for CrisisGuard MVP.
This adapter demonstrates how a delta can modify the LocalPlan's
contents to reflect updated resource allocations. It merges the delta's
updates into the plan contents and bumps the version.
"""
from __future__ import annotations

import copy

from crisisguard.core import LocalPlan, PlanDelta
def process(plan: LocalPlan, delta: PlanDelta) -> LocalPlan:
    """Apply a delta to the plan's contents and return a new plan.

    The input ``plan`` and ``delta`` are never modified. The returned plan
    owns independent copies of every nested value, so later in-place edits
    to its contents cannot leak back into the input plan or the delta.

    Merge rules, applied per top-level key of ``delta.updates``:

    * If both the existing value and the update are dicts, the update's
      entries are merged over the existing ones (one level deep only;
      dicts nested further down are overwritten, not merged).
    * Otherwise the update replaces the existing value (or adds the key).

    Args:
        plan: The current plan; its ``contents`` must expose ``items()``.
        delta: The change set; its ``updates`` must expose ``items()``.

    Returns:
        A new ``LocalPlan`` with the same ``plan_id``, ``neighborhood`` and
        ``timestamp`` as ``plan``, the merged contents, and ``version``
        incremented by one.
    """
    # A real deep copy: a shallow copy would leave untouched nested values
    # shared between the old and the new plan.
    new_contents = copy.deepcopy(dict(plan.contents.items()))

    for key, value in delta.updates.items():
        # Copy the incoming value too, so the new plan does not alias the delta.
        incoming = copy.deepcopy(value)
        existing = new_contents.get(key)
        if isinstance(incoming, dict) and isinstance(existing, dict):
            # One-level merge into a fresh plain dict; update entries win.
            new_contents[key] = {**existing, **incoming}
        else:
            new_contents[key] = incoming

    return LocalPlan(
        plan_id=plan.plan_id,
        neighborhood=plan.neighborhood,
        contents=new_contents,
        version=plan.version + 1,
        timestamp=plan.timestamp,
    )