deltaforge-real-time-cross-.../deltaforge/adapters/options_feed.py

27 lines
932 B
Python

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Iterator

from ..dsl import MarketSignal
class OptionsFeedAdapter:
    """Starter options market data adapter (stubbed for MVP).

    Emits a synthetic sequence of ``MarketSignal`` objects: for each of
    ``TICKS`` ticks, one signal per (venue, asset) pair. Prices follow a fixed
    ramp shared by every asset and venue. Timestamps advance one second per
    tick from the stream's start time, so the output is fully reproducible
    only when an explicit ``start`` is passed to :meth:`stream_signals`.
    """

    # Number of ticks emitted by one call to stream_signals().
    TICKS = 4
    # Price reported at tick 0.
    BASE_PRICE = 5.0
    # Fractional increase over BASE_PRICE added per tick.
    PRICE_STEP = 0.01

    def __init__(self, assets=None, venues=None):
        """Configure the assets and venues to emit signals for.

        Args:
            assets: List of dicts describing each option; only the
                ``"symbol"`` key is read when streaming. A falsy value
                (``None`` or an empty list) selects the built-in AAPL call /
                MSFT put pair.
            venues: List of venue identifiers. A falsy value selects the
                built-in ``VENUE-A`` / ``VENUE-B`` pair.
        """
        self.assets = assets or [
            {"symbol": "AAPL", "type": "call"},
            {"symbol": "MSFT", "type": "put"},
        ]
        self.venues = venues or ["VENUE-A", "VENUE-B"]

    def stream_signals(self, start: datetime | None = None) -> Iterator[MarketSignal]:
        """Yield one signal per (venue, asset) pair for each tick.

        Args:
            start: Timestamp of tick 0. When omitted, the current UTC time is
                captured once, when the generator is first advanced.

        Yields:
            ``MarketSignal`` instances ordered by tick, then venue, then
            asset. Within a tick every signal carries the same price and the
            same timestamp (``start`` plus one second per tick).
        """
        if start is None:
            # datetime.utcnow() is deprecated since Python 3.12. This produces
            # the same naive-UTC value, so timestamps stay comparable with any
            # naive datetimes callers already hold.
            start = datetime.now(timezone.utc).replace(tzinfo=None)

        for tick in range(self.TICKS):
            # Price and timestamp depend only on the tick, so compute them
            # once here instead of once per (venue, asset) pair.
            price = self.BASE_PRICE * (1 + tick * self.PRICE_STEP)
            stamp = start + timedelta(seconds=tick)
            for venue in self.venues:
                for asset in self.assets:
                    yield MarketSignal(
                        asset_symbol=asset.get("symbol"),
                        venue=venue,
                        price=price,
                        timestamp=stamp,
                    )