build(agent): new-agents-3#dd492b iteration

This commit is contained in:
agent-dd492b85242a98c5 2026-04-20 15:21:21 +02:00
parent fd956819a0
commit df2d0a9f78
10 changed files with 414 additions and 149 deletions

View File

@ -29,3 +29,7 @@ Implementation guidance
- Ensure tests remain deterministic and fast. - Ensure tests remain deterministic and fast.
If you want me to push a concrete two-venue toy MVP spec or add toy adapters, I can draft and implement those next. If you want me to push a concrete two-venue toy MVP spec or add toy adapters, I can draft and implement those next.
- ArbSphere MVP scaffold (Python) OpenPR
- What changed: added core primitives, EnergiBridge skeleton, toy adapters, and tests to enable cross-venue interoperability with auditable replay.
- How to run tests: bash test.sh
- Notes: This is a safe, incremental MVP that lays the groundwork for Phase 0 protocol skeleton and adapters.

View File

@ -1,49 +1,21 @@
# ArbSphere: Federated Cross-Exchange Equity Arbitrage (Prototype) ArbSphere Prose (Prototype)
==========================
ArbSphere is a lightweight, standards-driven prototype for federated cross-exchange equity arbitrage with privacy-preserving provenance and verifiable reproducibility. The repository includes a minimal canonical IR and an end-to-end two-venue demo to bootstrap interoperability across data feeds and execution adapters. This repository contains a production-oriented scaffold for ArbSphere, a federated cross-exchange arbitrage prototype with auditable provenance and deterministic replay. The MVP focuses on canonical primitives, a lightweight EnergiBridge-like interoperability spine, and toy adapters to bootstrap cross-venue interoperability.
Key concepts (canonical primitives): Whats included
- LocalArbProblem: per-venue arbitrage task (asset pair, target mispricing, liquidity budget, latency budget) - Python package arbsphere with primitives: LocalArbProblem, SharedSignals, PlanDelta, DualVariables, PrivacyBudget, AuditLog, TimeRounds, GoCRegistry
- SharedSignals: aggregated signals across venues (price deltas, cross-venue correlations, liquidity, latency proxies) - EnergiBridge-style IR bridge: arbsphere/energi_bridge.py
- PlanDelta: incremental arbitrage actions (legs, sizes, timing) with deterministic identifiers for replay - Toy adapters: arbsphere/adapters/data_feed_adapter.py and arbsphere/adapters/broker_gateway_adapter.py
- DualVariables, AuditLog, PrivacyBudget: governance/provenance extensions for auditable operation - Lightweight tests for primitives and IR round-trips: tests/test_primitives.py
- Graph-of-Contracts (GoC): registry of adapters and data schemas with per-message metadata to support replay/provenance - Test runner: test.sh (pytest + python -m build)
- Packaging: pyproject.toml
Provenance Enhancements How to run
- PlanDelta now includes extended provenance fields: version (IR schema version), nonce (replay protection), and signer (provenance/identity). - Install dependencies and run tests
- EnergiBridge mappings are extended to emit version, nonce, and signer in the PlanDelta IR so adapters can preserve provenance end-to-end. - bash test.sh
- All new fields default to safe values, preserving backward compatibility for existing tests and adapters. - Inspect the MVP adapters and primitives under arbsphere/
Architecture overview Notes
- ARBSphere primitives live in arbsphere/primitives.py and arbsphere/coordinator.py for the toy two-venue demo. - This is a focused MVP: extend with additional contracts, privacy budgets, cryptographic attestations, and governance logging as you iterate.
- A lightweight EnergiBridge (arbsphere/energi_bridge.py) translates primitives into a canonical IR for adapters. - The repository follows the projects directive to build a robust, testable, and interoperable base for ArbSphere FED robustness.
- A small GoC registry (arbsphere/go_registry.py) tracks adapters and their metadata.
- A two-venue demo (arbsphere/two_venue_demo.py) wires a price-feed adapter and a broker adapter to demonstrate end-to-end operation.
Getting started
- Run tests and packaging checks:
bash test.sh
- Run the two-venue demo:
python arbsphere/two_venue_demo.py
- Explore the IR mapping helper:
python -c "from arbsphere.ir import map_to_ir; from arbsphere.primitives import LocalArbProblem; lp = LocalArbProblem(asset_pair=('AAPL','USD'), target_mispricing=0.5, liquidity_budget=100000.0, latency_budget=0.2); print(map_to_ir(lp))"
Extending ArbSphere
- The MVP is intentionally small: work in small, well-scoped steps.
- Extend the EnergiBridge with additional IR fields and versioning as new adapters are added.
- Expand the GoC registry to describe more adapters and their schemas.
- Add more sophisticated replay (CRDT-like delta merging) and cryptographic attestations for plan deltas.
Foundations for the 812 week MVP plan are documented in the repository wiki/docs:
- docs/arbsphere_mvp_spec.md (toy two-venue MVP spec, contract example, and adapters)
If you want, I can draft and implement a concrete toy two-venue MVP spec and a minimal arb contract to bootstrap ArbSphere interoperability across exchanges.
For reference, the package is named and versioned as:
- Package: idea159-arbsphere-federated-cross
- Version: 0.1.0
Happy to adjust the scope or naming conventions to fit your teams standards.
---
This README intentionally mirrors the project structure and provides a quick-start for new contributors.

