148 lines
5.8 KiB
Python
148 lines
5.8 KiB
Python
"""EnergiBridge: Canonical IR translator for ArbSphere primitives.
|
|
|
|
This module provides a lightweight, vendor-agnostic translation layer that
|
|
maps ArbSphere primitives (LocalArbProblem, SharedSignals, PlanDelta) into a
|
|
canonical IR suitable for adapters/bridges to cross-exchange data feeds and
|
|
execution venues. The goal is minimal, deterministic, and easy to extend.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import asdict
|
|
from typing import Any, Dict, List
|
|
|
|
from .core import LocalArbProblem, SharedSignals
|
|
from .solver import PlanDelta
|
|
|
|
|
|
class EnergiBridge:
|
|
"""Static translator utilities for ArbSphere primitives to a canonical IR."""
|
|
|
|
@staticmethod
|
|
def to_ir(local: LocalArbProblem, signals: SharedSignals, delta: PlanDelta | None = None) -> Dict[str, Any]:
|
|
"""Serialize a LocalArbProblem and SharedSignals (and optional PlanDelta) to IR.
|
|
|
|
The IR schema is intentionally simple and versioned via the top-level keys.
|
|
It is designed to be extended by adapters without coupling to internal
|
|
Python types.
|
|
"""
|
|
payload: Dict[str, Any] = {
|
|
"IRVersion": 1,
|
|
"Object": {
|
|
"id": local.id,
|
|
"venue": local.venue,
|
|
"asset_pair": local.asset_pair,
|
|
"target_misprice": local.target_misprice,
|
|
"max_exposure": local.max_exposure,
|
|
"latency_budget_ms": local.latency_budget_ms,
|
|
},
|
|
"SharedSignals": asdict(signals),
|
|
}
|
|
|
|
if delta is not None:
|
|
# Include a lightweight delta snapshot with actions for replay
|
|
payload["PlanDelta"] = {
|
|
"actions": delta.actions,
|
|
"timestamp": delta.timestamp.isoformat(),
|
|
"delta_id": getattr(delta, "delta_id", None),
|
|
"parent_id": getattr(delta, "parent_id", None),
|
|
}
|
|
|
|
# Optional cryptographic attestation for verifiable reproducibility
|
|
if getattr(delta, "signature", None) is not None:
|
|
payload["PlanDelta"]["signature"] = delta.signature
|
|
|
|
# Optional governance/provenance extensions
|
|
def _to_plain(obj):
|
|
return asdict(obj) if hasattr(obj, "__dataclass_fields__") else obj
|
|
if getattr(delta, "dual_variables", None) is not None:
|
|
payload["DualVariables"] = _to_plain(delta.dual_variables)
|
|
if getattr(delta, "audit_log", None) is not None:
|
|
payload["AuditLog"] = _to_plain(delta.audit_log)
|
|
if getattr(delta, "privacy_budget", None) is not None:
|
|
payload["PrivacyBudget"] = _to_plain(delta.privacy_budget)
|
|
|
|
return payload
|
|
|
|
@staticmethod
|
|
def from_ir(ir: Dict[str, Any]) -> Dict[str, Any]:
|
|
"""Deserialize a canonical IR payload back into a structured dict.
|
|
|
|
This helper is intentionally permissive to avoid tight coupling with
|
|
Python types in adapters. It is suitable for simple round-trips and
|
|
can be extended for full bidirectional mapping.
|
|
"""
|
|
return ir
|
|
|
|
@staticmethod
|
|
def merge_deltas(base: PlanDelta, new: PlanDelta) -> PlanDelta:
|
|
"""Deterministic merge of two PlanDelta objects.
|
|
|
|
For this toy MVP, we concatenate actions and keep the latest timestamp.
|
|
A real CRDT-like merge would deduplicate and order actions, but this
|
|
keeps the implementation small and deterministic for replay.
|
|
"""
|
|
# Deterministic merge with de-duplication of identical actions
|
|
merged_actions: List[Dict[str, Any]] = []
|
|
seen = set()
|
|
def _add_action(act: Dict[str, Any]):
|
|
key = str(act)
|
|
if key not in seen:
|
|
seen.add(key)
|
|
merged_actions.append(act)
|
|
|
|
if isinstance(base.actions, list):
|
|
for a in base.actions:
|
|
_add_action(a)
|
|
if isinstance(new.actions, list):
|
|
for a in new.actions:
|
|
_add_action(a)
|
|
|
|
latest_ts = max(base.timestamp, new.timestamp)
|
|
# Deterministic delta_id wiring for CRDT-like replay
|
|
base_id = getattr(base, "delta_id", None)
|
|
new_id = getattr(new, "delta_id", None)
|
|
if base_id and new_id:
|
|
merged_id = f"{base_id}:{new_id}"
|
|
elif base_id:
|
|
merged_id = base_id
|
|
elif new_id:
|
|
merged_id = new_id
|
|
else:
|
|
import uuid
|
|
merged_id = uuid.uuid4().hex
|
|
|
|
merged_parent = getattr(base, "parent_id", None) or getattr(new, "parent_id", None)
|
|
merged_dual = getattr(new, "dual_variables", None) if getattr(new, "dual_variables", None) is not None else getattr(base, "dual_variables", None)
|
|
merged_audit = getattr(new, "audit_log", None) if getattr(new, "audit_log", None) is not None else getattr(base, "audit_log", None)
|
|
merged_priv = getattr(new, "privacy_budget", None) if getattr(new, "privacy_budget", None) is not None else getattr(base, "privacy_budget", None)
|
|
|
|
merged = PlanDelta(
|
|
actions=merged_actions,
|
|
timestamp=latest_ts,
|
|
delta_id=merged_id,
|
|
parent_id=merged_parent,
|
|
dual_variables=merged_dual,
|
|
audit_log=merged_audit,
|
|
privacy_budget=merged_priv,
|
|
signature=None,
|
|
)
|
|
return merged
|
|
|
|
@staticmethod
|
|
def replay_deltas(deltas: List[PlanDelta]) -> PlanDelta:
|
|
"""Deterministically replay a list of PlanDelta objects by folding them.
|
|
|
|
This provides a lightweight, verifiable path to reproduce a sequence of
|
|
plan decisions without exposing raw data from individual deltas.
|
|
"""
|
|
if not deltas:
|
|
raise ValueError("deltas must be a non-empty list")
|
|
merged = deltas[0]
|
|
for d in deltas[1:]:
|
|
merged = EnergiBridge.merge_deltas(merged, d)
|
|
return merged
|
|
|
|
|
|
__all__ = ["EnergiBridge"]
|