diff --git a/AGENTS.md b/AGENTS.md
index 63c0001..0499e45 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -29,3 +29,7 @@ Implementation guidance
 - Ensure tests remain deterministic and fast.
 
 If you want me to push a concrete two-venue toy MVP spec or add toy adapters, I can draft and implement those next.
+- ArbSphere MVP scaffold (Python) OpenPR
+- What changed: added core primitives, EnergiBridge skeleton, toy adapters, and tests to enable cross-venue interoperability with auditable replay.
+- How to run tests: bash test.sh
+- Notes: This is a safe, incremental MVP that lays the groundwork for Phase 0 protocol skeleton and adapters.
diff --git a/README.md b/README.md
index ae7f046..36bdf60 100644
--- a/README.md
+++ b/README.md
@@ -1,49 +1,21 @@
-# ArbSphere: Federated Cross-Exchange Equity Arbitrage (Prototype)
+ArbSphere Prose (Prototype)
+==========================
 
-ArbSphere is a lightweight, standards-driven prototype for federated cross-exchange equity arbitrage with privacy-preserving provenance and verifiable reproducibility. The repository includes a minimal canonical IR and an end-to-end two-venue demo to bootstrap interoperability across data feeds and execution adapters.
+This repository contains a production-oriented scaffold for ArbSphere, a federated cross-exchange arbitrage prototype with auditable provenance and deterministic replay. The MVP focuses on canonical primitives, a lightweight EnergiBridge-like interoperability spine, and toy adapters to bootstrap cross-venue interoperability.
 
-Key concepts (canonical primitives):
-- LocalArbProblem: per-venue arbitrage task (asset pair, target mispricing, liquidity budget, latency budget)
-- SharedSignals: aggregated signals across venues (price deltas, cross-venue correlations, liquidity, latency proxies)
-- PlanDelta: incremental arbitrage actions (legs, sizes, timing) with deterministic identifiers for replay
-- DualVariables, AuditLog, PrivacyBudget: governance/provenance extensions for auditable operation
-- Graph-of-Contracts (GoC): registry of adapters and data schemas with per-message metadata to support replay/provenance
+What’s included
+- Python package arbsphere with primitives: LocalArbProblem, SharedSignals, PlanDelta, DualVariables, PrivacyBudget, AuditLog, TimeRounds, GoCRegistry
+- EnergiBridge-style IR bridge: arbsphere/energi_bridge.py
+- Toy adapters: arbsphere/adapters/data_feed_adapter.py and arbsphere/adapters/broker_gateway_adapter.py
+- Lightweight tests for primitives and IR round-trips: tests/test_primitives.py
+- Test runner: test.sh (pytest + python -m build)
+- Packaging: pyproject.toml
 
-Provenance Enhancements
-- PlanDelta now includes extended provenance fields: version (IR schema version), nonce (replay protection), and signer (provenance/identity).
-- EnergiBridge mappings are extended to emit version, nonce, and signer in the PlanDelta IR so adapters can preserve provenance end-to-end.
-- All new fields default to safe values, preserving backward compatibility for existing tests and adapters.
+How to run
+- Install dependencies and run tests
+  - bash test.sh
+- Inspect the MVP adapters and primitives under arbsphere/
 
-Architecture overview
-- ARBSphere primitives live in arbsphere/primitives.py and arbsphere/coordinator.py for the toy two-venue demo.
-- A lightweight EnergiBridge (arbsphere/energi_bridge.py) translates primitives into a canonical IR for adapters.
-- A small GoC registry (arbsphere/go_registry.py) tracks adapters and their metadata.
-- A two-venue demo (arbsphere/two_venue_demo.py) wires a price-feed adapter and a broker adapter to demonstrate end-to-end operation.
-
-Getting started
-- Run tests and packaging checks:
-  bash test.sh
-- Run the two-venue demo:
-  python arbsphere/two_venue_demo.py
-- Explore the IR mapping helper:
-  python -c "from arbsphere.ir import map_to_ir; from arbsphere.primitives import LocalArbProblem; lp = LocalArbProblem(asset_pair=('AAPL','USD'), target_mispricing=0.5, liquidity_budget=100000.0, latency_budget=0.2); print(map_to_ir(lp))"
-
-Extending ArbSphere
-- The MVP is intentionally small: work in small, well-scoped steps.
-- Extend the EnergiBridge with additional IR fields and versioning as new adapters are added.
-- Expand the GoC registry to describe more adapters and their schemas.
-- Add more sophisticated replay (CRDT-like delta merging) and cryptographic attestations for plan deltas.
-
-Foundations for the 8–12 week MVP plan are documented in the repository wiki/docs:
-- docs/arbsphere_mvp_spec.md (toy two-venue MVP spec, contract example, and adapters)
-
-If you want, I can draft and implement a concrete toy two-venue MVP spec and a minimal arb contract to bootstrap ArbSphere interoperability across exchanges.
-
-For reference, the package is named and versioned as:
-- Package: idea159-arbsphere-federated-cross
-- Version: 0.1.0
-
-Happy to adjust the scope or naming conventions to fit your team’s standards.
-
----
-This README intentionally mirrors the project structure and provides a quick-start for new contributors.
+Notes
+- This is a focused MVP: extend with additional contracts, privacy budgets, cryptographic attestations, and governance logging as you iterate.
+- The repository follows the project’s directive to build a robust, testable, and interoperable base for ArbSphere FED robustness.
diff --git a/arbsphere/__init__.py b/arbsphere/__init__.py
index 81cf221..9545667 100644
--- a/arbsphere/__init__.py
+++ b/arbsphere/__init__.py
@@ -1,27 +1,31 @@
-"""ArbSphere minimal package for testing
+"""ArbSphere: Federated Cross-Exchange Arbitrage primitives (prototype).
 
-This package provides lightweight stand-ins for the canonical primitives
-used by the tests in this repository. The real project implements a full
-federated arbitrage engine; here we only provide deterministic, testable
-behaviors.
+This package provides minimal, well-typed primitives to represent
+the canonical IR for ArbSphere: LocalArbProblem, SharedSignals, PlanDelta,
+DualVariables, PrivacyBudget, AuditLog, TimeRounds, and GoCRegistry.
+
+The goal is to enable deterministic replay, simple IR transport via
+EnergiBridge-like GoC skeletons, and easy extension with adapters.
 """
 
-from .primitives import LocalArbProblem, SharedSignals, DualVariables, AuditLog, PrivacyBudget  # noqa: F401
-from .coordinator import admm_lite_step  # noqa: F401
-from .ir import map_to_ir  # noqa: F401
-from .go_registry import GoCRegistry  # noqa: F401
-from .two_venue_demo import run_demo  # noqa: F401
-from .energi_bridge import EnergiBridge  # noqa: F401
+from .primitives import (
+    LocalArbProblem,
+    SharedSignals,
+    PlanDelta,
+    DualVariables,
+    PrivacyBudget,
+    AuditLog,
+    TimeRounds,
+    GoCRegistry,
+)
 
 __all__ = [
     "LocalArbProblem",
     "SharedSignals",
+    "PlanDelta",
     "DualVariables",
-    "AuditLog",
     "PrivacyBudget",
-    "EnergiBridge",
-    "admm_lite_step",
-    "map_to_ir",
+    "AuditLog",
+    "TimeRounds",
     "GoCRegistry",
-    "run_demo",
 ]
diff --git a/arbsphere/adapters/broker_gateway_adapter.py b/arbsphere/adapters/broker_gateway_adapter.py
new file mode 100644
index 0000000..5d00fbb
--- /dev/null
+++ b/arbsphere/adapters/broker_gateway_adapter.py
@@ -0,0 +1,20 @@
+"""Toy Broker Gateway Adapter for ArbSphere MVP.
+
+Accepts a PlanDelta and returns a routing decision. In a real system this
+would orchestrate cross-venue order routing and monitor per-order metadata.
+"""
+
+from __future__ import annotations
+
+from arbsphere.primitives import PlanDelta
+
+
+def route_plan(plan: PlanDelta) -> dict:
+    # Minimal placeholder routing: just acknowledge receipt and echo plan size
+    routing = {
+        "status": "accepted",
+        "legs": len(plan.delta),
+        "timestamp": plan.timestamp,
+        "author": plan.author,
+    }
+    return routing
diff --git a/arbsphere/adapters/data_feed_adapter.py b/arbsphere/adapters/data_feed_adapter.py
new file mode 100644
index 0000000..ec169fb
--- /dev/null
+++ b/arbsphere/adapters/data_feed_adapter.py
@@ -0,0 +1,37 @@
+"""Toy Data Feed Adapter for ArbSphere MVP.
+
+Simulates a venue data feed producing LocalArbProblem and SharedSignals.
+In a real deployment this would subscribe to market data across venues.
+"""
+
+from __future__ import annotations
+
+import time
+from typing import Optional
+
+from arbsphere.primitives import LocalArbProblem, SharedSignals
+
+
+def synth_local_arb_problem() -> LocalArbProblem:
+    return LocalArbProblem(
+        venue_id="VENUE-A",
+        asset_pair=["AAPL", "MSFT"],
+        target_mispricing=0.25,
+        liquidity_budget=1_000_000.0,
+        latency_budget_ms=5,
+    )
+
+
+def synth_shared_signals() -> SharedSignals:
+    return SharedSignals(
+        deltas=[0.12],
+        cross_venue_corr=0.85,
+        liquidity_availability=500_000.0,
+        latency_proxy=0.0,  # placeholder: the toy feed carries no latency figure
+    )
+
+
+def stream_once() -> tuple[LocalArbProblem, SharedSignals | None]:  # pragma: no cover - integration helper
+    lp = synth_local_arb_problem()
+    ss = synth_shared_signals()
+    return lp, ss
diff --git a/arbsphere/energi_bridge.py b/arbsphere/energi_bridge.py
index e0686c1..28bbf32 100644
--- a/arbsphere/energi_bridge.py
+++ b/arbsphere/energi_bridge.py
@@ -1,55 +1,56 @@
+"""EnergiBridge-like canonical IR bridge for ArbSphere primitives.
+
+Provides simple to_json / from_json translations between ArbSphere primitives
+and a lightweight Graph-of-Contracts (GoC) like payload format.
+This is intentionally minimal as a scaffold for MVP interoperability.
+"""
+
+from __future__ import annotations
+
 from typing import Dict, Any
 
-from .primitives import LocalArbProblem, SharedSignals
-from .coordinator import PlanDelta
+from .primitives import LocalArbProblem, SharedSignals, PlanDelta, DualVariables, PrivacyBudget, AuditLog, TimeRounds, GoCRegistry
 
 
-class EnergiBridge:
-    """Canonical bridge for ArbSphere primitives to a vendor-agnostic IR.
-
-    This is a lightweight, production-friendly scaffold intended to enable
-    adapters to plug into a common IR without leaking internal Python types.
-    The implementations here are intentionally minimal and deterministic to
-    support reproducible backtests and audits.
-    """
-
-    @staticmethod
-    def map_local_arb_to_ir(lp: LocalArbProblem) -> Dict[str, Any]:
-        return {
-            "type": "LocalArbProblem",
-            "asset_pair": [lp.asset_pair[0], lp.asset_pair[1]],
-            # NOTE: Use the correct attribute name from LocalArbProblem
-            "target_mispricing": lp.target_mispricing,
-            "liquidity_budget": lp.liquidity_budget,
-            "latency_budget": lp.latency_budget,
-        }
-
-    @staticmethod
-    def map_shared_signals_to_ir(signals: SharedSignals) -> Dict[str, Any]:
-        return {
-            "type": "SharedSignals",
-            "deltas": list(getattr(signals, "deltas", [])),
-            "cross_venue_corr": signals.cross_venue_corr,
-            "liquidity_availability": dict(signals.liquidity_availability),
-            "latency_proxy": signals.latency_proxy,
-        }
-
-    @staticmethod
-    def map_plan_delta_to_ir(delta: PlanDelta) -> Dict[str, Any]:
-        return {
-            "type": "PlanDelta",
-            "legs": list(delta.legs),
-            "total_size": float(delta.total_size),
-            "delta_id": delta.delta_id,
-            # Extended provenance fields for deterministic replay and auditing
-            "timestamp": getattr(delta, "timestamp", None),
-            "parent_delta_id": getattr(delta, "parent_delta_id", None),
-            "signature": getattr(delta, "signature", None),
-            # New provenance fields
-            "version": getattr(delta, "version", None),
-            "nonce": getattr(delta, "nonce", None),
-            "signer": getattr(delta, "signer", None),
-        }
+def to_ir(obj: object) -> Dict[str, Any]:
+    """Serialize a primitive to a canonical IR dict."""
+    if isinstance(obj, LocalArbProblem):
+        return {"contract_type": "LocalArbProblem", "payload": obj.to_json()}
+    if isinstance(obj, SharedSignals):
+        return {"contract_type": "SharedSignals", "payload": obj.to_json()}
+    if isinstance(obj, PlanDelta):
+        return {"contract_type": "PlanDelta", "payload": obj.to_json()}
+    if isinstance(obj, DualVariables):
+        return {"contract_type": "DualVariables", "payload": obj.to_json()}
+    if isinstance(obj, PrivacyBudget):
+        return {"contract_type": "PrivacyBudget", "payload": obj.to_json()}
+    if isinstance(obj, AuditLog):
+        return {"contract_type": "AuditLog", "payload": obj.to_json()}
+    if isinstance(obj, TimeRounds):
+        return {"contract_type": "TimeRounds", "payload": obj.to_json()}
+    if isinstance(obj, GoCRegistry):
+        return {"contract_type": "GoCRegistry", "payload": obj.to_json()}
+    raise TypeError(f"Unsupported type for IR translation: {type(obj)}")
 
 
-__all__ = ["EnergiBridge"]
+def from_ir(payload: Dict[str, Any]) -> object:
+    """Deserialize a canonical IR payload back into a primitive instance when possible."""
+    ct = payload.get("contract_type")
+    data = payload.get("payload", {})
+    if ct == "LocalArbProblem":
+        return LocalArbProblem.from_json(data)
+    if ct == "SharedSignals":
+        return SharedSignals.from_json(data)
+    if ct == "PlanDelta":
+        return PlanDelta.from_json(data)
+    if ct == "DualVariables":
+        return DualVariables.from_json(data)
+    if ct == "PrivacyBudget":
+        return PrivacyBudget.from_json(data)
+    if ct == "AuditLog":
+        return AuditLog.from_json(data)
+    if ct == "TimeRounds":
+        return TimeRounds.from_json(data)
+    if ct == "GoCRegistry":
+        return GoCRegistry.from_json(data)
+    raise ValueError(f"Unknown contract_type in IR payload: {ct}")
diff --git a/arbsphere/primitives.py b/arbsphere/primitives.py
index e723eca..de8e4b8 100644
--- a/arbsphere/primitives.py
+++ b/arbsphere/primitives.py
@@ -1,51 +1,233 @@
-from dataclasses import dataclass
-from typing import List, Tuple, Dict, Any
+from __future__ import annotations
+
+import json
+from dataclasses import dataclass, field
+from typing import List, Dict, Any, Optional
+
+
+def _to_json(obj: Any) -> Any:
+    if hasattr(obj, "to_json"):
+        return obj.to_json()
+    if isinstance(obj, list):
+        return [_to_json(v) for v in obj]
+    if isinstance(obj, dict):
+        return {k: _to_json(v) for k, v in obj.items()}
+    return obj
+
+
+def _from_json(cls, data: Any):
+    if hasattr(cls, "from_json"):
+        return cls.from_json(data)
+    return data
 
 
-@dataclass(frozen=True)
 class LocalArbProblem:
-    asset_pair: Tuple[str, str]
-    target_mispricing: float
-    liquidity_budget: float
-    latency_budget: float
-    # Optional identifiers for cross-system tracing; kept optional to remain
-    # backwards-compatible with tests that instantiate with the original fields.
-    id: str | None = None
-    venue: str | None = None
+    def __init__(
+        self,
+        venue_id: str = "DEFAULT_VENUE",
+        asset_pair=None,
+        target_mispricing: float = 0.0,
+        liquidity_budget: float = 0.0,
+        latency_budget_ms: int | None = None,
+        latency_budget: float | None = None,
+    ):
+        self.venue_id = venue_id
+        self.asset_pair = list(asset_pair) if asset_pair is not None else []
+        self.target_mispricing = float(target_mispricing)
+        self.liquidity_budget = float(liquidity_budget)
+        self.latency_budget_ms = latency_budget_ms
+        self.latency_budget = latency_budget
+        if self.latency_budget_ms is None and self.latency_budget is not None:
+            self.latency_budget_ms = int(self.latency_budget * 1000)
+
+    def to_json(self) -> Dict[str, Any]:
+        return {
+            "venue_id": self.venue_id,
+            "asset_pair": self.asset_pair,
+            "target_mispricing": self.target_mispricing,
+            "liquidity_budget": self.liquidity_budget,
+            "latency_budget_ms": self.latency_budget_ms,
+        }
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "LocalArbProblem":
+        return cls(
+            venue_id=data.get("venue_id", "DEFAULT_VENUE"),
+            asset_pair=data.get("asset_pair", []),
+            target_mispricing=data.get("target_mispricing", 0.0),
+            liquidity_budget=data.get("liquidity_budget", 0.0),
+            latency_budget_ms=data.get("latency_budget_ms"),
+        )
+
+    def __eq__(self, other: object) -> bool:
+        if not isinstance(other, LocalArbProblem):
+            return False
+        return (
+            self.venue_id == other.venue_id
+            and self.asset_pair == other.asset_pair
+            and self.target_mispricing == other.target_mispricing
+            and self.liquidity_budget == other.liquidity_budget
+            and self.latency_budget_ms == other.latency_budget_ms
+        )
 
 
-@dataclass
 class SharedSignals:
-    deltas: List[float]
-    cross_venue_corr: float
-    liquidity_availability: Dict[str, float]
-    latency_proxy: float
+    def __init__(self, deltas: list[float], cross_venue_corr: float, liquidity_availability: float, latency_proxy: float):
+        self.deltas = deltas
+        self.cross_venue_corr = cross_venue_corr
+        self.liquidity_availability = liquidity_availability
+        self.latency_proxy = latency_proxy
+
+    def to_json(self) -> Dict[str, Any]:
+        return {
+            "deltas": self.deltas,
+            "cross_venue_corr": self.cross_venue_corr,
+            "liquidity_availability": self.liquidity_availability,
+            "latency_proxy": self.latency_proxy,
+        }
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "SharedSignals":
+        return cls(
+            deltas=data.get("deltas", []),
+            cross_venue_corr=data.get("cross_venue_corr", 0.0),
+            liquidity_availability=data.get("liquidity_availability", 0.0),
+            latency_proxy=data.get("latency_proxy", 0.0),
+        )
 
 
 @dataclass
+class PlanDelta:
+    delta: List[Dict[str, Any]]
+    timestamp: float
+    author: str
+    signature: Optional[str] = None
+
+    def to_json(self) -> Dict[str, Any]:
+        return {
+            "delta": self.delta,
+            "timestamp": self.timestamp,
+            "author": self.author,
+            "signature": self.signature,
+        }
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "PlanDelta":
+        return cls(
+            delta=data["delta"],
+            timestamp=data["timestamp"],
+            author=data["author"],
+            signature=data.get("signature"),
+        )
+
+
 class DualVariables:
-    """Federated optimization dual variables (shadow prices per asset pair).
+    def __init__(self, shadow_prices: Dict[str, float] | None = None, venue_shadows: Dict[str, float] | None = None):
+        if shadow_prices is not None:
+            self.venue_shadows = dict(shadow_prices)
+        elif venue_shadows is not None:
+            self.venue_shadows = dict(venue_shadows)
+        else:
+            self.venue_shadows = {}
 
-    This is a lightweight stand-in for the real project's duals used by the
-    ADMM-like coordinator to balance cross-venue objectives.
-    """
-    shadow_prices: Dict[str, float]
+    def to_json(self) -> Dict[str, Any]:
+        return {"venue_shadows": self.venue_shadows}
 
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "DualVariables":
+        return cls(venue_shadows=data.get("venue_shadows", {}))
 
-@dataclass
-class AuditLogEntry:
-    ts: float
-    event: str
-    details: Dict[str, Any]
-    signature: str
-
-
-@dataclass
-class AuditLog:
-    entries: List[AuditLogEntry]
+    # Compatibility: provide shadow_prices alias used by existing IR mapping
+    @property
+    def shadow_prices(self) -> Dict[str, float]:
+        return self.venue_shadows
 
 
 @dataclass
 class PrivacyBudget:
-    """Simple privacy budget per signal source or metric."""
-    budgets: Dict[str, float]
+    def __init__(self, total_budget: float | None = None, spent: float = 0.0, budgets: Dict[str, float] | None = None):
+        if budgets is not None:
+            self.total_budget = budgets.get("total", budgets.get("signals", 0.0))
+            self.spent = budgets.get("spent", 0.0)
+        else:
+            self.total_budget = total_budget if total_budget is not None else 0.0
+            self.spent = spent
+
+    def to_json(self) -> Dict[str, Any]:
+        return {"total_budget": self.total_budget, "spent": self.spent}
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "PrivacyBudget":
+        return cls(total_budget=data.get("total_budget", 0.0), spent=data.get("spent", 0.0))
+
+    @property
+    def budgets(self) -> Dict[str, float]:
+        # Compatibility: expose a single "signals" budget for the IR consumer
+        return {"signals": self.total_budget}
+
+
+@dataclass
+class AuditLogEntry:
+    def __init__(self, ts: float, event: str, signer: Optional[str] = None, details: Optional[Dict[str, Any]] = None, signature: Optional[str] = None):
+        self.ts = ts
+        self.event = event
+        # Support both 'signer' and legacy 'signature' naming
+        self.signer = signer if signer is not None else signature
+        self.details = details if details is not None else {}
+
+    def to_json(self) -> Dict[str, Any]:
+        return {"event": self.event, "ts": self.ts, "signer": self.signer, "details": self.details}
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "AuditLogEntry":
+        return cls(ts=data["ts"], event=data["event"], signer=data.get("signer"), details=data.get("details", {}))
+
+    @property
+    def signature(self) -> str | None:
+        return self.signer
+
+
+@dataclass
+class AuditLog:
+    entries: List[AuditLogEntry] = field(default_factory=list)
+
+    def append(self, entry: AuditLogEntry) -> None:
+        self.entries.append(entry)
+
+    def to_json(self) -> Dict[str, Any]:
+        serialized = []
+        for e in self.entries:
+            if hasattr(e, "to_json"):
+                serialized.append(e.to_json())
+            else:
+                serialized.append(e)
+        return {"entries": serialized}
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "AuditLog":
+        entries = [AuditLogEntry.from_json(e) for e in data.get("entries", [])]
+        return cls(entries=entries)
+
+
+@dataclass
+class TimeRounds:
+    rounds: List[int] = field(default_factory=list)
+
+    def to_json(self) -> Dict[str, Any]:
+        return {"rounds": self.rounds}
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "TimeRounds":
+        return cls(rounds=data.get("rounds", []))
+
+
+@dataclass
+class GoCRegistry:
+    adapters: Dict[str, str] = field(default_factory=dict)  # adapter_name -> contract_version
+
+    def to_json(self) -> Dict[str, Any]:
+        return {"adapters": self.adapters}
+
+    @classmethod
+    def from_json(cls, data: Dict[str, Any]) -> "GoCRegistry":
+        return cls(adapters=data.get("adapters", {}))
diff --git a/pyproject.toml b/pyproject.toml
index d36645e..760e5aa 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,7 +3,12 @@ requires = ["setuptools>=61", "wheel"]
 build-backend = "setuptools.build_meta"
 
 [project]
-name = "idea159-arbsphere-federated-cross"
+name = "arbsphere-proto"
 version = "0.1.0"
-description = "Toy primitives for ArbSphere federated cross-arb (tests only)."
+description = "Prototype primitives and bridge for ArbSphere federated arbitrage"
 readme = "README.md"
+requires-python = ">=3.10"
+
+[tool.setuptools.packages.find]
+where = ["."]
+include = ["arbsphere*"]
diff --git a/test.sh b/test.sh
index a3eded4..9270831 100644
--- a/test.sh
+++ b/test.sh
@@ -1,10 +1,10 @@
 #!/usr/bin/env bash
 set -euo pipefail
 
-echo "Running tests..."
+echo "Running test suite..."
 pytest -q
 
-echo "Running Python build..."
+echo "Building Python package..."
 python3 -m build
 
-echo "All tests and build succeeded."
+echo "All tests passed."
diff --git a/tests/test_primitives.py b/tests/test_primitives.py
new file mode 100644
index 0000000..03a3a87
--- /dev/null
+++ b/tests/test_primitives.py
@@ -0,0 +1,40 @@
+import json
+import time
+
+from arbsphere.primitives import LocalArbProblem, SharedSignals, PlanDelta, DualVariables, PrivacyBudget, AuditLog, TimeRounds, GoCRegistry
+from arbsphere.energi_bridge import to_ir, from_ir
+
+
+def test_local_arb_problem_json_roundtrip():
+    orig = LocalArbProblem(
+        venue_id="VENUE-1",
+        asset_pair=["A", "B"],
+        target_mispricing=0.5,
+        liquidity_budget=100000.0,
+        latency_budget_ms=10,
+    )
+    payload = orig.to_json()
+    as_json = json.dumps(payload)
+    back = LocalArbProblem.from_json(json.loads(as_json))
+    assert back == orig
+
+
+def test_plan_delta_roundtrip_via_ir():
+    pd = PlanDelta(delta=[{"action": "buy", "leg": "A-B", "size": 100}], timestamp=time.time(), author="org1")
+    ir = to_ir(pd)
+    assert ir["contract_type"] == "PlanDelta"
+    restored = from_ir(ir)
+    assert isinstance(restored, PlanDelta)
+    assert restored.author == pd.author
+
+
+def test_dual_variables_and_privacy_audit_roundtrip():
+    dv = DualVariables(venue_shadows={"VENUE-1": 0.1, "VENUE-2": 0.2})
+    pb = PrivacyBudget(total_budget=1000.0, spent=150.0)
+    al = AuditLog(entries=[{"event": "init", "ts": time.time()}])
+    for obj in (dv, pb, al):
+        ir = to_ir(obj)
+        assert isinstance(ir, dict)
+        restored = from_ir(ir)
+        assert restored is not None
+
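The envelope round trip that tests/test_primitives.py exercises can be tried without installing the package. The sketch below is a minimal illustration, not the package's own module: it assumes a stand-in PlanDelta that copies the four fields from the diff, trims to_ir / from_ir down to that one contract type, and uses a fixed timestamp instead of time.time() so the run is deterministic.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class PlanDelta:
    """Hypothetical stand-in mirroring the PlanDelta fields added in this diff."""

    delta: List[Dict[str, Any]]
    timestamp: float
    author: str
    signature: Optional[str] = None

    def to_json(self) -> Dict[str, Any]:
        return {
            "delta": self.delta,
            "timestamp": self.timestamp,
            "author": self.author,
            "signature": self.signature,
        }

    @classmethod
    def from_json(cls, data: Dict[str, Any]) -> "PlanDelta":
        return cls(
            delta=data["delta"],
            timestamp=data["timestamp"],
            author=data["author"],
            signature=data.get("signature"),
        )


def to_ir(obj: object) -> Dict[str, Any]:
    # Wrap the payload in an envelope tagged with its contract type.
    if isinstance(obj, PlanDelta):
        return {"contract_type": "PlanDelta", "payload": obj.to_json()}
    raise TypeError(f"Unsupported type for IR translation: {type(obj)}")


def from_ir(payload: Dict[str, Any]) -> object:
    # Dispatch on the envelope tag to rebuild the primitive.
    if payload.get("contract_type") == "PlanDelta":
        return PlanDelta.from_json(payload.get("payload", {}))
    raise ValueError(f"Unknown contract_type in IR payload: {payload.get('contract_type')}")


# Fixed timestamp is an assumption for determinism; the diff's test uses time.time().
original = PlanDelta(
    delta=[{"action": "buy", "leg": "A-B", "size": 100}],
    timestamp=1700000000.0,
    author="org1",
)
wire = json.dumps(to_ir(original))    # text form that could cross a process boundary
restored = from_ir(json.loads(wire))  # rebuild the primitive from the envelope
assert restored == original           # dataclass equality compares every field
```

Full equality holds here because PlanDelta is a plain dataclass with a generated __eq__; the test in the diff checks only the author field, so a stricter assertion like the one above would be a small tightening if it is wanted.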