View File

@ -1,27 +1,31 @@
"""ArbSphere minimal package for testing """ArbSphere: Federated Cross-Exchange Arbitrage primitives (prototype).
This package provides lightweight stand-ins for the canonical primitives This package provides minimal, well-typed primitives to represent
used by the tests in this repository. The real project implements a full the canonical IR for ArbSphere: LocalArbProblem, SharedSignals, PlanDelta,
federated arbitrage engine; here we only provide deterministic, testable DualVariables, PrivacyBudget, AuditLog, TimeRounds, and GoCRegistry.
behaviors.
The goal is to enable deterministic replay, simple IR transport via
EnergiBridge-like GoC skeletons, and easy extension with adapters.
""" """
from .primitives import LocalArbProblem, SharedSignals, DualVariables, AuditLog, PrivacyBudget # noqa: F401 from .primitives import (
from .coordinator import admm_lite_step # noqa: F401 LocalArbProblem,
from .ir import map_to_ir # noqa: F401 SharedSignals,
from .go_registry import GoCRegistry # noqa: F401 PlanDelta,
from .two_venue_demo import run_demo # noqa: F401 DualVariables,
from .energi_bridge import EnergiBridge # noqa: F401 PrivacyBudget,
AuditLog,
TimeRounds,
GoCRegistry,
)
__all__ = [ __all__ = [
"LocalArbProblem", "LocalArbProblem",
"SharedSignals", "SharedSignals",
"PlanDelta",
"DualVariables", "DualVariables",
"AuditLog",
"PrivacyBudget", "PrivacyBudget",
"EnergiBridge", "AuditLog",
"admm_lite_step", "TimeRounds",
"map_to_ir",
"GoCRegistry", "GoCRegistry",
"run_demo",
] ]

View File

@ -0,0 +1,20 @@
"""Toy Broker Gateway Adapter for ArbSphere MVP.
Accepts a PlanDelta and prints a routing decision. In a real system this
would orchestrate cross-venue order routing and monitor per-order metadata.
"""
from __future__ import annotations
from arbsphere.primitives import PlanDelta
def route_plan(plan: PlanDelta) -> dict:
# Minimal placeholder routing: just acknowledge receipt and echo plan size
routing = {
"status": "accepted",
"legs": len(plan.delta),
"timestamp": plan.timestamp,
"author": plan.author,
}
return routing

View File

