deltaforge-real-time-cross-.../deltaforge/core.py

42 lines
1.6 KiB
Python

from __future__ import annotations
from typing import List
from .dsl import Asset, MarketSignal, PlanDelta, StrategyDelta
class Curator:
"""Simple central coordinator: enforces cross-venue delta neutrality
with a naive ADMM-lite style constraint. This is a minimal stub for MVP.
"""
def __init__(self, assets: List[Asset] = None, contract_id: str | None = None):
self.assets = assets or []
self.contract_id = contract_id
def synthesize_plan(self, signals: List[MarketSignal], objectives: List[StrategyDelta]) -> PlanDelta:
"""Generate a PlanDelta given per-venue signals and desired strategy blocks.
Minimal MVP: translate each StrategyDelta into a hedge action targeting
the first asset listed, producing a simple delta-neutral plan if possible.
"""
hedges: List[Dict] = []
latest_price = signals[-1].price if signals else 0.0
latest_ts = signals[-1].timestamp if signals else 0.0
for idx, sd in enumerate(objectives or []):
if getattr(sd, 'assets', None):
asset = sd.assets[0]
else:
# Fallback: skip if no asset available
continue
size = 1.0 * (-1 if idx % 2 == 1 else 1)
hedges.append({
"action": "hedge",
"symbol": asset.symbol,
"size": float(size),
"price": float(latest_price),
"ts": float(latest_ts),
})
plan = PlanDelta(delta=hedges, timestamp=float(latest_ts), author="curator", contract_id=self.contract_id)
return plan