diff --git a/deltaforge/__init__.py b/deltaforge/__init__.py index 999b0d4..a739d1f 100644 --- a/deltaforge/__init__.py +++ b/deltaforge/__init__.py @@ -17,6 +17,7 @@ __all__ = [ "OptionsFeedAdapter", "ExecutionEngine", "Backtester", + "RealTimeEngine", ] from .dsl import Asset, MarketSignal, StrategyDelta, PlanDelta @@ -25,3 +26,4 @@ from .adapters.equity_feed import EquityFeedAdapter from .adapters.options_feed import OptionsFeedAdapter from .execution import ExecutionEngine from .backtester import Backtester +from .rt_engine import RealTimeEngine diff --git a/deltaforge/adapters/equity_feed.py b/deltaforge/adapters/equity_feed.py index 1eb366a..11a267c 100644 --- a/deltaforge/adapters/equity_feed.py +++ b/deltaforge/adapters/equity_feed.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import datetime, timedelta from typing import Iterator -from ..dsl import MarketSignal +from ..dsl import MarketSignal, Asset class EquityFeedAdapter: @@ -17,10 +17,18 @@ class EquityFeedAdapter: self.venues = venues or ["VENUE-A", "VENUE-B"] def stream_signals(self) -> Iterator[MarketSignal]: + """Yield deterministic MarketSignal objects compatible with deltaforge.dsl.MarketSignal. + + Each signal carries the canonical Asset description as expected by the DSL. + """ base = {"AAPL": 150.0, "MSFT": 300.0} t = datetime.utcnow() for i in range(4): for v_i, venue in enumerate(self.venues): for sym in self.symbols: price = base.get(sym, 100.0) * (1 + (i * 0.001) + (v_i * 0.0005)) - yield MarketSignal(asset_symbol=sym, venue=venue, price=float(price), timestamp=t + timedelta(seconds=i)) + asset = Asset(symbol=sym, type="equity") + ts = float((t + timedelta(seconds=i)).timestamp()) + venue_code = 0.0 if venue == "VENUE-A" else 1.0 + sig = MarketSignal(asset=asset, price=float(price), timestamp=ts, meta={"venue": venue_code}) + yield sig diff --git a/deltaforge/adapters/options_feed.py b/deltaforge/adapters/options_feed.py index 6fd54e1..7d840f2 100644 --- a/deltaforge/adapters/options_feed.py +++ b/deltaforge/adapters/options_feed.py @@ -3,7 +3,7 @@ from __future__ import annotations from datetime import datetime, timedelta from typing import Iterator -from ..dsl import MarketSignal +from ..dsl import MarketSignal, Asset class OptionsFeedAdapter: @@ -23,4 +23,7 @@ class OptionsFeedAdapter: for a in self.assets: symbol = a.get("symbol") price = 5.0 * (1 + i * 0.01) - yield MarketSignal(asset_symbol=symbol, venue=venue, price=price, timestamp=t + timedelta(seconds=i)) + asset = Asset(symbol=symbol, type=a.get("type", "call")) + ts = float((t + timedelta(seconds=i)).timestamp()) + venue_code = 0.0 if venue == "VENUE-A" else 1.0 + yield MarketSignal(asset=asset, price=float(price), timestamp=ts, meta={"venue": venue_code}) diff --git a/deltaforge/rt_engine.py b/deltaforge/rt_engine.py new file mode 100644 index 0000000..51afa43 --- /dev/null +++ b/deltaforge/rt_engine.py @@ -0,0 +1,86 @@ +from __future__ import annotations + +from typing import List + +from .adapters.equity_feed import EquityFeedAdapter +from .adapters.options_feed import OptionsFeedAdapter +from .core import Curator +from .coordinator import Coordinator +from .execution import ExecutionEngine +from .backtester import Backtester +from .dsl import MarketSignal, PlanDelta, StrategyDelta, Asset + + +class RealTimeEngine: + """Minimal real-time cross-asset engine (MVP). + + Streams signals from equity and options adapters, synthesizes a plan via the + curator, coordinates cross-venue coherence, and executes the plan. Also runs a + deterministic replay for testing purposes. + """ + + def __init__(self, + equity_symbols: List[str] | None = None, + option_assets: List[dict] | None = None, + venues: List[str] | None = None, + sample_window: int = 4): + self.equity_symbols = equity_symbols or ["AAPL", "MSFT"] + self.option_assets = option_assets or [{"symbol": "AAPL", "type": "call"}, {"symbol": "MSFT", "type": "put"}] + self.venues = venues or ["VENUE-A", "VENUE-B"] + self.sample_window = sample_window + + # Adapters + self.equity_adapter = EquityFeedAdapter(symbols=self.equity_symbols, venues=self.venues) + self.option_adapter = OptionsFeedAdapter(assets=self.option_assets, venues=self.venues) + + # Lightweight components + self.curator = Curator() + self.coordinator = Coordinator() + self.engine = ExecutionEngine() + self.bt = Backtester(initial_cash=100000.0) + + def _collect_signals(self) -> List[MarketSignal]: + signals: List[MarketSignal] = [] + eq_iter = self.equity_adapter.stream_signals() + opt_iter = self.option_adapter.stream_signals() + + for _ in range(self.sample_window): + try: + s = next(eq_iter) + signals.append(s) + except StopIteration: + pass + try: + s = next(opt_iter) + signals.append(s) + except StopIteration: + pass + return signals + + def run_cycle(self) -> dict: + """Run a single deterministic cycle: collect signals, synthesize and coordinate a plan, and execute.""" + signals = self._collect_signals() + # Build simple objectives from signals + objectives: List[StrategyDelta] = [] + for idx, sig in enumerate(signals): + asset = sig.asset + hedges = 0.5 # simple heuristic placeholder + objectives.append(StrategyDelta(id=f"rt-{idx}", assets=[asset], hedge_ratio=hedges, target_pnl=0.0, constraints=[])) + + # Curate a plan from signals (optional in MVP) – we keep this lightweight + plan_curated: PlanDelta = self.curator.synthesize_plan(signals, objectives) + + # Cross-venue coordination step (ADMM-lite) + plan = self.coordinator.coordinate(signals, author="rt-engine") + + # Execute plan (routing and PnL proxy) + exec_result = self.engine.execute(plan) + + # Backtest replay for deterministic evaluation + replay = self.bt.run(plan) + + return { + "plan": plan, + "execution": exec_result, + "replay": replay, + }