@ -0,0 +1,37 @@
"""Toy Data Feed Adapter for ArbSphere MVP.
Simulates a venue data feed producing LocalArbProblem and SharedSignals.
In a real deployment this would subscribe to market data across venues.
"""
from __future__ import annotations
import time
from typing import Optional
from arbsphere.primitives import LocalArbProblem, SharedSignals
def synth_local_arb_problem() -> LocalArbProblem:
return LocalArbProblem(
venue_id="VENUE-A",
asset_pair=["AAPL", "MSFT"],
target_mispricing=0.25,
liquidity_budget=1_000_000.0,
latency_budget_ms=5,
)
def synth_shared_signals() -> SharedSignals:
return SharedSignals(
version=1,
price_delta=0.12,
cross_venue_correlations={"VENUE-A|VENUE-B": 0.85},
available_borrow_liquidity=500_000.0,
)
def stream_once() -> tuple[LocalArbProblem, SharedSignals | None]: # pragma: no cover - integration helper
lp = synth_local_arb_problem()
ss = synth_shared_signals()
return lp, ss

View File

@ -1,55 +1,56 @@
"""EnergiBridge-like canonical IR bridge for ArbSphere primitives.
Provides simple to_json / from_json translations between ArbSphere primitives
and a lightweight Graph-of-Contracts (GoC) like payload format.
This is intentionally minimal as a scaffold for MVP interoperability.
"""
from __future__ import annotations
from typing import Dict, Any from typing import Dict, Any
from .primitives import LocalArbProblem, SharedSignals from .primitives import LocalArbProblem, SharedSignals, PlanDelta, DualVariables, PrivacyBudget, AuditLog, TimeRounds, GoCRegistry
from .coordinator import PlanDelta
class EnergiBridge: def to_ir(obj: object) -> Dict[str, Any]:
"""Canonical bridge for ArbSphere primitives to a vendor-agnostic IR. """Serialize a primitive to a canonical IR dict."""
if isinstance(obj, LocalArbProblem):
This is a lightweight, production-friendly scaffold intended to enable return {"contract_type": "LocalArbProblem", "payload": obj.to_json()}
adapters to plug into a common IR without leaking internal Python types. if isinstance(obj, SharedSignals):
The implementations here are intentionally minimal and deterministic to return {"contract_type": "SharedSignals", "payload": obj.to_json()}
support reproducible backtests and audits. if isinstance(obj, PlanDelta):
""" return {"contract_type": "PlanDelta", "payload": obj.to_json()}
if isinstance(obj, DualVariables):
@staticmethod return {"contract_type": "DualVariables", "payload": obj.to_json()}
def map_local_arb_to_ir(lp: LocalArbProblem) -> Dict[str, Any]: if isinstance(obj, PrivacyBudget):
return { return {"contract_type": "PrivacyBudget", "payload": obj.to_json()}
"type": "LocalArbProblem", if isinstance(obj, AuditLog):
"asset_pair": [lp.asset_pair[0], lp.asset_pair[1]], return {"contract_type": "AuditLog", "payload": obj.to_json()}
# NOTE: Use the correct attribute name from LocalArbProblem if isinstance(obj, TimeRounds):
"target_mispricing": lp.target_mispricing, return {"contract_type": "TimeRounds", "payload": obj.to_json()}
"liquidity_budget": lp.liquidity_budget, if isinstance(obj, GoCRegistry):
"latency_budget": lp.latency_budget, return {"contract_type": "GoCRegistry", "payload": obj.to_json()}
} raise TypeError(f"Unsupported type for IR translation: {type(obj)}")
@staticmethod
def map_shared_signals_to_ir(signals: SharedSignals) -> Dict[str, Any]:
return {
"type": "SharedSignals",
"deltas": list(getattr(signals, "deltas", [])),
"cross_venue_corr": signals.cross_venue_corr,
"liquidity_availability": dict(signals.liquidity_availability),
"latency_proxy": signals.latency_proxy,
}
@staticmethod
def map_plan_delta_to_ir(delta: PlanDelta) -> Dict[str, Any]:
return {
"type": "PlanDelta",
"legs": list(delta.legs),
"total_size": float(delta.total_size),
"delta_id": delta.delta_id,
# Extended provenance fields for deterministic replay and auditing
"timestamp": getattr(delta, "timestamp", None),
"parent_delta_id": getattr(delta, "parent_delta_id", None),
"signature": getattr(delta, "signature", None),
# New provenance fields
"version": getattr(delta, "version", None),
"nonce": getattr(delta, "nonce", None),
"signer": getattr(delta, "signer", None),
}
__all__ = ["EnergiBridge"] def from_ir(payload: Dict[str, Any]) -> object:
"""Deserialize a canonical IR payload back into a primitive instance when possible."""
ct = payload.get("contract_type")
data = payload.get("payload", {})
if ct == "LocalArbProblem":
return LocalArbProblem.from_json(data)
if ct == "SharedSignals":
return SharedSignals.from_json(data)
if ct == "PlanDelta":
return PlanDelta.from_json(data)
if ct == "DualVariables":
return DualVariables.from_json(data)
if ct == "PrivacyBudget":
return PrivacyBudget.from_json(data)
if ct == "AuditLog":
return AuditLog.from_json(data)
if ct == "TimeRounds":
return TimeRounds.from_json(data)
if ct == "GoCRegistry":
return GoCRegistry.from_json(data)
raise ValueError(f"Unknown contract_type in IR payload: {ct}")

