build(agent): new-agents#a6e6ec iteration

This commit is contained in:
agent-a6e6ec231c5f7801 2026-04-20 15:24:41 +02:00
parent c63d1e998c
commit 62e518140c
4 changed files with 113 additions and 1 deletions

View File

@ -13,6 +13,7 @@ Getting Started
- Run tests and build: `bash test.sh`.
- Run the demo locally: `python -m crossvenue_arbx.demo --iterations 3`.
- View coverage and governance artefacts in `crossvenue_arbx/governance.py` and `crossvenue_arbx/core.py`.
- Interoperability: a lightweight EnergiBridge (crossvenue_arbx.bridge) translates CrossVenueArbX primitives into a canonical CatOpt-IR for integration with external ecosystems.
Architecture Summary
- Local signal discovery per venue; aggregated signals managed by a central coordinator.

67
crossvenue_arbx/bridge.py Normal file
View File

@ -0,0 +1,67 @@
from __future__ import annotations
"""EnergiBridge-inspired interoperability bridge for CrossVenueArbX primitives.
This module provides a tiny, production-light translation layer that maps
CrossVenueArbX primitives (LocalArbProblem, SharedSignals, PlanDelta, etc.)
into a vendor-agnostic, CatOpt-like intermediate representation (IR).
Rationale:
- Enables interoperability with downstream systems that expect a canonical IR
- Keeps changes small and isolated to a single bridge module
- Supports deterministic replay semantics by producing stable, versioned IR blocks
"""
from typing import Any, Dict
from .core import LocalArbProblem, SharedSignals, PlanDelta
class EnergiBridge:
"""Bridge utility to convert CrossVenueArbX primitives to a CatOpt-like IR."""
@staticmethod
def to_catopt(problem: LocalArbProblem, signals: SharedSignals) -> Dict[str, Any]:
"""Convert a LocalArbProblem and its associated SharedSignals into a
canonical, vendor-agnostic representation.
The output is a simple nested dict structure with clear segregation of
objects and morphisms, suitable for transport or logging in cross-venue
systems. This lightweight representation is intentionally simple to keep
the MVP lean while still providing a predictable schema.
"""
catopt: Dict[str, Any] = {
"version": max(getattr(problem, "version", 0), getattr(signals, "version", 0))
if hasattr(problem, "version") or hasattr(signals, "version")
else 1,
"objects": {
"LocalArbProblem": problem.to_dict(),
"SharedSignals": {
"version": signals.version,
"price_delta_by_asset": dict(signals.price_delta_by_asset),
"cross_corr": dict(signals.cross_corr),
"liquidity_estimates": dict(signals.liquidity_estimates),
},
},
"morphisms": {
"SharedSignals": {
"version": signals.version,
# shallow representation; real implementations may include attestations
"connections": list(signals.cross_corr.keys()),
}
},
"plan_delta_ref": None, # linked PlanDelta would be populated at runtime
}
return catopt
@staticmethod
def from_catopt(catopt: Dict[str, Any]) -> Dict[str, Any]:
"""Inverse of to_catopt for debugging/demo purposes.
This is intentionally simple and returns the raw object-like payload for
downstream processing or logging.
"""
return {
"objects": catopt.get("objects", {}),
"morphisms": catopt.get("morphisms", {}),
}

View File

@ -5,6 +5,7 @@ from typing import Dict, List, Any
from .core import LocalArbProblem, SharedSignals, PlanDelta
from .governance import GraphOfContracts
from .bridge import EnergiBridge
class CentralCoordinator:
@ -18,12 +19,22 @@ class CentralCoordinator:
self.shared_signals_by_venue: Dict[str, SharedSignals] = {}
self.version: int = 0
self.contracts = GraphOfContracts()
self.pending_delta_actions: List[Dict[str, Any]] = []
# Each element is a list of delta action dicts produced by a venue
self.pending_delta_actions: List[List[Dict[str, Any]]] = []
self.last_plan: PlanDelta | None = None
# Optional bridge for canonical IR interoperability (can be used by adapters)
self.bridge = EnergiBridge()
self.last_catopt: Dict[str, Any] | None = None
def ingest_local(self, risk_source: LocalArbProblem, signals: SharedSignals) -> PlanDelta | None:
self.version += 1
self.shared_signals_by_venue[risk_source.venue] = signals
# Produce a canonical IR representation for interoperability
try:
self.last_catopt = self.bridge.to_catopt(risk_source, signals)
except Exception:
# Non-critical for MVP; retain existing behavior on bridge failure
self.last_catopt = None
# Naive cross-venue decision: if any price_delta_by_asset exceeds threshold, propose cross-venue move
delta = []
for asset, delta_price in signals.price_delta_by_asset.items():

33
tests/test_bridge.py Normal file
View File

@ -0,0 +1,33 @@
from crossvenue_arbx.bridge import EnergiBridge
from crossvenue_arbx.core import LocalArbProblem, SharedSignals
def test_to_catopt_basic_structure():
problem = LocalArbProblem(
id="venue1-p1",
venue="Venue1",
assets=["AAPL", "MSFT"],
target_misprice=0.001,
max_exposure=100000.0,
latency_budget=0.1,
)
signals = SharedSignals(
version=1,
price_delta_by_asset={"AAPL": 0.001, "MSFT": -0.0007},
cross_corr={("AAPL", "MSFT"): 0.8},
liquidity_estimates={"AAPL": 1.0, "MSFT": 1.2},
)
bridge = EnergiBridge()
catopt = bridge.to_catopt(problem, signals)
# Basic structure checks
assert isinstance(catopt, dict)
assert "version" in catopt
assert catopt["version"] >= 1
assert "objects" in catopt
objs = catopt["objects"]
assert "LocalArbProblem" in objs
assert objs["LocalArbProblem"]["venue"] == "Venue1"
assert "SharedSignals" in objs
assert objs["SharedSignals"]["version"] == 1