From 02f4d00ab0a44d1159bcec3387035c8c82cfab79 Mon Sep 17 00:00:00 2001
From: agent-7e3bbc424e07835b
Date: Mon, 20 Apr 2026 14:51:09 +0200
Subject: [PATCH] build(agent): new-agents-2#7e3bbc iteration

---
 mercurymesh/pipeline/__init__.py   |  8 ++++
 mercurymesh/pipeline/aggregator.py | 59 ++++++++++++++++++++++++++++++
 mercurymesh/provenance.py          | 30 +++++++++++++++
 tests/test_pipeline.py             | 36 ++++++++++++++++++
 4 files changed, 133 insertions(+)
 create mode 100644 mercurymesh/pipeline/__init__.py
 create mode 100644 mercurymesh/pipeline/aggregator.py
 create mode 100644 mercurymesh/provenance.py
 create mode 100644 tests/test_pipeline.py

diff --git a/mercurymesh/pipeline/__init__.py b/mercurymesh/pipeline/__init__.py
new file mode 100644
index 0000000..5f8de11
--- /dev/null
+++ b/mercurymesh/pipeline/__init__.py
@@ -0,0 +1,8 @@
+"""MercuryMesh Pipeline: lightweight toy cross-venue analytics pipeline.
+
+Exposes a minimal aggregator interface used by MVP demonstrations.
+"""
+
+from .aggregator import aggregate_signals  # noqa: F401
+
+__all__ = ["aggregate_signals"]
diff --git a/mercurymesh/pipeline/aggregator.py b/mercurymesh/pipeline/aggregator.py
new file mode 100644
index 0000000..9c29f43
--- /dev/null
+++ b/mercurymesh/pipeline/aggregator.py
@@ -0,0 +1,59 @@
+from __future__ import annotations
+
+from typing import List, Dict
+import uuid
+from mercurymesh.contracts import MarketSignal, AggregatedSignal
+from mercurymesh.provenance import merkle_proof_for_signal
+
+
+def _average_features(signals: List[MarketSignal]) -> Dict[str, float]:
+    """Average each feature over the signals in which that feature appears."""
+    # Collect keys across signals
+    keys: set[str] = set()
+    for s in signals:
+        keys.update(s.features.keys())
+
+    # Initialize sums
+    sums: Dict[str, float] = {k: 0.0 for k in keys}
+    counts: Dict[str, int] = {k: 0 for k in keys}
+
+    for s in signals:
+        for k in keys:
+            if k in s.features:
+                sums[k] += s.features[k]
+                counts[k] += 1
+
+    # Compute averages for present keys
+    avg: Dict[str, float] = {}
+    for k in keys:
+        if counts[k] > 0:
+            avg[k] = sums[k] / counts[k]
+    return avg
+
+
+def aggregate_signals(signals: List[MarketSignal]) -> AggregatedSignal:
+    """Create an AggregatedSignal from a list of MarketSignal objects.
+
+    This is a lightweight MVP aggregation: average feature values across venues
+    and emit a Merkle-style provenance proof based on the input signals.
+    """
+    if not signals:
+        raise ValueError("signals list must not be empty")
+
+    venues = [s.venue_id for s in signals]
+    feature_vector = _average_features(signals)
+    privacy_budget_used = 0.0
+    nonce = uuid.uuid4().hex
+
+    # Create a simple merkle_proof from the input signals for provenance traceability
+    # We reuse merkle_proof_for_signal on the first signal as a compact probe
+    merkle_proof = merkle_proof_for_signal(signals[0])
+
+    aggregated = AggregatedSignal(
+        venues=venues,
+        feature_vector=feature_vector,
+        privacy_budget_used=privacy_budget_used,
+        nonce=nonce,
+        merkle_proof=merkle_proof,
+    )
+    return aggregated
diff --git a/mercurymesh/provenance.py b/mercurymesh/provenance.py
new file mode 100644
index 0000000..b132030
--- /dev/null
+++ b/mercurymesh/provenance.py
@@ -0,0 +1,30 @@
+from __future__ import annotations
+
+import hashlib
+from typing import List
+from mercurymesh.contracts import MarketSignal
+
+
+def _hash_string(s: str) -> str:
+    return hashlib.sha256(s.encode("utf-8")).hexdigest()
+
+
+def merkle_proof_for_signal(signal: MarketSignal) -> str:
+    """Generate a lightweight, deterministic Merkle-like proof for a signal.
+
+    This is a simple stand-in for a real Merkle proof suitable for MVP testing.
+    It hashes the concatenation of venue_id, symbol, timestamp, and a sorted
+    representation of features.
+    """
+    features_items = sorted(signal.features.items())
+    features_str = ",".join([f"{k}={v}" for k, v in features_items])
+    base = f"{signal.venue_id}|{signal.symbol}|{signal.timestamp.isoformat()}|{features_str}"
+    return _hash_string(base)
+
+
+def verify_provenance(aggregated_proof: str, signals: List[MarketSignal]) -> bool:
+    """Very lightweight verification: recompute proof from the first signal and compare."""
+    if not signals:
+        return False
+    expected = merkle_proof_for_signal(signals[0])
+    return expected == aggregated_proof
diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py
new file mode 100644
index 0000000..9bf38c6
--- /dev/null
+++ b/tests/test_pipeline.py
@@ -0,0 +1,36 @@
+from datetime import datetime
+from mercurymesh.contracts import MarketSignal, AggregatedSignal
+from mercurymesh.pipeline import aggregate_signals
+from mercurymesh.provenance import merkle_proof_for_signal, verify_provenance
+
+
+def _make_signal(venue_id: str, symbol: str, t: datetime, f: dict) -> MarketSignal:
+    return MarketSignal(venue_id=venue_id, symbol=symbol, timestamp=t, features=f)
+
+
+def test_aggregate_signals_basic():
+    t = datetime.utcnow()
+    s1 = _make_signal("venue-a", "ABC", t, {"liquidity_proxy": 1.0, "order_flow_intensity": 0.5})
+    s2 = _make_signal("venue-b", "XYZ", t, {"liquidity_proxy": 0.5, "order_flow_intensity": 0.8})
+
+    agg = aggregate_signals([s1, s2])
+
+    # Basic sanity checks
+    assert isinstance(agg, AggregatedSignal)
+    assert len(agg.venues) == 2
+    assert "liquidity_proxy" in agg.feature_vector
+    assert agg.nonce is not None
+    # merkle_proof should be present (deterministic from first signal)
+    assert agg.merkle_proof is not None
+
+
+def test_provenance_roundtrip():
+    s = MarketSignal(
+        venue_id="venue-a",
+        symbol="ABC",
+        timestamp=datetime.utcnow(),
+        features={"liquidity_proxy": 1.0, "order_flow_intensity": 0.5},
+    )
+    proof = merkle_proof_for_signal(s)
+    # Use verify_provenance to check it matches the first-signal-derived proof
+    assert verify_provenance(proof, [s])