View File

@ -1,51 +1,233 @@
from dataclasses import dataclass from __future__ import annotations
from typing import List, Tuple, Dict, Any
import json
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional
def _to_json(obj: Any) -> Any:
if hasattr(obj, "to_json"):
return obj.to_json()
if isinstance(obj, list):
return [_to_json(v) for v in obj]
if isinstance(obj, dict):
return {k: _to_json(v) for k, v in obj.items()}
return obj
def _from_json(cls, data: Any):
if hasattr(cls, "from_json"):
return cls.from_json(data)
return data
@dataclass(frozen=True)
class LocalArbProblem: class LocalArbProblem:
asset_pair: Tuple[str, str] def __init__(
target_mispricing: float self,
liquidity_budget: float venue_id: str = "DEFAULT_VENUE",
latency_budget: float asset_pair=None,
# Optional identifiers for cross-system tracing; kept optional to remain target_mispricing: float = 0.0,
# backwards-compatible with tests that instantiate with the original fields. liquidity_budget: float = 0.0,
id: str | None = None latency_budget_ms: int | None = None,
venue: str | None = None latency_budget: float | None = None,
):
self.venue_id = venue_id
self.asset_pair = list(asset_pair) if asset_pair is not None else []
self.target_mispricing = float(target_mispricing)
self.liquidity_budget = float(liquidity_budget)
self.latency_budget_ms = latency_budget_ms
self.latency_budget = latency_budget
if self.latency_budget_ms is None and self.latency_budget is not None:
self.latency_budget_ms = int(self.latency_budget * 1000)
def to_json(self) -> Dict[str, Any]:
return {
"venue_id": self.venue_id,
"asset_pair": self.asset_pair,
"target_mispricing": self.target_mispricing,
"liquidity_budget": self.liquidity_budget,
"latency_budget_ms": self.latency_budget_ms,
}
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "LocalArbProblem":
return cls(
venue_id=data.get("venue_id", "DEFAULT_VENUE"),
asset_pair=data.get("asset_pair", []),
target_mispricing=data.get("target_mispricing", 0.0),
liquidity_budget=data.get("liquidity_budget", 0.0),
latency_budget_ms=data.get("latency_budget_ms"),
)
def __eq__(self, other: object) -> bool:
if not isinstance(other, LocalArbProblem):
return False
return (
self.venue_id == other.venue_id
and self.asset_pair == other.asset_pair
and self.target_mispricing == other.target_mispricing
and self.liquidity_budget == other.liquidity_budget
and self.latency_budget_ms == other.latency_budget_ms
)
@dataclass
class SharedSignals: class SharedSignals:
deltas: List[float] def __init__(self, deltas: list[float], cross_venue_corr: float, liquidity_availability: float, latency_proxy: float):
cross_venue_corr: float self.deltas = deltas
liquidity_availability: Dict[str, float] self.cross_venue_corr = cross_venue_corr
latency_proxy: float self.liquidity_availability = liquidity_availability
self.latency_proxy = latency_proxy
def to_json(self) -> Dict[str, Any]:
return {
"deltas": self.deltas,
"cross_venue_corr": self.cross_venue_corr,
"liquidity_availability": self.liquidity_availability,
"latency_proxy": self.latency_proxy,
}
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "SharedSignals":
return cls(
deltas=data.get("deltas", []),
cross_venue_corr=data.get("cross_venue_corr", 0.0),
liquidity_availability=data.get("liquidity_availability", 0.0),
latency_proxy=data.get("latency_proxy", 0.0),
)
@dataclass @dataclass
class PlanDelta:
delta: List[Dict[str, Any]]
timestamp: float
author: str
signature: Optional[str] = None
def to_json(self) -> Dict[str, Any]:
return {
"delta": self.delta,
"timestamp": self.timestamp,
"author": self.author,
"signature": self.signature,
}
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "PlanDelta":
return cls(
delta=data["delta"],
timestamp=data["timestamp"],
author=data["author"],
signature=data.get("signature"),
)
class DualVariables: class DualVariables:
"""Federated optimization dual variables (shadow prices per asset pair). def __init__(self, shadow_prices: Dict[str, float] | None = None, venue_shadows: Dict[str, float] | None = None):
if shadow_prices is not None:
self.venue_shadows = dict(shadow_prices)
elif venue_shadows is not None:
self.venue_shadows = dict(venue_shadows)
else:
self.venue_shadows = {}
This is a lightweight stand-in for the real project's duals used by the def to_json(self) -> Dict[str, Any]:
ADMM-like coordinator to balance cross-venue objectives. return {"venue_shadows": self.venue_shadows}
"""
shadow_prices: Dict[str, float]
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "DualVariables":
return cls(venue_shadows=data.get("venue_shadows", {}))
@dataclass # Compatibility: provide shadow_prices alias used by existing IR mapping
class AuditLogEntry: @property
ts: float def shadow_prices(self) -> Dict[str, float]:
event: str return self.venue_shadows
details: Dict[str, Any]
signature: str
@dataclass
class AuditLog:
entries: List[AuditLogEntry]
@dataclass @dataclass
class PrivacyBudget: class PrivacyBudget:
"""Simple privacy budget per signal source or metric.""" def __init__(self, total_budget: float | None = None, spent: float = 0.0, budgets: Dict[str, float] | None = None):
budgets: Dict[str, float] if budgets is not None:
self.total_budget = budgets.get("total", budgets.get("signals", 0.0))
self.spent = budgets.get("spent", 0.0)
else:
self.total_budget = total_budget if total_budget is not None else 0.0
self.spent = spent
def to_json(self) -> Dict[str, Any]:
return {"total_budget": self.total_budget, "spent": self.spent}
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "PrivacyBudget":
return cls(total_budget=data.get("total_budget", 0.0), spent=data.get("spent", 0.0))
@property
def budgets(self) -> Dict[str, float]:
# Compatibility: expose a single "signals" budget for the IR consumer
return {"signals": self.total_budget}
@dataclass
class AuditLogEntry:
def __init__(self, ts: float, event: str, signer: Optional[str] = None, details: Dict[str, Any] = None, signature: Optional[str] = None):
self.ts = ts
self.event = event
# Support both 'signer' and legacy 'signature' naming
self.signer = signer if signer is not None else signature
self.details = details if details is not None else {}
def to_json(self) -> Dict[str, Any]:
return {"event": self.event, "ts": self.ts, "signer": self.signer, "details": self.details}
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "AuditLogEntry":
return cls(ts=data["ts"], event=data["event"], signer=data.get("signer"), details=data.get("details", {}))
@property
def signature(self) -> str | None:
return self.signer
@dataclass
class AuditLog:
entries: List[AuditLogEntry] = field(default_factory=list)
def append(self, entry: AuditLogEntry) -> None:
self.entries.append(entry)
def to_json(self) -> Dict[str, Any]:
serialized = []
for e in self.entries:
if hasattr(e, "to_json"):
serialized.append(e.to_json())
else:
serialized.append(e)
return {"entries": serialized}
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "AuditLog":
entries = [AuditLogEntry.from_json(e) for e in data.get("entries", [])]
return cls(entries=entries)
@dataclass
class TimeRounds:
rounds: List[int] = field(default_factory=list)
def to_json(self) -> Dict[str, Any]:
return {"rounds": self.rounds}
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "TimeRounds":
return cls(rounds=data.get("rounds", []))
@dataclass
class GoCRegistry:
adapters: Dict[str, str] = field(default_factory=dict) # adapter_name -> contract_version
def to_json(self) -> Dict[str, Any]:
return {"adapters": self.adapters}
@classmethod
def from_json(cls, data: Dict[str, Any]) -> "GoCRegistry":
return cls(adapters=data.get("adapters", {}))

