45 lines
2.0 KiB
Python
45 lines
2.0 KiB
Python
import unittest
|
|
|
|
from marketmesh_privacy_preserving_federated_.core import Contract, DeltaSync, Aggregator
|
|
from marketmesh_privacy_preserving_federated_.adapters.stripe import StripeAdapter
|
|
from marketmesh_privacy_preserving_federated_.adapters.shopify import ShopifyAdapter
|
|
|
|
|
|
class TestAggregationPipeline(unittest.TestCase):
|
|
def test_protocol_and_aggregation_basic(self):
|
|
# Define a minimal contract with a few KPIs
|
|
contract = Contract(contract_id="c1", version=1, kpis=["revenue", "activation_rate", "visits"])
|
|
self.assertIsNotNone(contract)
|
|
|
|
# Prepare two participant contributions via adapters mapping to canonical KPIs
|
|
stripe = StripeAdapter("stripe-c1")
|
|
shopify = ShopifyAdapter("shopify-c1")
|
|
|
|
p1_raw = {"amount": 1200.0, "subscriber_count": 40, "visits": 3000, "activation_rate": 0.55, "signups": 250}
|
|
p2_raw = {"amount": 800.0, "subscriber_count": 25, "visits": 1800, "activation_rate": 0.6, "signups": 180}
|
|
|
|
p1 = stripe.map_to_canonical(p1_raw)
|
|
p2 = shopify.map_to_canonical(p2_raw)
|
|
|
|
# The aggregated KPI set should include the union of keys
|
|
contributions = [p1, p2]
|
|
aggregator = Aggregator(epsilon_per_kpi={"revenue": 1.0, "activation_rate": 0.5, "visits": 0.5})
|
|
aggregated = aggregator.aggregate(contributions)
|
|
|
|
# Basic sanity: keys exist and values are numeric
|
|
for k in ["revenue", "activation_rate", "visits"]:
|
|
self.assertIn(k, aggregated)
|
|
self.assertIsInstance(aggregated[k], float)
|
|
|
|
def test_delta_sync_and_hash(self):
|
|
# Simple delta sync payload creation and representation test
|
|
payload = {"revenue": 1500.0, "customers": 60}
|
|
ds = DeltaSync(contract_id="c1", version_vector={"v1": 1}, payload=payload, hash_="abcd1234")
|
|
self.assertEqual(ds.contract_id, "c1")
|
|
self.assertEqual(ds.payload, payload)
|
|
self.assertEqual(ds.hash, "abcd1234")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|