deltaforge-real-time-cross-.../deltaforge/adapters/equity_feed.py

20 lines
715 B
Python

from __future__ import annotations

import time
from typing import Callable, List, Optional

from ..dsl import Asset, MarketSignal
class EquityFeedAdapter:
    """Starter equity feed adapter.

    In a real system this would connect to a streaming data source.
    Here every poll synthesizes exactly one signal whose price is a pure
    function of the time elapsed since the adapter was constructed, so the
    output is fully reproducible once the clock is controlled.

    Args:
        asset_symbol: Ticker symbol. It is stored on the asset and also used
            to build the asset id as ``"eq-" + asset_symbol``.
        clock: Optional zero-argument callable returning the current time as
            a float. When omitted, ``time.time()`` is called on every read.
            Supply a fake clock to obtain deterministic signals in tests.
    """

    # Floor of the synthetic price.
    _BASE_PRICE = 150.0
    # The price offset is ``elapsed % _PRICE_CYCLE``, i.e. a sawtooth that
    # restarts every ``_PRICE_CYCLE`` clock units.
    _PRICE_CYCLE = 5

    def __init__(
        self,
        asset_symbol: str = "AAPL",
        clock: Optional[Callable[[], float]] = None,
    ):
        self._clock = clock
        self.asset = Asset(id="eq-" + asset_symbol, type="equity", symbol=asset_symbol)
        # Reference instant; all emitted timestamps are relative to it.
        self.start = self._now()

    def _now(self) -> float:
        """Return the current reading of the injected clock, else ``time.time()``."""
        # ``time.time`` is looked up on each call (not captured at construction)
        # so code that patches ``time.time`` keeps working as it did before the
        # ``clock`` parameter existed.
        if self._clock is None:
            return time.time()
        return self._clock()

    def poll_signals(self) -> List[MarketSignal]:
        """Return a one-element list with the signal for the current instant.

        The signal's ``timestamp`` is the time elapsed since construction and
        its ``price`` is ``150.0 + (elapsed % 5)``; ``liquidity`` is always 1.0.
        """
        elapsed = self._now() - self.start
        price = self._BASE_PRICE + (elapsed % self._PRICE_CYCLE)
        return [
            MarketSignal(
                asset=self.asset,
                timestamp=elapsed,
                price=price,
                liquidity=1.0,
            )
        ]