# idea159-arbsphere-federated.../idea159_arbsphere_federated.../energi_bridge.py
#
# 80 lines
# 2.9 KiB
# Python

"""EnergiBridge: Canonical IR translator for ArbSphere primitives.
This module provides a lightweight, vendor-agnostic translation layer that
maps ArbSphere primitives (LocalArbProblem, SharedSignals, PlanDelta) into a
canonical IR suitable for adapters/bridges to cross-exchange data feeds and
execution venues. The goal is minimal, deterministic, and easy to extend.
"""
from __future__ import annotations
from dataclasses import asdict
from typing import Any, Dict, List
from .core import LocalArbProblem, SharedSignals
from .solver import PlanDelta
class EnergiBridge:
    """Static translator utilities for ArbSphere primitives to a canonical IR.

    The class holds no state; it only groups the static helpers ``to_ir``,
    ``from_ir`` and ``merge_deltas``.
    """

    @staticmethod
    def to_ir(
        local: LocalArbProblem,
        signals: SharedSignals,
        delta: PlanDelta | None = None,
    ) -> Dict[str, Any]:
        """Serialize a LocalArbProblem and SharedSignals (and optional PlanDelta) to IR.

        The IR schema is intentionally simple and versioned via the top-level
        ``"IRVersion"`` key. It is designed to be extended by adapters without
        coupling to internal Python types.

        Args:
            local: Problem whose ``id``, ``venue``, ``asset_pair``,
                ``target_misprice``, ``max_exposure`` and ``latency_budget_ms``
                attributes are copied under ``"Object"``.
            signals: Dataclass instance; converted with ``dataclasses.asdict``
                and stored under ``"SharedSignals"``.
            delta: Optional plan delta. When given, its ``actions`` and the
                ISO-8601 form of its ``timestamp`` are stored under
                ``"PlanDelta"``; when ``None`` that key is omitted.

        Returns:
            A new dict holding the IR payload.
        """
        payload: Dict[str, Any] = {
            "IRVersion": 1,
            "Object": {
                "id": local.id,
                "venue": local.venue,
                "asset_pair": local.asset_pair,
                "target_misprice": local.target_misprice,
                "max_exposure": local.max_exposure,
                "latency_budget_ms": local.latency_budget_ms,
            },
            "SharedSignals": asdict(signals),
        }
        if delta is not None:
            actions = delta.actions
            if isinstance(actions, list):
                # Snapshot the actions: copy the list and each action mapping
                # so that later edits to the payload do not leak back into the
                # live PlanDelta (and vice versa). Non-list values are passed
                # through untouched, as before.
                actions = [
                    dict(action) if isinstance(action, dict) else action
                    for action in actions
                ]
            # Include a lightweight delta snapshot with actions for replay
            payload["PlanDelta"] = {
                "actions": actions,
                "timestamp": delta.timestamp.isoformat(),
            }
        return payload

    @staticmethod
    def from_ir(ir: Dict[str, Any]) -> Dict[str, Any]:
        """Deserialize a canonical IR payload back into a structured dict.

        This helper is intentionally permissive to avoid tight coupling with
        Python types in adapters: it performs no validation or conversion and
        returns the very same object it was given. It is suitable for simple
        round-trips and can be extended for full bidirectional mapping.

        Args:
            ir: IR payload, e.g. as produced by ``to_ir``.

        Returns:
            ``ir`` itself, unchanged (not a copy).
        """
        return ir

    @staticmethod
    def merge_deltas(base: PlanDelta, new: PlanDelta) -> PlanDelta:
        """Deterministic merge of two PlanDelta objects.

        For this toy MVP, we concatenate actions (``base`` first, then ``new``)
        and keep the latest timestamp. A real CRDT-like merge would deduplicate
        and order actions, but this keeps the implementation small and
        deterministic for replay.

        Args:
            base: Delta whose actions come first in the merged result.
            new: Delta whose actions are appended after those of ``base``.

        Returns:
            A new ``PlanDelta`` with the concatenated actions and the larger
            of the two timestamps.
        """
        merged_actions: List[Dict[str, Any]] = []
        for source in (base, new):
            # Only list-valued ``actions`` contribute; anything else is skipped.
            if isinstance(source.actions, list):
                merged_actions.extend(source.actions)
        latest_ts = max(base.timestamp, new.timestamp)
        return PlanDelta(actions=merged_actions, timestamp=latest_ts)
# Names exported by ``from <module> import *``: only the translator class.
__all__ = ["EnergiBridge"]