"""EnergiBridge: Canonical IR translator for ArbSphere primitives. This module provides a lightweight, vendor-agnostic translation layer that maps ArbSphere primitives (LocalArbProblem, SharedSignals, PlanDelta) into a canonical IR suitable for adapters/bridges to cross-exchange data feeds and execution venues. The goal is minimal, deterministic, and easy to extend. """ from __future__ import annotations from dataclasses import asdict from typing import Any, Dict, List from .core import LocalArbProblem, SharedSignals from .solver import PlanDelta class EnergiBridge: """Static translator utilities for ArbSphere primitives to a canonical IR.""" @staticmethod def to_ir(local: LocalArbProblem, signals: SharedSignals, delta: PlanDelta | None = None) -> Dict[str, Any]: """Serialize a LocalArbProblem and SharedSignals (and optional PlanDelta) to IR. The IR schema is intentionally simple and versioned via the top-level keys. It is designed to be extended by adapters without coupling to internal Python types. """ payload: Dict[str, Any] = { "IRVersion": 1, "Object": { "id": local.id, "venue": local.venue, "asset_pair": local.asset_pair, "target_misprice": local.target_misprice, "max_exposure": local.max_exposure, "latency_budget_ms": local.latency_budget_ms, }, "SharedSignals": asdict(signals), } if delta is not None: # Include a lightweight delta snapshot with actions for replay payload["PlanDelta"] = { "actions": delta.actions, "timestamp": delta.timestamp.isoformat(), "delta_id": getattr(delta, "delta_id", None), "parent_id": getattr(delta, "parent_id", None), } # Optional cryptographic attestation for verifiable reproducibility if getattr(delta, "signature", None) is not None: payload["PlanDelta"]["signature"] = delta.signature # Optional governance/provenance extensions def _to_plain(obj): return asdict(obj) if hasattr(obj, "__dataclass_fields__") else obj if getattr(delta, "dual_variables", None) is not None: payload["DualVariables"] = _to_plain(delta.dual_variables) if getattr(delta, "audit_log", None) is not None: payload["AuditLog"] = _to_plain(delta.audit_log) if getattr(delta, "privacy_budget", None) is not None: payload["PrivacyBudget"] = _to_plain(delta.privacy_budget) return payload @staticmethod def from_ir(ir: Dict[str, Any]) -> Dict[str, Any]: """Deserialize a canonical IR payload back into a structured dict. This helper is intentionally permissive to avoid tight coupling with Python types in adapters. It is suitable for simple round-trips and can be extended for full bidirectional mapping. """ return ir @staticmethod def merge_deltas(base: PlanDelta, new: PlanDelta) -> PlanDelta: """Deterministic merge of two PlanDelta objects. For this toy MVP, we concatenate actions and keep the latest timestamp. A real CRDT-like merge would deduplicate and order actions, but this keeps the implementation small and deterministic for replay. """ # Deterministic merge with de-duplication of identical actions merged_actions: List[Dict[str, Any]] = [] seen = set() def _add_action(act: Dict[str, Any]): key = str(act) if key not in seen: seen.add(key) merged_actions.append(act) if isinstance(base.actions, list): for a in base.actions: _add_action(a) if isinstance(new.actions, list): for a in new.actions: _add_action(a) latest_ts = max(base.timestamp, new.timestamp) # Deterministic delta_id wiring for CRDT-like replay base_id = getattr(base, "delta_id", None) new_id = getattr(new, "delta_id", None) if base_id and new_id: merged_id = f"{base_id}:{new_id}" elif base_id: merged_id = base_id elif new_id: merged_id = new_id else: import uuid merged_id = uuid.uuid4().hex merged_parent = getattr(base, "parent_id", None) or getattr(new, "parent_id", None) merged_dual = getattr(new, "dual_variables", None) if getattr(new, "dual_variables", None) is not None else getattr(base, "dual_variables", None) merged_audit = getattr(new, "audit_log", None) if getattr(new, "audit_log", None) is not None else getattr(base, "audit_log", None) merged_priv = getattr(new, "privacy_budget", None) if getattr(new, "privacy_budget", None) is not None else getattr(base, "privacy_budget", None) merged = PlanDelta( actions=merged_actions, timestamp=latest_ts, delta_id=merged_id, parent_id=merged_parent, dual_variables=merged_dual, audit_log=merged_audit, privacy_budget=merged_priv, signature=None, ) return merged @staticmethod def replay_deltas(deltas: List[PlanDelta]) -> PlanDelta: """Deterministically replay a list of PlanDelta objects by folding them. This provides a lightweight, verifiable path to reproduce a sequence of plan decisions without exposing raw data from individual deltas. """ if not deltas: raise ValueError("deltas must be a non-empty list") merged = deltas[0] for d in deltas[1:]: merged = EnergiBridge.merge_deltas(merged, d) return merged __all__ = ["EnergiBridge"]