From ae14be28897e49aebdf7bbde6e477fa25925bcf2 Mon Sep 17 00:00:00 2001 From: agent-7e3bbc424e07835b Date: Mon, 20 Apr 2026 14:12:58 +0200 Subject: [PATCH] build(agent): new-agents-2#7e3bbc iteration --- arbsphere/__init__.py | 2 ++ arbsphere/energi_bridge.py | 46 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 48 insertions(+) create mode 100644 arbsphere/energi_bridge.py diff --git a/arbsphere/__init__.py b/arbsphere/__init__.py index b06bb7a..81cf221 100644 --- a/arbsphere/__init__.py +++ b/arbsphere/__init__.py @@ -11,6 +11,7 @@ from .coordinator import admm_lite_step # noqa: F401 from .ir import map_to_ir # noqa: F401 from .go_registry import GoCRegistry # noqa: F401 from .two_venue_demo import run_demo # noqa: F401 +from .energi_bridge import EnergiBridge # noqa: F401 __all__ = [ "LocalArbProblem", @@ -18,6 +19,7 @@ __all__ = [ "DualVariables", "AuditLog", "PrivacyBudget", + "EnergiBridge", "admm_lite_step", "map_to_ir", "GoCRegistry", diff --git a/arbsphere/energi_bridge.py b/arbsphere/energi_bridge.py new file mode 100644 index 0000000..9da9794 --- /dev/null +++ b/arbsphere/energi_bridge.py @@ -0,0 +1,46 @@ +from typing import Dict, Any + +from .primitives import LocalArbProblem, SharedSignals +from .coordinator import PlanDelta + + +class EnergiBridge: + """Canonical bridge for ArbSphere primitives to a vendor-agnostic IR. + + This is a lightweight, production-friendly scaffold intended to enable + adapters to plug into a common IR without leaking internal Python types. + The implementations here are intentionally minimal and deterministic to + support reproducible backtests and audits. + """ + + @staticmethod + def map_local_arb_to_ir(lp: LocalArbProblem) -> Dict[str, Any]: + return { + "type": "LocalArbProblem", + "asset_pair": [lp.asset_pair[0], lp.asset_pair[1]], + "target_mispricing": lp.target_mispricing, + "liquidity_budget": lp.liquidity_budget, + "latency_budget": lp.latency_budget, + } + + @staticmethod + def map_shared_signals_to_ir(signals: SharedSignals) -> Dict[str, Any]: + return { + "type": "SharedSignals", + "deltas": list(getattr(signals, "deltas", [])), + "cross_venue_corr": signals.cross_venue_corr, + "liquidity_availability": dict(signals.liquidity_availability), + "latency_proxy": signals.latency_proxy, + } + + @staticmethod + def map_plan_delta_to_ir(delta: PlanDelta) -> Dict[str, Any]: + return { + "type": "PlanDelta", + "legs": list(delta.legs), + "total_size": float(delta.total_size), + "delta_id": delta.delta_id, + } + + +__all__ = ["EnergiBridge"]