# idea69-tradeseal-federated-.../tradeseal/adapter_registry.py
#
# 37 lines
# 1.2 KiB
# Python

from __future__ import annotations
import random
from typing import List
from tradeseal.core import LocalOrder
class AdapterRegistry:
    """Name-keyed registry of adapter callables.

    Adapters are kept in the ``adapters`` dict under the name they were
    registered with. ``seed_default`` installs the two starter adapters
    defined on this class; each returns a hard-coded list of sample
    ``LocalOrder`` objects.
    """

    def __init__(self) -> None:
        """Create a registry with no adapters registered."""
        # Maps adapter name -> the callable registered under that name.
        self.adapters: dict = {}

    def register(self, name: str, adapter_func) -> None:
        """Store ``adapter_func`` under ``name``.

        Registering a name that is already present replaces the previous
        entry.
        """
        self.adapters[name] = adapter_func

    def get(self, name: str):
        """Return the adapter registered under ``name``, or ``None`` if absent."""
        return self.adapters.get(name)

    def seed_default(self) -> None:
        """Register the built-in ``fix_feed`` and ``rest_brokerage`` adapters."""
        self.register("fix_feed", self._fix_feed_adapter)
        self.register("rest_brokerage", self._rest_brokerage_adapter)

    # Starter adapters that emit sample LocalOrder data

    def _fix_feed_adapter(self) -> List[LocalOrder]:
        """Return two fixed sample orders for the ``FX-EX`` venue."""
        return [
            LocalOrder(
                order_id="FIX-ORD-1",
                venue="FX-EX",
                symbol="EURUSD",
                side="buy",
                quantity=1000,
                price=1.1000,
            ),
            LocalOrder(
                order_id="FIX-ORD-2",
                venue="FX-EX",
                symbol="USDJPY",
                side="sell",
                quantity=800,
                price=109.5,
            ),
        ]

    def _rest_brokerage_adapter(self) -> List[LocalOrder]:
        """Return one fixed sample order for the ``BRK-REST`` venue."""
        return [
            LocalOrder(
                order_id="REST-ORD-1",
                venue="BRK-REST",
                symbol="EURUSD",
                side="sell",
                quantity=600,
                price=1.1010,
            ),
        ]