# Source: idea159-arbsphere-federated.../idea159_arbsphere_federated.../energi_bridge.py
#
# Repository viewer metadata at time of capture: 117 lines, 4.6 KiB, Python

"""EnergiBridge: Canonical IR translator for ArbSphere primitives.
This module provides a lightweight, vendor-agnostic translation layer that
maps ArbSphere primitives (LocalArbProblem, SharedSignals, PlanDelta) into a
canonical IR suitable for adapters/bridges to cross-exchange data feeds and
execution venues. The goal is minimal, deterministic, and easy to extend.
"""
from __future__ import annotations
from dataclasses import asdict
from typing import Any, Dict, List
from .core import LocalArbProblem, SharedSignals
from .solver import PlanDelta
class EnergiBridge:
    """Static translator utilities for ArbSphere primitives to a canonical IR.

    The class holds no state; it only namespaces three static helpers
    (``to_ir``, ``from_ir`` and ``merge_deltas``).
    """

    @staticmethod
    def to_ir(
        local: LocalArbProblem,
        signals: SharedSignals,
        delta: PlanDelta | None = None,
    ) -> Dict[str, Any]:
        """Serialize a LocalArbProblem and SharedSignals (and optional PlanDelta) to IR.

        The IR schema is intentionally simple and versioned via the top-level
        ``IRVersion`` key. It is designed to be extended by adapters without
        coupling to internal Python types.

        Args:
            local: Problem description; its ``id``, ``venue``, ``asset_pair``,
                ``target_misprice``, ``max_exposure`` and ``latency_budget_ms``
                attributes are copied under the ``"Object"`` key.
            signals: Dataclass instance; converted with ``dataclasses.asdict``
                and stored under ``"SharedSignals"``.
            delta: Optional plan delta. When given, its ``actions`` and the
                ISO-8601 form of its ``timestamp`` (via ``isoformat()``) are
                stored under ``"PlanDelta"``. The optional attributes
                ``dual_variables``, ``audit_log`` and ``privacy_budget`` are
                emitted as ``"DualVariables"``, ``"AuditLog"`` and
                ``"PrivacyBudget"`` when present and not ``None``.

        Returns:
            A new dict that shares no mutable state with ``signals`` or
            ``delta``: mutating the payload never alters the source objects.
        """
        import copy

        def _to_plain(obj: Any) -> Any:
            # asdict() only accepts dataclass *instances*; a dataclass type
            # also carries __dataclass_fields__ and would raise TypeError.
            if hasattr(obj, "__dataclass_fields__") and not isinstance(obj, type):
                return asdict(obj)
            # asdict() deep-copies its leaf values, so do the same for plain
            # containers to keep the payload detached from the delta.
            return copy.deepcopy(obj)

        payload: Dict[str, Any] = {
            "IRVersion": 1,
            "Object": {
                "id": local.id,
                "venue": local.venue,
                "asset_pair": local.asset_pair,
                "target_misprice": local.target_misprice,
                "max_exposure": local.max_exposure,
                "latency_budget_ms": local.latency_budget_ms,
            },
            "SharedSignals": asdict(signals),
        }
        if delta is None:
            return payload

        # Lightweight delta snapshot with actions for replay. The actions are
        # copied so later edits to the IR cannot rewrite the delta (and vice
        # versa).
        payload["PlanDelta"] = {
            "actions": copy.deepcopy(delta.actions),
            "timestamp": delta.timestamp.isoformat(),
        }

        # Optional governance/provenance extensions, emitted in a fixed order.
        extensions = (
            ("dual_variables", "DualVariables"),
            ("audit_log", "AuditLog"),
            ("privacy_budget", "PrivacyBudget"),
        )
        for attr_name, ir_key in extensions:
            value = getattr(delta, attr_name, None)
            if value is not None:
                payload[ir_key] = _to_plain(value)
        return payload

    @staticmethod
    def from_ir(ir: Dict[str, Any]) -> Dict[str, Any]:
        """Deserialize a canonical IR payload back into a structured dict.

        This helper is intentionally permissive to avoid tight coupling with
        Python types in adapters: it currently performs no validation or
        conversion and returns the very same ``ir`` object it was given. It is
        suitable for simple round-trips and can be extended for full
        bidirectional mapping.
        """
        return ir

    @staticmethod
    def merge_deltas(base: PlanDelta, new: PlanDelta) -> PlanDelta:
        """Merge two PlanDelta objects into a new one.

        For this toy MVP, actions are concatenated (``base`` first, then
        ``new``) and the later timestamp is kept. A real CRDT-like merge would
        deduplicate and order actions, but this keeps the implementation small.

        Merge rules:
            * ``actions``: concatenated; a side whose ``actions`` is not a
              ``list`` contributes nothing.
            * ``timestamp``: ``max`` of the two timestamps.
            * ``delta_id``: ``"<base>:<new>"`` when both ids are truthy, the
              single truthy id when only one is, otherwise a fresh random
              ``uuid4`` hex string (so the id is only reproducible when at
              least one input carries an id).
            * ``parent_id``: the first truthy value of ``base`` then ``new``.
            * ``dual_variables`` / ``audit_log`` / ``privacy_budget``: the
              value from ``new`` unless it is ``None``, else from ``base``.

        Returns:
            A newly constructed ``PlanDelta``; neither input is modified.
        """

        def _prefer_new(attr_name: str) -> Any:
            # Take the value from `new` when set, otherwise fall back to `base`.
            candidate = getattr(new, attr_name, None)
            if candidate is not None:
                return candidate
            return getattr(base, attr_name, None)

        merged_actions: List[Dict[str, Any]] = []
        for source in (base, new):
            if isinstance(source.actions, list):
                merged_actions.extend(source.actions)

        # Deterministic delta_id wiring for CRDT-like replay.
        base_id = getattr(base, "delta_id", None)
        new_id = getattr(new, "delta_id", None)
        if base_id and new_id:
            merged_id = f"{base_id}:{new_id}"
        elif base_id or new_id:
            # Exactly one id is truthy here; keep it unchanged.
            merged_id = base_id or new_id
        else:
            import uuid

            merged_id = uuid.uuid4().hex

        return PlanDelta(
            actions=merged_actions,
            timestamp=max(base.timestamp, new.timestamp),
            delta_id=merged_id,
            parent_id=getattr(base, "parent_id", None) or getattr(new, "parent_id", None),
            dual_variables=_prefer_new("dual_variables"),
            audit_log=_prefer_new("audit_log"),
            privacy_budget=_prefer_new("privacy_budget"),
        )
# Public API: only EnergiBridge is exported by `from <module> import *`.
__all__ = ["EnergiBridge"]