49 lines
1.7 KiB
Python
49 lines
1.7 KiB
Python
import pytest
|
|
from idea168_crisispulse_federated_resource.ledger import LocalLedger
|
|
from idea168_crisispulse_federated_resource.contract_registry import GraphOfContracts
|
|
from idea168_crisispulse_federated_resource.adapters import SolarMicrogridAdapter, WaterPurifierAdapter
|
|
from idea168_crisispulse_federated_resource.governance import GovernanceLog
|
|
from idea168_crisispulse_federated_resource.privacy import SimplePrivacyAggregator
|
|
|
|
|
|
def test_ledger_basic_and_merkle():
    """Check entry insertion, Merkle root shape, and a delta round-trip on LocalLedger."""
    primary = LocalLedger("test")
    for entry_id, payload in (
        ("e1", {"resource": "water", "qty": 100}),
        ("e2", {"resource": "food", "qty": 200}),
    ):
        primary.add_entry(entry_id, payload)

    merkle = primary.merkle_root()
    # The root must come back as a 64-character string.
    assert isinstance(merkle, str)
    assert len(merkle) == 64

    # A second ledger that starts out holding only the first entry.
    peer = LocalLedger("other")
    peer.add_entry("e1", {"resource": "water", "qty": 100})

    changes = primary.delta_with(peer)
    assert isinstance(changes, list)

    peer.apply_delta(changes)
    # NOTE(review): "e1" was already in `peer` before the delta was applied, so
    # this only shows the entry survives apply_delta -- confirm whether "e2"
    # should be checked here as well.
    stored = peer.get_entry("e1")
    assert stored["payload"]["resource"] == "water"
|
|
|
|
|
|
def test_contract_registry_basic():
    """Register one schema in GraphOfContracts and read it back by name and version."""
    registry = GraphOfContracts()
    energy_spec = {"type": "object", "properties": {"peak": "float"}}
    registry.register_schema("energy", "1.0", energy_spec)

    fetched = registry.get_schema("energy", "1.0")
    # The lookup must succeed and report the name it was registered under.
    assert fetched is not None
    assert fetched["name"] == "energy"
|
|
|
|
|
|
def test_adapters_and_governance():
    """Check both adapters connect and that a logged governance event verifies."""
    # Build both adapters first, then connect them in the same order.
    adapters = (SolarMicrogridAdapter(), WaterPurifierAdapter())
    for adapter in adapters:
        assert adapter.connect() is True

    log = GovernanceLog(secret=b'secret-key')
    event = {"action": "deploy", "entity": "solar"}
    signed_event = log.add_event(event)

    # Whatever add_event returns must be accepted by verify_event on the same log.
    assert log.verify_event(signed_event) is True
|
|
|
|
|
|
def test_privacy_aggregator():
    """Aggregate two single-key signals and check the result is non-negative."""
    aggregator = SimplePrivacyAggregator()
    readings = [{"d1": 1.0}, {"d2": 2.0}]

    # Only the sign of the aggregate is checked, not its exact value.
    assert aggregator.aggregate(readings) >= 0
|