30 lines
1.1 KiB
Python
30 lines
1.1 KiB
Python
from __future__ import annotations
|
|
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Iterator
|
|
|
|
from ..dsl import MarketSignal, Asset
|
|
|
|
|
|
class OptionsFeedAdapter:
    """Starter options market data adapter (stubbed for MVP).

    Emits a fixed, synthetic sequence of option signals: four ticks, and on
    each tick one signal per configured venue per configured asset. Prices
    and emission order are deterministic; timestamps are anchored to the
    wall-clock time at which iteration starts.
    """

    def __init__(
        self,
        assets: list[dict[str, str]] | None = None,
        venues: list[str] | None = None,
    ) -> None:
        """Configure the assets and venues to emit signals for.

        Args:
            assets: Dicts carrying a ``"symbol"`` key and an optional
                ``"type"`` key (``"call"`` is used when it is absent).
                ``None`` or an empty list selects the two built-in assets.
            venues: Venue names. ``None`` or an empty list selects
                ``["VENUE-A", "VENUE-B"]``.
        """
        self.assets = assets or [
            {"symbol": "AAPL", "type": "call"},
            {"symbol": "MSFT", "type": "put"},
        ]
        self.venues = venues or ["VENUE-A", "VENUE-B"]

    def stream_signals(self) -> Iterator[MarketSignal]:
        """Yield the stubbed signals, tick by tick.

        For tick ``i`` in ``0..3`` the price is ``5.0 * (1 + i * 0.01)`` and
        the timestamp is the start time plus ``i`` seconds, expressed as POSIX
        epoch seconds. Within a tick, signals are ordered by venue and then by
        asset. ``meta["venue"]`` is ``0.0`` for ``"VENUE-A"`` and ``1.0`` for
        every other venue name.

        Yields:
            One ``MarketSignal`` per (tick, venue, asset) combination.
        """
        # Use an aware UTC datetime: calling .timestamp() on the naive value
        # returned by utcnow() interprets it as *local* time, which shifts the
        # epoch value by the host's UTC offset.
        start = datetime.now(timezone.utc)
        for i in range(4):
            # Price and timestamp depend only on the tick index.
            price = 5.0 * (1 + i * 0.01)
            ts = (start + timedelta(seconds=i)).timestamp()
            for venue in self.venues:
                venue_code = 0.0 if venue == "VENUE-A" else 1.0
                for a in self.assets:
                    asset = Asset(symbol=a.get("symbol"), type=a.get("type", "call"))
                    yield MarketSignal(
                        asset=asset,
                        price=price,
                        timestamp=ts,
                        meta={"venue": venue_code},
                    )
|