deltaforge-real-time-cross-.../deltaforge_mvp/coordination.py

58 lines
2.3 KiB
Python

"""ADMM-lite style coordination skeleton for cross-venue coherence."""
from __future__ import annotations
from typing import List, Optional
from deltaforge_mvp.core import PlanDelta, StrategyDelta
class LocalRiskSolver:
    """Per-venue solver that packages strategy deltas into a venue plan."""

    def __init__(self, venue_name: str):
        """Remember the name of the venue this solver proposes plans for."""
        self.venue_name = venue_name

    def propose(self, signals: List[StrategyDelta]) -> PlanDelta:
        """Return one ``PlanDelta`` for this venue that carries ``signals``.

        Placeholder heuristic: the deltas are handed to ``PlanDelta`` unchanged
        (no aggregation or optimisation is performed). A real system would
        solve a convex program here instead.

        Args:
            signals: Strategy deltas to forward for this venue.

        Returns:
            A ``PlanDelta`` tagged with this solver's venue name and the
            author ``"local-solver"``.
        """
        return PlanDelta(
            deltas=signals,
            venue=self.venue_name,
            author="local-solver",
        )
class CentralCurator:
    """Merge per-venue plans into a single cross-venue plan."""

    def __init__(self):
        """Create a curator; no state is kept on the instance."""

    def enforce(self, plans: List[PlanDelta]) -> PlanDelta:
        """Concatenate the deltas of every plan into one curator plan.

        Naive merge: deltas are collected in plan order, with no
        de-duplication and no constraint checking.

        Args:
            plans: Plans whose ``deltas`` are concatenated.

        Returns:
            A ``PlanDelta`` holding all collected deltas, with ``venue=None``
            and ``author="curator"``.
        """
        merged = [delta for plan in plans for delta in plan.deltas]
        return PlanDelta(deltas=merged, venue=None, author="curator")

    def enforce_with_fallback(
        self,
        plans: Optional[List[PlanDelta]],
        fallback: Optional["ShadowFallback"] = None,
    ) -> PlanDelta:
        """Enforce cross-venue constraints with optional shadow/fallback strategy.

        If there are no plans to enforce, and a fallback is provided, use the
        fallback to produce a safe delta plan. Otherwise, fall back to the
        standard enforcement path.

        Args:
            plans: Plans to merge. ``None`` is treated the same as an empty
                list.
            fallback: Object exposing ``propose_safe(signals)``; consulted only
                when there are no plans.

        Returns:
            The fallback's result when there are no plans and a fallback is
            given; otherwise the result of :meth:`enforce`.
        """
        # ``not plans`` is also true for None, but both downstream paths
        # iterate their argument; normalise first so None cannot raise
        # TypeError there.
        if plans is None:
            plans = []
        if not plans and fallback is not None:
            return fallback.propose_safe(plans)
        return self.enforce(plans)
class ShadowFallback:
    """Lightweight shadow/fallback solver to propose safe deltas when disconnected."""

    def propose_safe(self, signals) -> PlanDelta:
        """Return a zero-delta plan covering every delta found in ``signals``.

        Args:
            signals: Iterable whose items are ``StrategyDelta`` or
                ``PlanDelta`` objects. A ``PlanDelta`` item contributes its
                ``deltas``; items of any other type are ignored. ``None`` is
                treated as an empty input.

        Returns:
            A ``PlanDelta`` with ``venue=None`` and
            ``author="shadow-fallback"`` containing one ``StrategyDelta`` per
            input delta, keeping its ``asset`` and setting ``delta=0.0`` and
            ``timestamp=0.0``.
        """
        # A fallback is consulted precisely when nothing is available, so a
        # missing input must yield an empty plan rather than a TypeError.
        if signals is None:
            signals = ()

        # Normalize: extract StrategyDelta items whether the input contains
        # StrategyDelta or PlanDelta objects.
        items: List[StrategyDelta] = []
        for entry in signals:
            if isinstance(entry, PlanDelta):
                items.extend(entry.deltas)
            elif isinstance(entry, StrategyDelta):
                items.append(entry)

        deltas = [
            StrategyDelta(asset=item.asset, delta=0.0, timestamp=0.0)
            for item in items
        ]
        return PlanDelta(deltas=deltas, venue=None, author="shadow-fallback")