idea159-arbsphere-federated.../idea159_arbsphere_federated.../energi_bridge.py

148 lines
5.8 KiB
Python

"""EnergiBridge: Canonical IR translator for ArbSphere primitives.
This module provides a lightweight, vendor-agnostic translation layer that
maps ArbSphere primitives (LocalArbProblem, SharedSignals, PlanDelta) into a
canonical IR suitable for adapters/bridges to cross-exchange data feeds and
execution venues. The goal is minimal, deterministic, and easy to extend.
"""
from __future__ import annotations
from dataclasses import asdict
from typing import Any, Dict, List
from .core import LocalArbProblem, SharedSignals
from .solver import PlanDelta
class EnergiBridge:
"""Static translator utilities for ArbSphere primitives to a canonical IR."""
@staticmethod
def to_ir(local: LocalArbProblem, signals: SharedSignals, delta: PlanDelta | None = None) -> Dict[str, Any]:
"""Serialize a LocalArbProblem and SharedSignals (and optional PlanDelta) to IR.
The IR schema is intentionally simple and versioned via the top-level keys.
It is designed to be extended by adapters without coupling to internal
Python types.
"""
payload: Dict[str, Any] = {
"IRVersion": 1,
"Object": {
"id": local.id,
"venue": local.venue,
"asset_pair": local.asset_pair,
"target_misprice": local.target_misprice,
"max_exposure": local.max_exposure,
"latency_budget_ms": local.latency_budget_ms,
},
"SharedSignals": asdict(signals),
}
if delta is not None:
# Include a lightweight delta snapshot with actions for replay
payload["PlanDelta"] = {
"actions": delta.actions,
"timestamp": delta.timestamp.isoformat(),
"delta_id": getattr(delta, "delta_id", None),
"parent_id": getattr(delta, "parent_id", None),
}
# Optional cryptographic attestation for verifiable reproducibility
if getattr(delta, "signature", None) is not None:
payload["PlanDelta"]["signature"] = delta.signature
# Optional governance/provenance extensions
def _to_plain(obj):
return asdict(obj) if hasattr(obj, "__dataclass_fields__") else obj
if getattr(delta, "dual_variables", None) is not None:
payload["DualVariables"] = _to_plain(delta.dual_variables)
if getattr(delta, "audit_log", None) is not None:
payload["AuditLog"] = _to_plain(delta.audit_log)
if getattr(delta, "privacy_budget", None) is not None:
payload["PrivacyBudget"] = _to_plain(delta.privacy_budget)
return payload
@staticmethod
def from_ir(ir: Dict[str, Any]) -> Dict[str, Any]:
"""Deserialize a canonical IR payload back into a structured dict.
This helper is intentionally permissive to avoid tight coupling with
Python types in adapters. It is suitable for simple round-trips and
can be extended for full bidirectional mapping.
"""
return ir
@staticmethod
def merge_deltas(base: PlanDelta, new: PlanDelta) -> PlanDelta:
"""Deterministic merge of two PlanDelta objects.
For this toy MVP, we concatenate actions and keep the latest timestamp.
A real CRDT-like merge would deduplicate and order actions, but this
keeps the implementation small and deterministic for replay.
"""
# Deterministic merge with de-duplication of identical actions
merged_actions: List[Dict[str, Any]] = []
seen = set()
def _add_action(act: Dict[str, Any]):
key = str(act)
if key not in seen:
seen.add(key)
merged_actions.append(act)
if isinstance(base.actions, list):
for a in base.actions:
_add_action(a)
if isinstance(new.actions, list):
for a in new.actions:
_add_action(a)
latest_ts = max(base.timestamp, new.timestamp)
# Deterministic delta_id wiring for CRDT-like replay
base_id = getattr(base, "delta_id", None)
new_id = getattr(new, "delta_id", None)
if base_id and new_id:
merged_id = f"{base_id}:{new_id}"
elif base_id:
merged_id = base_id
elif new_id:
merged_id = new_id
else:
import uuid
merged_id = uuid.uuid4().hex
merged_parent = getattr(base, "parent_id", None) or getattr(new, "parent_id", None)
merged_dual = getattr(new, "dual_variables", None) if getattr(new, "dual_variables", None) is not None else getattr(base, "dual_variables", None)
merged_audit = getattr(new, "audit_log", None) if getattr(new, "audit_log", None) is not None else getattr(base, "audit_log", None)
merged_priv = getattr(new, "privacy_budget", None) if getattr(new, "privacy_budget", None) is not None else getattr(base, "privacy_budget", None)
merged = PlanDelta(
actions=merged_actions,
timestamp=latest_ts,
delta_id=merged_id,
parent_id=merged_parent,
dual_variables=merged_dual,
audit_log=merged_audit,
privacy_budget=merged_priv,
signature=None,
)
return merged
@staticmethod
def replay_deltas(deltas: List[PlanDelta]) -> PlanDelta:
"""Deterministically replay a list of PlanDelta objects by folding them.
This provides a lightweight, verifiable path to reproduce a sequence of
plan decisions without exposing raw data from individual deltas.
"""
if not deltas:
raise ValueError("deltas must be a non-empty list")
merged = deltas[0]
for d in deltas[1:]:
merged = EnergiBridge.merge_deltas(merged, d)
return merged
__all__ = ["EnergiBridge"]