from __future__ import annotations from typing import List, Dict from datetime import datetime import uuid from mercurymesh.contracts import MarketSignal, AggregatedSignal from mercurymesh.provenance import merkle_proof_for_signal def _average_features(signals: List[MarketSignal]) -> Dict[str, float]: # Collect keys across signals keys: set[str] = set() for s in signals: keys.update(s.features.keys()) # Initialize sums sums: Dict[str, float] = {k: 0.0 for k in keys} counts: Dict[str, int] = {k: 0 for k in keys} for s in signals: for k in keys: if k in s.features: sums[k] += s.features[k] counts[k] += 1 # Compute averages for present keys avg: Dict[str, float] = {} for k in keys: if counts[k] > 0: avg[k] = sums[k] / counts[k] return avg def aggregate_signals(signals: List[MarketSignal]) -> AggregatedSignal: """Create an AggregatedSignal from a list of MarketSignal objects. This is a lightweight MVP aggregation: average feature values across venues and emit a Merkle-style provenance proof based on the input signals. """ if not signals: raise ValueError("signals list must not be empty") venues = [s.venue_id for s in signals] feature_vector = _average_features(signals) privacy_budget_used = 0.0 nonce = uuid.uuid4().hex # Create a simple merkle_proof from the input signals for provenance traceability # We reuse merkle_proof_for_signal on the first signal as a compact probe merkle_proof = merkle_proof_for_signal(signals[0]) if signals else None aggregated = AggregatedSignal( venues=venues, feature_vector=feature_vector, privacy_budget_used=privacy_budget_used, nonce=nonce, merkle_proof=merkle_proof, ) return aggregated