35 lines
1.2 KiB
Python
35 lines
1.2 KiB
Python
"""Lightweight adapters for broker and exchange gateways.

These are minimal stubs that map internal decision signals to canonical signals
exposed to external systems. They are intentionally small MVP implementations.
"""
|
|
from __future__ import annotations

from typing import Any, Dict, Optional
|
|
|
|
|
|
class BrokerGatewayAdapter:
    """Map internal order events to canonical ``order_decision`` signals.

    This is a minimal stub: it copies a few fields out of the event and
    performs no validation. The stored configuration is not consulted by
    the methods defined here.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Store the adapter configuration.

        Args:
            config: Optional settings mapping. ``None`` (or any other falsy
                value, such as an empty dict) is replaced by a new empty
                dict; a non-empty mapping is stored as-is, not copied.
        """
        self.config = config or {}

    def adapt_order(self, order_event: Dict[str, Any]) -> Dict[str, Any]:
        """Build the canonical payload for an internal order event.

        Args:
            order_event: Internal order event. Only the ``order_id``,
                ``venue`` and ``signal`` keys are read; anything else is
                ignored.

        Returns:
            A new dict whose ``type`` is always ``"order_decision"``.
            ``order_id`` and ``venue`` are ``None`` when missing from the
            event, and ``signal`` falls back to ``"unknown"``.
        """
        # Example mapping: extract a few key signals from an internal order event
        return {
            "type": "order_decision",
            "order_id": order_event.get("order_id"),
            "venue": order_event.get("venue"),
            "signal": order_event.get("signal", "unknown"),
        }
|
|
|
|
|
|
class ExchangeGatewayAdapter:
    """Map internal execution events to canonical ``execution_report`` signals.

    This is a minimal stub: it copies a few fields out of the event and
    performs no validation. The stored configuration is not consulted by
    the methods defined here.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None) -> None:
        """Store the adapter configuration.

        Args:
            config: Optional settings mapping. ``None`` (or any other falsy
                value, such as an empty dict) is replaced by a new empty
                dict; a non-empty mapping is stored as-is, not copied.
        """
        self.config = config or {}

    def adapt_execution(self, execution_event: Dict[str, Any]) -> Dict[str, Any]:
        """Build the canonical payload for an internal execution event.

        Args:
            execution_event: Internal execution event. Only the
                ``order_id``, ``price`` and ``latency_ms`` keys are read;
                anything else is ignored.

        Returns:
            A new dict whose ``type`` is always ``"execution_report"``.
            ``order_id``, ``price`` and ``latency_ms`` are each ``None``
            when missing from the event.
        """
        return {
            "type": "execution_report",
            "order_id": execution_event.get("order_id"),
            "price": execution_event.get("price"),
            "latency_ms": execution_event.get("latency_ms"),
        }
|