70 lines
2.7 KiB
Python
70 lines
2.7 KiB
Python
from __future__ import annotations
|
|
|
|
import time
|
|
from typing import Dict, List, Any
|
|
|
|
from .core import LocalArbProblem, SharedSignals, PlanDelta
|
|
from .governance import GraphOfContracts
|
|
|
|
|
|
class CentralCoordinator:
    """A lightweight, async-ADMM-like coordinator with deterministic replay.

    This MVP keeps a registry of the latest SharedSignals per venue and merges
    inputs deterministically to produce PlanDelta actions.

    Note: timestamps come from ``time.time()``, so only the content and the
    ordering of actions are reproducible across runs, not their time fields.
    """

    # Default decision parameters; both can be overridden per instance.
    DEFAULT_PRICE_DELTA_THRESHOLD: float = 0.0005
    DEFAULT_SIZE_PER_UNIT_DELTA: float = 10.0

    def __init__(
        self,
        *,
        price_delta_threshold: float = DEFAULT_PRICE_DELTA_THRESHOLD,
        size_per_unit_delta: float = DEFAULT_SIZE_PER_UNIT_DELTA,
    ) -> None:
        """Create a coordinator with no signals and no pending actions.

        Args:
            price_delta_threshold: An asset triggers an action only when the
                absolute value of its price delta is strictly greater than
                this value.
            size_per_unit_delta: Multiplier applied to the absolute price
                delta to obtain an action's ``size``.
        """
        self.price_delta_threshold = price_delta_threshold
        self.size_per_unit_delta = size_per_unit_delta
        # Latest signals received from each venue, keyed by venue name.
        self.shared_signals_by_venue: Dict[str, SharedSignals] = {}
        # Incremented on every ingest_local() call; embedded in plan signatures.
        self.version: int = 0
        self.contracts = GraphOfContracts()
        # One entry per plan emitted since the last reconcile(); each entry is
        # that plan's list of action dicts.
        self.pending_delta_actions: List[List[Dict[str, Any]]] = []
        # Most recent plan returned by ingest_local() or reconcile().
        self.last_plan: PlanDelta | None = None

    def ingest_local(self, risk_source: LocalArbProblem, signals: SharedSignals) -> PlanDelta | None:
        """Record a venue's signals and propose cross-venue moves if warranted.

        Always bumps ``version`` and stores ``signals`` under
        ``risk_source.venue``. For every asset in
        ``signals.price_delta_by_asset`` whose absolute delta is strictly
        greater than ``price_delta_threshold``, one action dict is created.

        Args:
            risk_source: Local problem; only its ``venue`` attribute is read.
            signals: Signals; only ``price_delta_by_asset`` is read.

        Returns:
            A PlanDelta holding the proposed actions, or None when no asset
            exceeds the threshold (nothing is queued in that case).
        """
        self.version += 1
        origin = risk_source.venue
        self.shared_signals_by_venue[origin] = signals

        # Placeholder routing: everything goes to "VenueB", except flow that
        # originates at "VenueB", which goes to "VenueA".
        destination = "VenueB" if origin != "VenueB" else "VenueA"
        # A single timestamp per ingest keeps all actions of one plan, and the
        # plan itself, consistent with each other.
        now = time.time()

        actions = [
            {
                "from_venue": origin,
                "to_venue": destination,
                "asset": asset,
                "size": self.size_per_unit_delta * abs(delta_price),
                "time": now,
            }
            for asset, delta_price in signals.price_delta_by_asset.items()
            if abs(delta_price) > self.price_delta_threshold
        ]
        if not actions:
            return None

        plan = PlanDelta(
            delta_actions=actions,
            timestamp=now,
            contract_id="contract-1",
            signature=f"sig-{self.version}",
        )
        self.last_plan = plan
        # Queue a copy of the list: if PlanDelta keeps a reference to
        # `actions`, a caller mutating the plan must not change what
        # reconcile() will merge later.
        self.pending_delta_actions.append(list(actions))
        return plan

    def reconcile(self) -> PlanDelta | None:
        """Merge every queued batch of actions into one plan.

        Actions from all pending batches are flattened and sorted by
        ``(from_venue, to_venue, asset)``. ``list.sort`` is stable, so actions
        with equal keys keep their ingestion order. The pending queue is
        emptied afterwards.

        Returns:
            The merged PlanDelta, or None when nothing is pending.
        """
        if not self.pending_delta_actions:
            return None

        batch_count = len(self.pending_delta_actions)
        merged: List[Dict[str, Any]] = []
        for batch in self.pending_delta_actions:
            merged.extend(batch)
        # Missing keys default to "" so they stay comparable with string values
        # instead of raising TypeError on a None-vs-str comparison.
        merged.sort(
            key=lambda a: (
                a.get("from_venue", ""),
                a.get("to_venue", ""),
                a.get("asset", ""),
            )
        )

        plan = PlanDelta(
            delta_actions=merged,
            timestamp=time.time(),
            contract_id="contract-merged",
            signature=f"sig-reconciled-{batch_count}",
        )
        self.pending_delta_actions.clear()
        self.last_plan = plan
        return plan
|