64 lines
2.8 KiB
Python
64 lines
2.8 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Dict, List
|
|
|
|
from .core import LocalOrder, SharedSignals, PlanDelta, DualVariables, AuditLog
|
|
|
|
|
|
def _sign(value: float) -> float:
|
|
# deterministic helper for deterministic delta ordering
|
|
return float(value)
|
|
|
|
|
|
class FederationEngine:
    """Reconcile per-venue order plans against a cross-venue exposure cap.

    The engine reads ``policy.max_net_exposure`` and records each operation
    it performs in its own ``AuditLog`` (``self.audit``).
    """

    def __init__(self, policy):
        """Store *policy* and start a fresh audit log.

        Args:
            policy: Object exposing ``max_net_exposure`` (the original
                comment describes it as an instance of ``Policy``).
        """
        self.policy = policy
        self.audit = AuditLog()

    @staticmethod
    def _net_exposure(venue_orders: Dict[str, List[LocalOrder]]) -> Dict[str, float]:
        """Return the signed notional (quantity * price) summed per symbol."""
        totals: Dict[str, float] = {}
        for orders in venue_orders.values():
            for order in orders:
                # Any side other than "buy" (case-insensitive) counts as a sell.
                direction = 1.0 if order.side.lower() == "buy" else -1.0
                notional = direction * order.quantity * order.price
                totals[order.symbol] = totals.get(order.symbol, 0.0) + notional
        return totals

    def _scale_factors(self, exposure: Dict[str, float]) -> Dict[str, float]:
        """Return a quantity multiplier in [0, 1] per symbol that breaches the cap."""
        if not exposure:
            # No orders at all: nothing to cap, and the policy is not consulted.
            return {}
        cap = self.policy.max_net_exposure
        if cap == 0:
            # A cap of zero is treated as "no cap configured".
            return {}
        factors: Dict[str, float] = {}
        for symbol, net in exposure.items():
            magnitude = abs(net)
            if magnitude > cap:
                # Guard the division and clamp so a quantity is never scaled up.
                raw = cap / max(1e-9, magnitude)
                factors[symbol] = max(0.0, min(1.0, abs(raw)))
        return factors

    @staticmethod
    def _capped_updates(venue_orders: Dict[str, List[LocalOrder]], factors: Dict[str, float]) -> list:
        """Build one update dict per order whose scaled quantity is smaller."""
        updates: list = []
        if not factors:
            return updates
        for venue, orders in venue_orders.items():
            for order in orders:
                factor = factors.get(order.symbol)
                if factor is None:
                    # Symbol is within the cap; leave the order untouched.
                    continue
                new_qty = max(0.0, order.quantity * factor)
                if new_qty < order.quantity:
                    updates.append({
                        "venue": venue,
                        "order_id": order.order_id,
                        "symbol": order.symbol,
                        "new_quantity": new_qty,
                        "reason": "cross-venue exposure cap triggered",
                    })
        return updates

    def reconcile(self, venue_orders: Dict[str, List[LocalOrder]], venue_signals: Dict[str, SharedSignals]) -> PlanDelta:
        """Scale down orders on symbols whose cross-venue net exposure is too large.

        Steps:
          1. Sum signed notional per symbol across all venues.
          2. For each symbol whose absolute net exposure exceeds
             ``policy.max_net_exposure``, compute the factor ``cap / |net|``.
          3. Emit an update for every order on such a symbol whose scaled
             quantity is strictly smaller than its current quantity.

        Args:
            venue_orders: Maps a venue key to its orders. Each order must
                expose ``side``, ``symbol``, ``quantity``, ``price`` and
                ``order_id``.
            venue_signals: Accepted for interface compatibility; not used by
                this implementation.

        Returns:
            A ``PlanDelta`` whose ``updates`` are dicts with the keys
            ``venue``, ``order_id``, ``symbol``, ``new_quantity`` and
            ``reason``, in venue/order iteration order.
        """
        exposure = self._net_exposure(venue_orders)
        factors = self._scale_factors(exposure)
        updates = self._capped_updates(venue_orders, factors)

        delta = PlanDelta(updates=updates)
        self.audit.add("reconcile", {"updates_count": len(updates)})
        return delta

    def delta_sync(self, deltas: List[PlanDelta]) -> PlanDelta:
        """Merge several deltas into one with a canonical update order.

        All updates are concatenated in input order and then sorted by
        ``(venue, order_id)``; a missing key sorts as ``""``. The sort is
        stable, so updates sharing the same key keep their input order.

        Args:
            deltas: Deltas whose ``updates`` are dict-like (``.get`` is used).

        Returns:
            A new ``PlanDelta`` holding the merged, sorted updates.
        """
        merged = [update for delta in deltas for update in delta.updates]
        merged.sort(key=lambda update: (update.get("venue", ""), update.get("order_id", "")))

        result = PlanDelta(updates=merged)
        self.audit.add("delta_sync", {"merged_count": len(merged)})
        return result
|