idea144-crossvenuearbx-fede.../crossvenue_arbx/coordinator.py

81 lines
3.2 KiB
Python

from __future__ import annotations
import time
from typing import Dict, List, Any
from .core import LocalArbProblem, SharedSignals, PlanDelta
from .governance import GraphOfContracts
from .bridge import EnergiBridge
class CentralCoordinator:
"""A lightweight, async-ADMM-like coordinator with deterministic replay.
This MVP keeps a registry of latest SharedSignals per venue and merges inputs
deterministically to produce PlanDelta actions.
"""
def __init__(self) -> None:
self.shared_signals_by_venue: Dict[str, SharedSignals] = {}
self.version: int = 0
self.contracts = GraphOfContracts()
# Each element is a list of delta action dicts produced by a venue
self.pending_delta_actions: List[List[Dict[str, Any]]] = []
self.last_plan: PlanDelta | None = None
# Optional bridge for canonical IR interoperability (can be used by adapters)
self.bridge = EnergiBridge()
self.last_catopt: Dict[str, Any] | None = None
def ingest_local(self, risk_source: LocalArbProblem, signals: SharedSignals) -> PlanDelta | None:
self.version += 1
self.shared_signals_by_venue[risk_source.venue] = signals
# Produce a canonical IR representation for interoperability
try:
self.last_catopt = self.bridge.to_catopt(risk_source, signals)
except Exception:
# Non-critical for MVP; retain existing behavior on bridge failure
self.last_catopt = None
# Naive cross-venue decision: if any price_delta_by_asset exceeds threshold, propose cross-venue move
delta = []
for asset, delta_price in signals.price_delta_by_asset.items():
if abs(delta_price) > 0.0005:
# simple dummy cross-venue action: move asset from venue A to venue B (names inferred)
action = {
"from_venue": risk_source.venue,
"to_venue": "VenueB" if risk_source.venue != "VenueB" else "VenueA",
"asset": asset,
"size": 10.0 * abs(delta_price),
"time": time.time(),
}
delta.append(action)
if delta:
plan = PlanDelta(
delta_actions=delta,
timestamp=time.time(),
contract_id="contract-1",
signature=f"sig-{self.version}",
)
self.last_plan = plan
self.pending_delta_actions.append(delta)
return plan
return None
def reconcile(self) -> PlanDelta | None:
# Deterministic delta reconciliation on reconnect: merge all pending deltas
if not self.pending_delta_actions:
return None
# Flatten actions and sort by a stable key
merged = []
for d in self.pending_delta_actions:
merged.extend(d)
merged.sort(key=lambda a: (a.get("from_venue"), a.get("to_venue"), a.get("asset")))
plan = PlanDelta(
delta_actions=merged,
timestamp=time.time(),
contract_id="contract-merged",
signature=f"sig-reconciled-{len(self.pending_delta_actions)}",
)
self.pending_delta_actions.clear()
self.last_plan = plan
return plan