from __future__ import annotations

import time
import zlib
from typing import Dict, Any

from .core import LocalArbProblem, SharedSignals, PlanDelta


def _stable_bucket(asset: str, buckets: int = 5) -> int:
    """Map *asset* to an integer in ``range(buckets)`` reproducibly.

    The builtin ``hash()`` is salted per interpreter process for ``str``
    values, so it yields different results on every run. CRC32 of the
    UTF-8 bytes gives the same bucket in every process.
    """
    return zlib.crc32(asset.encode("utf-8")) % buckets


class PriceFeedAdapter:
    """Adapter A: emits a LocalArbProblem + SharedSignals pair per ``step()`` call.

    The adapter itself does no scheduling; the caller decides how often
    ``step()`` is invoked.
    """

    def __init__(self, venue: str, assets: list[str]):
        """Remember the venue and asset list; signal versions start at 1."""
        self.venue = venue
        self.assets = assets
        self.version = 1

    def step(self) -> tuple[LocalArbProblem, SharedSignals]:
        """Build one placeholder problem/signals pair and bump the version.

        Returns:
            ``(problem, signals)`` where ``signals.version`` is the value of
            ``self.version`` before this call incremented it.
        """
        # Create a simple local arb problem and some market deltas.
        # All numeric values below are fixed placeholders.
        prob = LocalArbProblem(
            id=f"{self.venue}-p1",
            venue=self.venue,
            assets=self.assets,
            target_misprice=0.001,  # placeholder target
            max_exposure=100000.0,
            latency_budget=0.1,
        )
        signals = SharedSignals(
            version=self.version,
            # Stable per-asset pseudo-delta in {0, 0.0001, ..., 0.0004};
            # uses CRC32 rather than hash() so runs are reproducible.
            price_delta_by_asset={
                a: 0.0001 * _stable_bucket(a) for a in self.assets
            },
            # Every ordered pair of distinct assets gets the same constant.
            cross_corr={
                (a1, a2): 0.1
                for a1 in self.assets
                for a2 in self.assets
                if a1 != a2
            },
            liquidity_estimates={a: 1.0 for a in self.assets},
        )
        self.version += 1
        return prob, signals


class BrokerAdapter:
    """Adapter B: consumes a PlanDelta and returns a simulated fill report."""

    def __init__(self, venue: str):
        """Remember the venue name that is echoed back in every report."""
        self.venue = venue

    def execute(self, plan: PlanDelta) -> Dict[str, Any]:
        """Simulate filling every action in ``plan.delta_actions``.

        Each action is read with ``.get`` (so missing keys become ``None``)
        and is always reported as ``"filled"``.

        Returns:
            A dict with this adapter's ``venue``, the plan's ``timestamp``,
            the list of per-action ``fills`` and ``ack`` set to ``True``.
        """
        # Simulate a fill with deterministic outcome based on delta_actions
        fills = [
            {
                "asset": action.get("asset"),
                "size": action.get("size"),
                "from": action.get("from_venue"),
                "to": action.get("to_venue"),
                "status": "filled",
            }
            for action in plan.delta_actions
        ]
        return {
            "venue": self.venue,
            "timestamp": plan.timestamp,
            "fills": fills,
            "ack": True,
        }