edge-latency-aware-cross-ve.../elac_plan/adapters.py

40 lines
1.3 KiB
Python

from __future__ import annotations
from typing import Dict, Any
import json
from .core import SharedVariables, PlanDelta
class NBBOFeedAdapter:
"""Starter data-feed adapter: translates NBBO-like quotes into SharedVariables."""
def __init__(self, contract_id: str = "default-contract") -> None:
self.contract_id = contract_id
self.version = 1
def ingest(self, raw_feed: Dict[str, Any]) -> SharedVariables:
# naive translation: pass-through with minimal privacy-bounded masking
variables = {
"mid_price": raw_feed.get("mid_price"),
"depth": raw_feed.get("depth", {}),
"liquidity_proxy": raw_feed.get("liquidity_proxy", 0.0),
}
return SharedVariables(variables=variables, version=self.version, contract_id=self.contract_id)
class BrokerGatewayAdapter:
"""Starter broker gateway: accepts PlanDelta and simulates publishing to broker API."""
def __init__(self):
self.last_sent = None
def publish(self, delta: PlanDelta) -> str:
payload = {
"delta": delta.delta,
"timestamp": delta.timestamp,
"contract_id": delta.contract_id,
}
self.last_sent = json.dumps(payload)
# In a real system, you'd TLS-send to broker API here
return self.last_sent