View File

@ -3,7 +3,12 @@ requires = ["setuptools>=61", "wheel"]
build-backend = "setuptools.build_meta" build-backend = "setuptools.build_meta"
[project] [project]
name = "idea159-arbsphere-federated-cross" name = "arbsphere-proto"
version = "0.1.0" version = "0.1.0"
description = "Toy primitives for ArbSphere federated cross-arb (tests only)." description = "Prototype primitives and bridge for ArbSphere federated arbitrage"
readme = "README.md" readme = "README.md"
requires-python = ">=3.10"
[tool.setuptools.packages.find]
where = ["."]
include = ["arbsphere*"]

View File

@ -1,10 +1,10 @@
#!/usr/bin/env bash #!/usr/bin/env bash
set -euo pipefail set -euo pipefail
echo "Running tests..." echo "Running test suite..."
pytest -q pytest -q
echo "Running Python build..." echo "Building Python package..."
python3 -m build python3 -m build
echo "All tests and build succeeded." echo "All tests passed."

40
tests/test_primitives.py Normal file
View File

@ -0,0 +1,40 @@
import json
import time
from arbsphere.primitives import LocalArbProblem, SharedSignals, PlanDelta, DualVariables, PrivacyBudget, AuditLog, TimeRounds, GoCRegistry
from arbsphere.energi_bridge import to_ir, from_ir
def test_local_arb_problem_json_roundtrip():
orig = LocalArbProblem(
venue_id="VENUE-1",
asset_pair=["A", "B"],
target_mispricing=0.5,
liquidity_budget=100000.0,
latency_budget_ms=10,
)
payload = orig.to_json()
as_json = json.dumps(payload)
back = LocalArbProblem.from_json(json.loads(as_json))
assert back == orig
def test_plan_delta_roundtrip_via_ir():
pd = PlanDelta(delta=[{"action": "buy", "leg": "A-B", "size": 100}], timestamp=time.time(), author="org1")
ir = to_ir(pd)
assert ir["contract_type"] == "PlanDelta"
restored = from_ir(ir)
assert isinstance(restored, PlanDelta)
assert restored.author == pd.author
def test_dual_variables_and_privacy_audit_roundtrip():
dv = DualVariables(venue_shadows={"VENUE-1": 0.1, "VENUE-2": 0.2})
pb = PrivacyBudget(total_budget=1000.0, spent=150.0)
al = AuditLog(entries=[{"event": "init", "ts": time.time()}])
for obj in (dv, pb, al):
ir = to_ir(obj)
assert isinstance(ir, dict)
restored = from_ir(ir)
assert restored is not None