idea168-crisispulse-federat.../tests/test_basic.py

49 lines
1.7 KiB
Python

import pytest
from idea168_crisispulse_federated_resource.ledger import LocalLedger
from idea168_crisispulse_federated_resource.contract_registry import GraphOfContracts
from idea168_crisispulse_federated_resource.adapters import SolarMicrogridAdapter, WaterPurifierAdapter
from idea168_crisispulse_federated_resource.governance import GovernanceLog
from idea168_crisispulse_federated_resource.privacy import SimplePrivacyAggregator
def test_ledger_basic_and_merkle():
L = LocalLedger("test")
L.add_entry("e1", {"resource": "water", "qty": 100})
L.add_entry("e2", {"resource": "food", "qty": 200})
root = L.merkle_root()
assert isinstance(root, str) and len(root) == 64
# delta with another ledger
L2 = LocalLedger("other")
L2.add_entry("e1", {"resource": "water", "qty": 100})
delta = L.delta_with(L2)
assert isinstance(delta, list)
L2.apply_delta(delta)
assert L2.get_entry("e1")["payload"]["resource"] == "water"
def test_contract_registry_basic():
reg = GraphOfContracts()
reg.register_schema("energy", "1.0", {"type": "object", "properties": {"peak": "float"}})
schema = reg.get_schema("energy", "1.0")
assert schema is not None
assert schema["name"] == "energy"
def test_adapters_and_governance():
s = SolarMicrogridAdapter()
w = WaterPurifierAdapter()
assert s.connect() is True
assert w.connect() is True
g = GovernanceLog(secret=b'secret-key')
ev = {"action": "deploy", "entity": "solar"}
signed = g.add_event(ev)
assert g.verify_event(signed) is True
def test_privacy_aggregator():
agg = SimplePrivacyAggregator()
signals = [{"d1": 1.0}, {"d2": 2.0}]
total = agg.aggregate(signals)
assert total >= 0