from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, List


def _union_keys(left: Dict[str, float], right: Dict[str, float]) -> List[str]:
    """Return the keys present in either mapping, in sorted order.

    Sorting (rather than iterating a raw ``set``) keeps the insertion order of
    merged dictionaries identical across processes, independent of string hash
    randomization.
    """
    return sorted(set(left) | set(right))


@dataclass
class LocalArbProblem:
    """Data record for a local arbitrage problem.

    The units and valid ranges of the numeric fields are not defined in this
    module.
    """

    id: str
    venue: str
    assets: List[str]
    target_misprice: float
    max_exposure: float
    latency_budget: float

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dictionary keyed by field name.

        ``assets`` is returned as a shallow copy so that mutating the result
        does not mutate this instance.
        """
        return {
            "id": self.id,
            "venue": self.venue,
            "assets": list(self.assets),
            "target_misprice": self.target_misprice,
            "max_exposure": self.max_exposure,
            "latency_budget": self.latency_budget,
        }


@dataclass
class SharedSignals:
    """Versioned bundle of per-key float signals that can be merged pairwise."""

    version: int
    price_delta_by_asset: Dict[str, float] = field(default_factory=dict)
    cross_corr: Dict[str, float] = field(default_factory=dict)
    liquidity_estimates: Dict[str, float] = field(default_factory=dict)

    def merge(self, other: "SharedSignals") -> "SharedSignals":
        """Combine this instance with ``other`` into a new ``SharedSignals``.

        Neither input is modified. The rules are:

        * ``version``: the larger of the two versions.
        * ``price_delta_by_asset``: per key, the value with the larger
          absolute magnitude; on a tie the value from ``self`` wins. A key
          missing on one side counts as ``0.0``.
        * ``cross_corr`` and ``liquidity_estimates``: per key, the arithmetic
          mean of the two values. A key missing on one side counts as ``0.0``,
          so a value present on only one side is halved.

        Keys are inserted into the result in sorted order so the merged
        dictionaries have the same key order on every run.
        """
        merged = SharedSignals(version=max(self.version, other.version))

        # Price deltas: keep whichever side has the larger absolute value.
        for asset in _union_keys(
            self.price_delta_by_asset, other.price_delta_by_asset
        ):
            mine = self.price_delta_by_asset.get(asset, 0.0)
            theirs = other.price_delta_by_asset.get(asset, 0.0)
            merged.price_delta_by_asset[asset] = (
                mine if abs(mine) >= abs(theirs) else theirs
            )

        # Correlations: simple mean, with a missing side treated as 0.0.
        for key in _union_keys(self.cross_corr, other.cross_corr):
            merged.cross_corr[key] = (
                self.cross_corr.get(key, 0.0) + other.cross_corr.get(key, 0.0)
            ) / 2.0

        # Liquidity estimates: same averaging rule as correlations.
        for key in _union_keys(
            self.liquidity_estimates, other.liquidity_estimates
        ):
            merged.liquidity_estimates[key] = (
                self.liquidity_estimates.get(key, 0.0)
                + other.liquidity_estimates.get(key, 0.0)
            ) / 2.0

        return merged


@dataclass
class PlanDelta:
    """Data record holding a list of action dictionaries plus metadata."""

    delta_actions: List[Dict[str, Any]]
    timestamp: float
    contract_id: str
    signature: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a plain dictionary keyed by field name.

        ``delta_actions`` is returned as a shallow copy of the list, so adding
        or removing actions on the result does not affect this instance. The
        individual action dictionaries are still shared.
        """
        return {
            "delta_actions": list(self.delta_actions),
            "timestamp": self.timestamp,
            "contract_id": self.contract_id,
            "signature": self.signature,
        }


@dataclass
class DualVariables:
    """Container for a single ``shadow_price`` value, defaulting to ``0.0``."""

    shadow_price: float = 0.0


@dataclass
class AuditLog:
    """Append-only, in-memory list of log messages."""

    entries: List[str] = field(default_factory=list)

    def log(self, msg: str) -> None:
        """Append ``msg`` to ``entries``."""
        self.entries.append(msg)


@dataclass
class PrivacyBudget:
    """Tracks a remaining ``leakage_budget`` that never drops below zero."""

    leakage_budget: float

    def consume(self, amount: float) -> None:
        """Subtract ``amount`` from the budget, clamping the result at ``0.0``.

        ``amount`` is not validated: a negative value increases the budget.
        """
        self.leakage_budget = max(0.0, self.leakage_budget - amount)