build(agent): new-agents-4#58ba63 iteration
This commit is contained in:
parent
e5c8ab2647
commit
3ed38c0c33
|
|
@ -1,4 +1,6 @@
|
|||
"""Core primitives for MarketMesh MVP: contracts, delta-sync, and aggregation."""
|
||||
import json
|
||||
import hashlib
|
||||
from typing import Dict, Any, List
|
||||
import math
|
||||
import random
|
||||
|
|
@ -28,6 +30,52 @@ class DeltaSync:
|
|||
def __repr__(self) -> str:
    """Debug-friendly representation: contract id, version vector, and hash."""
    return (
        "DeltaSync("
        f"contract_id={self.contract_id}, "
        f"version_vector={self.version_vector}, "
        f"hash={self.hash})"
    )
|
||||
|
||||
def merge(self, other: "DeltaSync") -> "DeltaSync":
    """Deterministically merge two DeltaSync payloads for the same contract.

    The merge is symmetric and repeatable (MVP rules):

    - ``contract_id`` must match on both sides, otherwise ``ValueError``.
    - The merged version vector is the per-peer maximum (absent peers
      count as 0).
    - A KPI reported by both sides is averaged; a KPI reported by one
      side only is passed through; if neither side carries a usable
      value, 0.0 is recorded.
    - The hash is recomputed over the merged content using a canonical
      (sorted-key) JSON encoding so the result is order-independent.
    """
    if self.contract_id != other.contract_id:
        raise ValueError("Cannot merge DeltaSync with different contract_id")

    # Per-peer maximum over the union of peer ids.
    merged_version_vector: Dict[str, int] = {
        peer: max(self.version_vector.get(peer, 0), other.version_vector.get(peer, 0))
        for peer in set(self.version_vector) | set(other.version_vector)
    }

    def _combine(a, b):
        # Average when both sides report the KPI; otherwise use the one
        # available value; fall back to 0.0 when neither is usable.
        if a is not None and b is not None:
            return (float(a) + float(b)) / 2.0
        if a is not None:
            return float(a)
        if b is not None:
            return float(b)
        return 0.0

    merged_payload: Dict[str, float] = {
        kpi: _combine(self.payload.get(kpi), other.payload.get(kpi))
        for kpi in set(self.payload) | set(other.payload)
    }

    # Canonical JSON (sorted keys) keeps the digest independent of dict order.
    merged_hash = hashlib.sha256(
        json.dumps(
            {
                "contract_id": self.contract_id,
                "version_vector": merged_version_vector,
                "payload": merged_payload,
            },
            sort_keys=True,
        ).encode("utf-8")
    ).hexdigest()

    return DeltaSync(
        contract_id=self.contract_id,
        version_vector=merged_version_vector,
        payload=merged_payload,
        hash_=merged_hash,
    )
|
||||
|
||||
|
||||
class Aggregator:
|
||||
"""In-memory federated aggregator with optional DP-like noise for each KPI."""
|
||||
|
|
|
|||
|
|
@ -0,0 +1,38 @@
|
|||
import json
|
||||
import hashlib
|
||||
import pytest
|
||||
|
||||
from marketmesh_privacy_preserving_federated_.core import DeltaSync
|
||||
|
||||
|
||||
def test_delta_sync_merge_basic():
    """Merging two deltas for one contract is element-wise and deterministic."""
    left = DeltaSync(
        contract_id="c1",
        version_vector={"p1": 1},
        payload={"revenue": 10.0, "customers": 5.0},
        hash_="h1",
    )
    right = DeltaSync(
        contract_id="c1",
        version_vector={"p1": 2, "p2": 1},
        payload={"revenue": 20.0, "customers": 3.0, "activation": 0.5},
        hash_="h2",
    )

    combined = left.merge(right)

    # Contract id carries over; version vector is the per-peer maximum.
    assert combined.contract_id == "c1"
    assert combined.version_vector["p1"] == 2
    assert combined.version_vector["p2"] == 1

    # Shared KPIs are averaged; a KPI present on only one side passes through.
    assert pytest.approx(combined.payload["revenue"]) == 15.0
    assert pytest.approx(combined.payload["customers"]) == 4.0
    assert pytest.approx(combined.payload["activation"]) == 0.5

    # Determinism: replaying the same merge yields an identical result.
    replay = left.merge(right)
    assert combined.payload == replay.payload
    assert combined.version_vector == replay.version_vector
    assert combined.hash == replay.hash
|
||||
Loading…
Reference in New Issue