deltaforge-real-time-cross-.../deltaforge/adapters/equity_feed.py

27 lines
938 B
Python

from __future__ import annotations

from datetime import datetime, timedelta, timezone
from typing import Iterable, Iterator

from ..dsl import MarketSignal
class EquityFeedAdapter:
    """Starter equity price feed adapter (stubbed for MVP).

    Produces synthetic ``MarketSignal`` objects for every configured symbol
    on every configured venue. Prices are a pure function of the symbol, the
    tick index and the venue's position in ``venues``. Timestamps start at a
    caller-supplied instant (or the current UTC time) and advance by one
    second per tick.
    """

    # Defaults used when the caller supplies nothing (or an empty collection).
    _DEFAULT_SYMBOLS = ("AAPL", "MSFT")
    _DEFAULT_VENUES = ("VENUE-A", "VENUE-B")

    # Starting price per known symbol; unknown symbols use _FALLBACK_PRICE.
    _BASE_PRICES = {"AAPL": 150.0, "MSFT": 300.0}
    _FALLBACK_PRICE = 100.0

    # Relative price increase applied per tick and per venue index.
    _TICK_DRIFT = 0.001
    _VENUE_SKEW = 0.0005

    def __init__(
        self,
        symbols: Iterable[str] | None = None,
        venues: Iterable[str] | None = None,
    ):
        """Configure which symbols and venues the feed emits.

        Args:
            symbols: Asset symbols to emit. ``None`` or an empty iterable
                selects the default pair ``["AAPL", "MSFT"]``.
            venues: Venue identifiers to emit. ``None`` or an empty iterable
                selects the default pair ``["VENUE-A", "VENUE-B"]``.
        """
        # Copy into lists: stream_signals() walks both collections several
        # times, so a one-shot iterator would be exhausted after the first
        # pass, and a private copy keeps the adapter independent of later
        # mutations of the caller's list.
        chosen_symbols = list(symbols) if symbols is not None else []
        chosen_venues = list(venues) if venues is not None else []
        self.symbols = chosen_symbols or list(self._DEFAULT_SYMBOLS)
        self.venues = chosen_venues or list(self._DEFAULT_VENUES)

    def stream_signals(
        self,
        *,
        start: datetime | None = None,
        ticks: int = 4,
    ) -> Iterator[MarketSignal]:
        """Yield one signal per (tick, venue, symbol) combination.

        Signals are ordered by tick, then by venue, then by symbol.

        Args:
            start: Timestamp of the first tick. When omitted, the current
                UTC time (as a naive ``datetime``) is taken when iteration
                begins. Pass a fixed value for fully reproducible output.
            ticks: Number of one-second ticks to generate (default 4).

        Yields:
            ``MarketSignal`` instances built with ``asset_symbol``,
            ``venue``, ``price`` and ``timestamp`` keyword arguments.
        """
        if start is None:
            # Same naive-UTC value datetime.utcnow() produced, without
            # calling the API deprecated since Python 3.12.
            start = datetime.now(timezone.utc).replace(tzinfo=None)

        for tick in range(ticks):
            # Every signal of a tick shares one timestamp; compute it once.
            stamp = start + timedelta(seconds=tick)
            for venue_idx, venue in enumerate(self.venues):
                for sym in self.symbols:
                    base_price = self._BASE_PRICES.get(sym, self._FALLBACK_PRICE)
                    price = base_price * (
                        1 + (tick * self._TICK_DRIFT) + (venue_idx * self._VENUE_SKEW)
                    )
                    yield MarketSignal(
                        asset_symbol=sym,
                        venue=venue,
                        price=price,
                        timestamp=stamp,
                    )