deltaforge-real-time-cross-.../deltaforge_mvp/coordination.py

80 lines
3.2 KiB
Python

"""ADMM-lite style coordination skeleton for cross-venue coherence."""
from __future__ import annotations
from typing import List, Optional
from deltaforge_mvp.core import PlanDelta, StrategyDelta
class LocalRiskSolver:
def __init__(self, venue_name: str):
self.venue_name = venue_name
def propose(self, signals: List[StrategyDelta]) -> PlanDelta:
# Minimal heuristic: aggregate deltas and propose a single PlanDelta per venue
# In real system this would solve a convex program; here we pass through the deltas.
pd = PlanDelta(deltas=signals, venue=self.venue_name, author="local-solver")
return pd
class CentralCurator:
def __init__(self):
pass
def enforce(self, plans: List[PlanDelta]) -> PlanDelta:
# Naive: merge all deltas into a single PlanDelta with combined deltas
merged = []
for p in plans:
merged.extend(p.deltas)
# ADMM-lite balancing: attempt to enforce cross-venue coherence by
# driving the net delta toward zero. This is a lightweight, deterministic
# adjustment suitable for a toy MVP and deterministic replay.
if len(merged) > 0:
total = sum(d.delta for d in merged)
mean_adjust = total / len(merged)
# Create adjusted copies to avoid mutating existing deltas
adjusted = []
for d in merged:
adj = StrategyDelta(
asset=d.asset,
delta=d.delta - mean_adjust,
vega=d.vega,
gamma=d.gamma,
target_pnl=d.target_pnl,
max_order_size=d.max_order_size,
timestamp=d.timestamp,
)
adjusted.append(adj)
merged = adjusted
return PlanDelta(deltas=merged, venue=None, author="curator")
def enforce_with_fallback(self, plans: List[PlanDelta], fallback: Optional["ShadowFallback"] = None) -> PlanDelta:
"""Enforce cross-venue constraints with optional shadow/fallback strategy.
If there are no plans to enforce, and a fallback is provided, use the
fallback to produce a safe delta plan. Otherwise, fall back to the standard
enforcement path.
"""
if not plans and fallback is not None:
return fallback.propose_safe(plans)
return self.enforce(plans)
class ShadowFallback:
"""Lightweight shadow/fallback solver to propose safe deltas when disconnected."""
def propose_safe(self, signals) -> PlanDelta:
# If signals is a list of StrategyDelta, create a corresponding zero-delta plan
deltas: List[StrategyDelta] = []
# Normalize: extract StrategyDelta items whether the input contains StrategyDelta or PlanDelta
items: List[StrategyDelta] = []
for s in signals:
if isinstance(s, PlanDelta):
items.extend(s.deltas)
elif isinstance(s, StrategyDelta):
items.append(s)
for st in items:
deltas.append(StrategyDelta(asset=st.asset, delta=0.0, timestamp=0.0))
return PlanDelta(deltas=deltas, venue=None, author="shadow-fallback")