# mercurymesh-privacy-preserv.../mercurymesh/pipeline/aggregator.py
#
# 60 lines
# 1.8 KiB
# Python

from __future__ import annotations
from typing import List, Dict
from datetime import datetime
import uuid
from mercurymesh.contracts import MarketSignal, AggregatedSignal
from mercurymesh.provenance import merkle_proof_for_signal
def _average_features(signals: List[MarketSignal]) -> Dict[str, float]:
# Collect keys across signals
keys: set[str] = set()
for s in signals:
keys.update(s.features.keys())
# Initialize sums
sums: Dict[str, float] = {k: 0.0 for k in keys}
counts: Dict[str, int] = {k: 0 for k in keys}
for s in signals:
for k in keys:
if k in s.features:
sums[k] += s.features[k]
counts[k] += 1
# Compute averages for present keys
avg: Dict[str, float] = {}
for k in keys:
if counts[k] > 0:
avg[k] = sums[k] / counts[k]
return avg
def aggregate_signals(signals: List[MarketSignal]) -> AggregatedSignal:
    """Create an AggregatedSignal from a collection of MarketSignal objects.

    This is a lightweight MVP aggregation:

    * feature values are averaged per key across venues;
    * ``privacy_budget_used`` is reported as ``0.0``;
    * a fresh random nonce (``uuid4().hex``) is attached.

    Provenance caveat: ``merkle_proof`` is derived from the FIRST signal
    only (``merkle_proof_for_signal(signals[0])``). It does not cover the
    remaining inputs, so treat it as a compact probe rather than a proof
    over the full input set.

    Args:
        signals: Non-empty list of signals. Any iterable is accepted; it is
            materialized once because the body traverses it several times.

    Returns:
        AggregatedSignal with venue ids in input order, the averaged feature
        vector, ``privacy_budget_used=0.0``, the nonce, and the probe proof.

    Raises:
        ValueError: If *signals* is empty (or ``None``).
    """
    # Materialize once. The input is read for venues, features, and the
    # proof, and a one-shot iterator would pass a truthiness check and then be
    # exhausted. ``or ()`` keeps ``None`` raising the ValueError below.
    signals = list(signals or ())
    if not signals:
        raise ValueError("signals list must not be empty")

    # Keyword arguments are evaluated left to right, preserving the original
    # order: venues, features, nonce, proof.
    return AggregatedSignal(
        venues=[s.venue_id for s in signals],
        feature_vector=_average_features(signals),
        # Nothing in this function consumes privacy budget.
        privacy_budget_used=0.0,
        nonce=uuid.uuid4().hex,
        # ``signals`` is non-empty here, so no ``None`` fallback is needed.
        merkle_proof=merkle_proof_for_signal(signals[0]),
    )