# File: tradecipher-blockchain-back.../tradecipher_blockchain_back.../adapters.py
# Viewer metadata: 76 lines, 1.9 KiB, Python

"""Stub venue adapters for MVP wiring."""
from __future__ import annotations
from typing import Dict
from .contracts import LocalTrade
class VenueAdapter:
    """Common interface for venue adapters.

    Subclasses assign ``name`` and override :meth:`translate`.
    """

    # Adapter identifier; only declared here, no value is assigned on the base.
    name: str

    def translate(self, payload: Dict[str, object]) -> LocalTrade:
        """Convert a venue payload into a ``LocalTrade``.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError
class VenueAAdapter(VenueAdapter):
    """Adapter for payloads keyed by ``id``, ``symbol``, ``qty`` and ``price``."""

    name = "VenueAAdapter"

    def translate(self, payload: Dict[str, object]) -> LocalTrade:
        """Build a ``LocalTrade`` from a venue A payload.

        Minimal translation: a missing key falls back to a placeholder
        (``"A-UNKNOWN"``, ``"UNKNOWN"``, ``0.0``, ``0.0``). Values that are
        present are passed through ``str()`` / ``float()``, so a ``qty`` or
        ``price`` that ``float()`` cannot convert raises from that call.
        """
        trade_id = str(payload.get("id", "A-UNKNOWN"))
        symbol = str(payload.get("symbol", "UNKNOWN"))
        quantity = float(payload.get("qty", 0.0))
        price = float(payload.get("price", 0.0))
        return LocalTrade(
            id=trade_id,
            symbol=symbol,
            quantity=quantity,
            price=price,
        )
class VenueBAdapter(VenueAdapter):
    """Adapter for payloads keyed by ``order_id``, ``ticker``, ``size`` and ``limit``."""

    name = "VenueBAdapter"

    def translate(self, payload: Dict[str, object]) -> LocalTrade:
        """Build a ``LocalTrade`` from a venue B payload.

        A missing key falls back to a placeholder (``"B-UNKNOWN"``,
        ``"UNKNOWN"``, ``0.0``, ``0.0``). Values that are present are passed
        through ``str()`` / ``float()``, so a ``size`` or ``limit`` that
        ``float()`` cannot convert raises from that call.
        """
        trade_id = str(payload.get("order_id", "B-UNKNOWN"))
        symbol = str(payload.get("ticker", "UNKNOWN"))
        quantity = float(payload.get("size", 0.0))
        price = float(payload.get("limit", 0.0))
        return LocalTrade(
            id=trade_id,
            symbol=symbol,
            quantity=quantity,
            price=price,
        )
class AdaptersRegistry:
    """In-memory mapping from adapter name to adapter instance."""

    def __init__(self) -> None:
        """Create an empty registry."""
        self._adapters: Dict[str, VenueAdapter] = {}

    def register(self, adapter: VenueAdapter) -> None:
        """Store *adapter* under ``adapter.name``.

        Registering another adapter with the same name replaces the earlier
        one; its position in :meth:`list_names` is unchanged.
        """
        self._adapters[adapter.name] = adapter

    def get(self, name: str) -> VenueAdapter:
        """Return the adapter registered under *name*.

        Raises:
            KeyError: if nothing is registered under *name*. The message
                lists the registered names so an empty or partially
                populated registry is easy to diagnose.
        """
        try:
            return self._adapters[name]
        except KeyError:
            # Same exception type as a plain dict lookup, with more context.
            raise KeyError(
                f"no adapter registered as {name!r}; "
                f"registered adapters: {list(self._adapters)!r}"
            ) from None

    def list_names(self) -> list[str]:
        """Return the registered adapter names in registration order."""
        return list(self._adapters)
# Module-level registry instance used by bootstrap_adapters() and returned by
# get_adapters_registry(). It is created empty; nothing is registered at import.
_REGISTRY = AdaptersRegistry()
def bootstrap_adapters() -> None:
    """Install one fresh instance of each starter adapter into ``_REGISTRY``.

    Adapters are registered in the order VenueAAdapter, VenueBAdapter.
    """
    for adapter_cls in (VenueAAdapter, VenueBAdapter):
        _REGISTRY.register(adapter_cls())
def get_adapters_registry() -> AdaptersRegistry:
    """Return the module-level ``_REGISTRY`` object.

    The registry holds only what has been registered so far; this function
    does not register anything itself.
    """
    return _REGISTRY
# Names exported by ``from <this module> import *``.
__all__ = [
    "VenueAdapter",
    "VenueAAdapter",
    "VenueBAdapter",
    "AdaptersRegistry",
    "bootstrap_adapters",
    "get_adapters_registry",
]