# idea159-arbsphere-federated.../arbsphere/energi_bridge.py
# (file-viewer metadata: 51 lines, 1.8 KiB, Python)

from typing import Dict, Any
from .primitives import LocalArbProblem, SharedSignals
from .coordinator import PlanDelta
class EnergiBridge:
    """Canonical bridge for ArbSphere primitives to a vendor-agnostic IR.

    This is a lightweight, production-friendly scaffold intended to enable
    adapters to plug into a common IR without leaking internal Python types.
    The implementations here are intentionally minimal and deterministic to
    support reproducible backtests and audits.

    Each mapper is a static method that only reads attributes from its
    argument and returns a newly built ``dict`` carrying a ``"type"`` tag
    naming the primitive it was produced from.
    """

    @staticmethod
    def map_local_arb_to_ir(lp: LocalArbProblem) -> Dict[str, Any]:
        """Return the IR dict for a local arbitrage problem.

        Args:
            lp: Object exposing ``asset_pair`` (indexable at 0 and 1),
                ``target_mispricing``, ``liquidity_budget`` and
                ``latency_budget``.

        Returns:
            A dict tagged ``"LocalArbProblem"``; ``asset_pair`` is emitted as
            a two-element list, the remaining fields are passed through as-is.
        """
        first_asset = lp.asset_pair[0]
        second_asset = lp.asset_pair[1]
        return {
            "type": "LocalArbProblem",
            "asset_pair": [first_asset, second_asset],
            "target_mispricing": lp.target_mispricing,
            "liquidity_budget": lp.liquidity_budget,
            "latency_budget": lp.latency_budget,
        }

    @staticmethod
    def map_shared_signals_to_ir(signals: SharedSignals) -> Dict[str, Any]:
        """Return the IR dict for a set of shared signals.

        Args:
            signals: Object exposing ``cross_venue_corr``,
                ``liquidity_availability`` (anything ``dict()`` accepts),
                ``latency_proxy`` and, optionally, an iterable ``deltas``.

        Returns:
            A dict tagged ``"SharedSignals"``. ``deltas`` is copied into a new
            list and ``liquidity_availability`` into a new dict.
        """
        raw_deltas = getattr(signals, "deltas", None)
        # A missing ``deltas`` attribute already meant "no deltas"; treat an
        # explicit None the same way instead of failing in ``list(None)``.
        # ``is None`` (not truthiness) keeps array-like inputs working.
        deltas = [] if raw_deltas is None else list(raw_deltas)
        return {
            "type": "SharedSignals",
            "deltas": deltas,
            "cross_venue_corr": signals.cross_venue_corr,
            "liquidity_availability": dict(signals.liquidity_availability),
            "latency_proxy": signals.latency_proxy,
        }

    @staticmethod
    def map_plan_delta_to_ir(delta: PlanDelta) -> Dict[str, Any]:
        """Return the IR dict for a plan delta.

        Args:
            delta: Object exposing an iterable ``legs``, a ``total_size``
                convertible with ``float()`` and a ``delta_id``. The
                attributes ``timestamp``, ``parent_delta_id`` and
                ``signature`` are optional.

        Returns:
            A dict tagged ``"PlanDelta"``. ``legs`` is copied into a new list,
            ``total_size`` is coerced to ``float``, and each optional
            attribute that is absent is emitted as ``None``.
        """
        return {
            "type": "PlanDelta",
            "legs": list(delta.legs),
            "total_size": float(delta.total_size),
            "delta_id": delta.delta_id,
            # Extended provenance fields for deterministic replay and auditing
            "timestamp": getattr(delta, "timestamp", None),
            "parent_delta_id": getattr(delta, "parent_delta_id", None),
            "signature": getattr(delta, "signature", None),
        }
# Public API: a star-import of this module exports only EnergiBridge.
__all__ = ["EnergiBridge"]