60 lines
1.8 KiB
Python
60 lines
1.8 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import List, Dict
|
|
from datetime import datetime
|
|
import uuid
|
|
from mercurymesh.contracts import MarketSignal, AggregatedSignal
|
|
from mercurymesh.provenance import merkle_proof_for_signal
|
|
|
|
|
|
def _average_features(signals: List[MarketSignal]) -> Dict[str, float]:
    """Average each feature over the signals that actually report it.

    A feature that is absent from a signal is skipped for that signal rather
    than counted as zero, so each mean is taken only over the signals whose
    ``features`` mapping contains the key.

    Args:
        signals: Signals whose ``features`` mappings (feature name -> numeric
            value) are combined.

    Returns:
        A dict mapping every feature name seen in any signal to its mean.
        Keys appear in first-seen order, so the layout is reproducible across
        runs (it does not depend on string-hash ordering). The dict is empty
        when ``signals`` is empty or no signal carries any feature.
    """
    totals: Dict[str, float] = {}
    counts: Dict[str, int] = {}

    # Single pass: only touch the keys each signal really has, instead of
    # probing every known key against every signal.
    for signal in signals:
        for name, value in signal.features.items():
            # Start from 0.0 so integer inputs still accumulate as floats.
            totals[name] = totals.get(name, 0.0) + value
            counts[name] = counts.get(name, 0) + 1

    # Every key in ``totals`` was seen at least once, so counts[name] >= 1.
    return {name: total / counts[name] for name, total in totals.items()}
|
|
|
|
|
|
def aggregate_signals(signals: List[MarketSignal]) -> AggregatedSignal:
    """Combine several MarketSignal objects into one AggregatedSignal.

    Lightweight MVP aggregation:

    * ``venues`` lists each input signal's ``venue_id`` in input order.
    * ``feature_vector`` is the per-feature mean from ``_average_features``.
    * ``privacy_budget_used`` is always reported as ``0.0`` here.
    * ``nonce`` is a fresh random UUID4 hex string.
    * ``merkle_proof`` is whatever ``merkle_proof_for_signal`` returns for the
      FIRST signal only; the remaining signals are not passed to it.

    Args:
        signals: Non-empty list of signals to aggregate.

    Returns:
        The assembled AggregatedSignal.

    Raises:
        ValueError: If ``signals`` is empty.
    """
    if not signals:
        raise ValueError("signals list must not be empty")

    # The guard above guarantees signals[0] exists, so the proof needs no
    # empty-list fallback.
    return AggregatedSignal(
        venues=[signal.venue_id for signal in signals],
        feature_vector=_average_features(signals),
        privacy_budget_used=0.0,
        nonce=uuid.uuid4().hex,
        merkle_proof=merkle_proof_for_signal(signals[0]),
    )
|