def merge(self, other: "DeltaSync") -> "DeltaSync":
    """Deterministically merge two DeltaSync payloads for the same contract.

    Rules:
    - ``contract_id`` must be equal on both sides.
    - ``version_vector`` is the per-peer maximum; a peer missing from one
      side counts as 0.
    - Each KPI in ``payload`` is averaged when both sides carry a non-None
      value; otherwise the single available value is used. A KPI that is
      only ever present as an explicit ``None`` becomes 0.0.
    - ``hash`` is the SHA-256 hex digest of the sorted-key JSON encoding of
      the merged contract_id, version_vector and payload.

    Keys are inserted into the merged dicts in sorted order, so the result's
    repr and iteration order do not depend on set iteration order (which
    varies between processes under string-hash randomization).

    Args:
        other: The DeltaSync to merge with this one.

    Returns:
        A new DeltaSync; neither input's dicts are modified.

    Raises:
        ValueError: If the two contract ids differ.
    """
    if self.contract_id != other.contract_id:
        raise ValueError("Cannot merge DeltaSync with different contract_id")

    # Per-peer max. Sorting fixes the insertion order of the resulting dict;
    # the JSON encoding below sorts keys as well, so the hash is unaffected.
    mine, theirs = self.version_vector, other.version_vector
    merged_version_vector: Dict[str, int] = {
        peer: max(mine.get(peer, 0), theirs.get(peer, 0))
        for peer in sorted(set(mine) | set(theirs))
    }

    # Average where both sides have a value, otherwise keep the one we have.
    merged_payload: Dict[str, float] = {}
    for kpi in sorted(set(self.payload) | set(other.payload)):
        left = self.payload.get(kpi)
        right = other.payload.get(kpi)
        if left is not None and right is not None:
            merged_payload[kpi] = (float(left) + float(right)) / 2.0
        elif left is not None:
            merged_payload[kpi] = float(left)
        elif right is not None:
            merged_payload[kpi] = float(right)
        else:
            # The key exists but holds None on every side that has it.
            merged_payload[kpi] = 0.0

    # Canonical (sorted-key) JSON makes the digest independent of dict order.
    canonical = json.dumps(
        {
            "contract_id": self.contract_id,
            "version_vector": merged_version_vector,
            "payload": merged_payload,
        },
        sort_keys=True,
    )
    merged_hash = hashlib.sha256(canonical.encode("utf-8")).hexdigest()

    return DeltaSync(
        contract_id=self.contract_id,
        version_vector=merged_version_vector,
        payload=merged_payload,
        hash_=merged_hash,
    )