"""Cross-venue order reconciliation against a per-symbol net-exposure cap."""
from __future__ import annotations

import json
from typing import Dict, List

from .core import LocalOrder, SharedSignals, PlanDelta, DualVariables, AuditLog


def _sign(value: float) -> float:
    """Return ``value`` coerced to ``float``.

    NOTE(review): despite its name this helper does not compute a sign; it
    only performs a float conversion. It is not called anywhere in this
    module.
    """
    # deterministic helper for deterministic delta ordering
    return float(value)


class FederationEngine:
    """Reconcile per-venue orders against a policy and merge plan deltas.

    Every call to :meth:`reconcile` and :meth:`delta_sync` records one entry
    in ``self.audit``.
    """

    def __init__(self, policy):
        """Store ``policy`` and create a fresh audit log.

        Args:
            policy: Object exposing ``max_net_exposure`` (read by
                :meth:`reconcile`).
        """
        self.policy = policy  # instance of Policy
        self.audit = AuditLog()

    def reconcile(
        self,
        venue_orders: Dict[str, List[LocalOrder]],
        venue_signals: Dict[str, SharedSignals],
    ) -> PlanDelta:
        """Scale down orders on symbols whose cross-venue net exposure is too large.

        Very lightweight ADMM-like reconciliation:

        1. Compute the net signed notional (``quantity * price``, positive for
           ``"buy"``, negative for any other side) per symbol across venues.
        2. For each symbol whose absolute net exposure exceeds
           ``policy.max_net_exposure``, scale every order on that symbol by
           the same factor.
        3. Emit one update per order whose quantity was actually reduced.

        Args:
            venue_orders: Orders grouped by venue name.
            venue_signals: Accepted for interface compatibility; not read by
                the current implementation.

        Returns:
            A ``PlanDelta`` whose ``updates`` are dicts with the keys
            ``venue``, ``order_id``, ``symbol``, ``new_quantity`` and
            ``reason``, in venue/order iteration order.
        """
        # Aggregate total signed exposure per symbol across venues.
        exposure: Dict[str, float] = {}
        for orders in venue_orders.values():
            for o in orders:
                sign = 1.0 if o.side.lower() == "buy" else -1.0
                exposure[o.symbol] = exposure.get(o.symbol, 0.0) + sign * o.quantity * o.price

        # The factor depends only on the symbol, so compute it once per
        # symbol instead of once per order.
        scale_by_symbol = self._scale_factors(exposure)

        updates = []
        for venue, orders in venue_orders.items():
            for o in orders:
                scale = scale_by_symbol.get(o.symbol)
                if scale is None:
                    # Symbol is within the cap (or the cap is disabled).
                    continue
                new_qty = max(0.0, o.quantity * scale)
                if new_qty < o.quantity:
                    updates.append({
                        "venue": venue,
                        "order_id": o.order_id,
                        "symbol": o.symbol,
                        "new_quantity": new_qty,
                        "reason": "cross-venue exposure cap triggered",
                    })

        delta = PlanDelta(updates=updates)
        self.audit.add("reconcile", {"updates_count": len(updates)})
        return delta

    def _scale_factors(self, exposure: Dict[str, float]) -> Dict[str, float]:
        """Return the quantity scale factor for each symbol that breaches the cap.

        Symbols within the cap are absent from the result. A
        ``max_net_exposure`` of ``0`` disables the cap entirely.
        """
        if not exposure:
            # No orders at all: the policy is never consulted.
            return {}

        max_exposure = self.policy.max_net_exposure
        if max_exposure == 0:
            return {}

        factors: Dict[str, float] = {}
        for symbol, net in exposure.items():
            magnitude = abs(net)
            if magnitude > max_exposure:
                # Conservative reduction; the 1e-9 floor guards the division
                # and the result is clamped to [0, 1].
                factor = max_exposure / max(1e-9, magnitude)
                factors[symbol] = max(0.0, min(1.0, abs(factor)))
        return factors

    def delta_sync(self, deltas: List[PlanDelta]) -> PlanDelta:
        """Merge several deltas into one with a canonical update order.

        Updates are concatenated in input order and then sorted by
        ``(venue, order_id)``; a missing key sorts as ``""``. The sort is
        stable, so updates sharing both keys keep their input order.

        Args:
            deltas: Deltas whose ``updates`` lists are merged.

        Returns:
            A new ``PlanDelta`` holding the sorted updates.
        """
        merged_updates = [update for d in deltas for update in d.updates]
        # canonical sort: venue, order_id
        merged_updates.sort(key=lambda u: (u.get("venue", ""), u.get("order_id", "")))
        result = PlanDelta(updates=merged_updates)
        self.audit.add("delta_sync", {"merged_count": len(merged_updates)})
        return result