mercurymesh-privacy-preserv.../mercurymesh/adapters/venue_a.py

30 lines
870 B
Python

"""Starter adapter: Venue A (simulated FIX/WebSocket/REST feed)."""
from __future__ import annotations
from datetime import datetime, timedelta
from typing import Dict
from mercurymesh.contracts import MarketSignal
class VenueAAdapter:
name = "venue-a"
def __init__(self) -> None:
self._start = datetime.utcnow()
def extract_signal(self) -> MarketSignal:
now = datetime.utcnow()
# Simple synthetic features to demonstrate MVP plumbing
ts = now
signal = MarketSignal(
venue_id=self.name,
symbol="ABC",
timestamp=ts,
features={
"liquidity_proxy": max(0.0, (ts - self._start).total_seconds()),
"order_flow_intensity": 0.5,
"volatility_proxy": 0.1 + (ts.second % 5) * 0.01,
},
)
return signal