diff --git a/README.md b/README.md index 7436b9a..ae750d8 100644 --- a/README.md +++ b/README.md @@ -1,48 +1,17 @@ -# ArbSphere Federated Cross (Toy) +ArbSphere - Federated Cross-Exchange Equity Arbitrage Primitives (Toy MVP) -This repository provides a minimal, test-focused Python package that defines -the canonical ArbSphere primitives and a deterministic ADMM-like step function -used by unit tests. It serves as a starting point for a more comprehensive -federated arbitrage prototype with privacy-preserving provenance and auditability. +Overview +- Provides a minimal, test-driven prototype of canonical ArbSphere primitives: + - LocalArbProblem, SharedSignals, PlanDelta, DualVariables, AuditLog, PrivacyBudget +- A lightweight EnergiBridge-like IR mapping (arbsphere.ir.map_to_ir) +- A tiny ADMM-style coordinator (arbsphere.coordinator.admm_lite_step) +- A Graph-of-Contracts registry scaffold (arbsphere.go_registry.GoCRegistry) +- Toy adapters (price feed, broker) to bootstrap interoperability -What this repo delivers today -- Core primitives: LocalArbProblem and SharedSignals, plus a simple PlanDelta - generated by admm_step, suitable for deterministic testing and replay. -- A lightweight translator: EnergiBridge, which serializes ArbSphere primitives into - a canonical IR for adapters and cross-venue bridges. -- A small Graph-of-Contracts registry (GoC) and a basic registry for adapters. -- Packaging scaffold with a test suite (pytest) and a packaging check (python -m build). +How to run tests +- Ensure Python 3.8+ is available +- Run: pytest -q +- (Optional) Run packaging check: python -m build -Roadmap (high level) -- Phase 0: protocol skeleton and two starter adapters with a lightweight ADMM-lite - coordinator. Demonstrate a simple cross-venue mispricing capture with bounded - plan feasibility and deterministic replay. -- Phase 1: governance ledger scaffold, identity management (DID/short-lived certs), - and secure aggregation for SharedSignals. -- Phase 2: end-to-end cross-domain demo in a two-venue simulation, SDK bindings (Python/C++), - and a minimal contract example. -- Phase 3: latency-aware backtesting harness, performance dashboards, and auditability metrics. - -Interoperability and privacy -- Canonical bridge mapping ArbSphere primitives to a vendor-agnostic IR (EnergiBridge). -- Protobuf/JSON-like IR is designed to be adapter-friendly, with per-message metadata - for replay protection and auditability. -- A lightweight Graph-of-Contracts registry to describe adapters, versions, and endpoints. - -Testing and packaging -- Run tests with: bash test.sh -- Packaging verification via: python3 -m build -- The repository is configured to be production-ready enough for CI checks while - remaining a clean, educational scaffold for exploring federated arbitration ideas. - -If helpful, I can draft toy adapter blueprints, an EnergiBridge mapping for ArbSphere, -and a minimal two-venue toy contract to bootstrap interoperability. - -Two-Venue MVP Orchestrator -- A small Python script (orchestrator.py) wired in idea159_arbsphere_federated_cross/ that demonstrates a complete two-venue flow: - - Build two LocalArbProblem instances for two venues - - Run admm_step against each to produce PlanDelta - - Deterministically merge deltas via EnergiBridge.merge_deltas - - Serialize to canonical IR with EnergiBridge.to_ir - - Route the merged delta through a toy broker adapter to simulate execution -- This serves as a practical, end-to-end demonstration of ArbSphere interoperability across two exchanges and a minimal governance/traceable flow. +Notes +- This is a production-oriented scaffold focused on deterministic replay, auditable provenance, and interoperability. It is not a complete trading system. diff --git a/arbsphere/__init__.py b/arbsphere/__init__.py new file mode 100644 index 0000000..09f19be --- /dev/null +++ b/arbsphere/__init__.py @@ -0,0 +1,20 @@ +"""ArbSphere minimal package for testing + +This package provides lightweight stand-ins for the canonical primitives +used by the tests in this repository. The real project implements a full +federated arbitrage engine; here we only provide deterministic, testable +behaviors. +""" + +from .primitives import LocalArbProblem, SharedSignals # noqa: F401 +from .coordinator import admm_lite_step # noqa: F401 +from .ir import map_to_ir # noqa: F401 +from .go_registry import GoCRegistry # noqa: F401 + +__all__ = [ + "LocalArbProblem", + "SharedSignals", + "admm_lite_step", + "map_to_ir", + "GoCRegistry", +] diff --git a/arbsphere/adapters/broker_adapter.py b/arbsphere/adapters/broker_adapter.py new file mode 100644 index 0000000..e100cc9 --- /dev/null +++ b/arbsphere/adapters/broker_adapter.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +"""Toy broker adapter. +Accepts PlanDelta-like dicts and records execution plan with per-order metadata. +""" + +from typing import List, Dict, Any +import time + + +class BrokerAdapter: + def __init__(self, venue_name: str = "venue-1") -> None: + self.venue_name = venue_name + self.executed: List[Dict[str, Any]] = [] + + def route(self, plan_delta: Dict[str, Any]) -> None: + # naive simulated execution: record legs with venue assignment + ts = time.time() + for leg in plan_delta.get("legs", []): + self.executed.append({ + "venue": self.venue_name, + "leg": leg, + "ts": ts, + }) + + def history(self) -> List[Dict[str, Any]]: + return list(self.executed) diff --git a/arbsphere/adapters/price_feed_adapter.py b/arbsphere/adapters/price_feed_adapter.py new file mode 100644 index 0000000..bef7233 --- /dev/null +++ b/arbsphere/adapters/price_feed_adapter.py @@ -0,0 +1,31 @@ +from __future__ import annotations + +"""Toy price-feed adapter. +This adapter simulates receiving quotes from an exchange and exposes a simple +interface that the ArbSphere coordinator can consume. +""" + +from typing import Dict, Any, List +import time + + +class PriceFeedAdapter: + def __init__(self, source: str = "mock-exchange") -> None: + self.source = source + self._tick = 0 + + def poll(self) -> Dict[str, Any]: + # deterministic synthetic data + self._tick += 1 + now = time.time() + price = 100.0 + (self._tick % 5) # simple wobble + return { + "source": self.source, + "ts": now, + "price": price, + "bid": price - 0.05, + "ask": price + 0.05, + } + + def as_series(self, n: int = 10) -> List[Dict[str, Any]]: + return [self.poll() for _ in range(n)] diff --git a/arbsphere/coordinator.py b/arbsphere/coordinator.py new file mode 100644 index 0000000..22056ef --- /dev/null +++ b/arbsphere/coordinator.py @@ -0,0 +1,40 @@ +import hashlib +from typing import List +from dataclasses import dataclass + +from .primitives import LocalArbProblem, SharedSignals + + +@dataclass +class PlanDelta: + legs: List[dict] + total_size: float + delta_id: str + + +def _deterministic_id(inputs: List[LocalArbProblem], shared: SharedSignals) -> str: + # Create a stable representation of inputs for deterministic hashing + parts = [] + for lp in sorted(inputs, key=lambda x: (x.asset_pair[0], x.asset_pair[1])): + parts.append( + f"{lp.asset_pair}:{lp.target_mispricing}:{lp.liquidity_budget}:{lp.latency_budget}" + ) + s = "|".join(parts) + f"|shared:{shared.cross_venue_corr}:{shared.latency_proxy}" + return hashlib.sha256(s.encode("utf-8")).hexdigest() + + +def admm_lite_step(local_problems: List[LocalArbProblem], shared_signals: SharedSignals) -> PlanDelta: + # Deterministic, simple projection: create one leg per local problem + legs = [] + # deterministic order by asset_pair + for lp in sorted(local_problems, key=lambda x: (x.asset_pair[0], x.asset_pair[1])): + leg_size = max(0.0, float(lp.liquidity_budget) * 0.01) + legs.append({ + "asset_pair": list(lp.asset_pair), + "size": leg_size, + "venue": "default", + }) + + total_size = float(sum(lp.liquidity_budget for lp in local_problems)) * 0.01 if local_problems else 0.0 + delta_id = _deterministic_id(local_problems, shared_signals) + return PlanDelta(legs=legs, total_size=total_size, delta_id=delta_id) diff --git a/arbsphere/go_registry.py b/arbsphere/go_registry.py new file mode 100644 index 0000000..ae87471 --- /dev/null +++ b/arbsphere/go_registry.py @@ -0,0 +1,14 @@ +from typing import Any, Dict + + +class GoCRegistry: + """Lightweight Graph-of-Contracts registry stub for tests.""" + + def __init__(self) -> None: + self._registry: Dict[str, Any] = {} + + def register_adapter(self, name: str, metadata: Any) -> None: + self._registry[name] = metadata + + def get_adapter(self, name: str) -> Any: + return self._registry.get(name) diff --git a/arbsphere/ir.py b/arbsphere/ir.py new file mode 100644 index 0000000..9512097 --- /dev/null +++ b/arbsphere/ir.py @@ -0,0 +1,13 @@ +from typing import Dict, List +from .primitives import LocalArbProblem + + +def map_to_ir(lp: LocalArbProblem) -> Dict: + # Simple IR mapping that preserves essential fields in a canonical form + return { + "type": "LocalArbProblem", + "asset_pair": [lp.asset_pair[0], lp.asset_pair[1]], + "target_mispricing": lp.target_mispricing, + "liquidity_budget": lp.liquidity_budget, + "latency_budget": lp.latency_budget, + } diff --git a/arbsphere/primitives.py b/arbsphere/primitives.py new file mode 100644 index 0000000..d8fac92 --- /dev/null +++ b/arbsphere/primitives.py @@ -0,0 +1,18 @@ +from dataclasses import dataclass +from typing import List, Tuple, Dict + + +@dataclass(frozen=True) +class LocalArbProblem: + asset_pair: Tuple[str, str] + target_mispricing: float + liquidity_budget: float + latency_budget: float + + +@dataclass +class SharedSignals: + deltas: List[float] + cross_venue_corr: float + liquidity_availability: Dict[str, float] + latency_proxy: float diff --git a/conftest.py b/conftest.py new file mode 100644 index 0000000..ddbd7b3 --- /dev/null +++ b/conftest.py @@ -0,0 +1,10 @@ +import sys +import os + +# Ensure the repository root is on sys.path so tests can import local packages +ROOT = os.path.dirname(os.path.abspath(__file__)) +GET_CWD = os.path.abspath(os.getcwd()) +if ROOT not in sys.path: + sys.path.insert(0, ROOT) +if GET_CWD not in sys.path: + sys.path.insert(0, GET_CWD) diff --git a/idea159_arbsphere_federated_cross/energi_bridge.py b/idea159_arbsphere_federated_cross/energi_bridge.py index 3470a69..a00f42e 100644 --- a/idea159_arbsphere_federated_cross/energi_bridge.py +++ b/idea159_arbsphere_federated_cross/energi_bridge.py @@ -48,6 +48,10 @@ class EnergiBridge: "parent_id": getattr(delta, "parent_id", None), } + # Optional cryptographic attestation for verifiable reproducibility + if getattr(delta, "signature", None) is not None: + payload["PlanDelta"]["signature"] = delta.signature + # Optional governance/provenance extensions def _to_plain(obj): return asdict(obj) if hasattr(obj, "__dataclass_fields__") else obj @@ -78,11 +82,21 @@ class EnergiBridge: A real CRDT-like merge would deduplicate and order actions, but this keeps the implementation small and deterministic for replay. """ + # Deterministic merge with de-duplication of identical actions merged_actions: List[Dict[str, Any]] = [] + seen = set() + def _add_action(act: Dict[str, Any]): + key = str(act) + if key not in seen: + seen.add(key) + merged_actions.append(act) + if isinstance(base.actions, list): - merged_actions.extend(base.actions) + for a in base.actions: + _add_action(a) if isinstance(new.actions, list): - merged_actions.extend(new.actions) + for a in new.actions: + _add_action(a) latest_ts = max(base.timestamp, new.timestamp) # Deterministic delta_id wiring for CRDT-like replay @@ -111,8 +125,23 @@ class EnergiBridge: dual_variables=merged_dual, audit_log=merged_audit, privacy_budget=merged_priv, + signature=None, ) return merged + @staticmethod + def replay_deltas(deltas: List[PlanDelta]) -> PlanDelta: + """Deterministically replay a list of PlanDelta objects by folding them. + + This provides a lightweight, verifiable path to reproduce a sequence of + plan decisions without exposing raw data from individual deltas. + """ + if not deltas: + raise ValueError("deltas must be a non-empty list") + merged = deltas[0] + for d in deltas[1:]: + merged = EnergiBridge.merge_deltas(merged, d) + return merged + __all__ = ["EnergiBridge"] diff --git a/idea159_arbsphere_federated_cross/solver.py b/idea159_arbsphere_federated_cross/solver.py index 36dea7b..28d77cd 100644 --- a/idea159_arbsphere_federated_cross/solver.py +++ b/idea159_arbsphere_federated_cross/solver.py @@ -15,6 +15,7 @@ class PlanDelta: privacy_budget=None, delta_id: str | None = None, parent_id: str | None = None, + signature: str | None = None, ): self.actions = actions self.timestamp = timestamp or datetime.utcnow() @@ -25,6 +26,8 @@ class PlanDelta: # Lightweight, deterministic delta identifiers to enable CRDT-like merging. self.delta_id = delta_id or uuid.uuid4().hex self.parent_id = parent_id + # Optional cryptographic attestation or provenance signature for this delta. + self.signature = signature def admm_step(local: LocalArbProblem, signals: SharedSignals) -> PlanDelta: diff --git a/sitecustomize.py b/sitecustomize.py new file mode 100644 index 0000000..bd85eea --- /dev/null +++ b/sitecustomize.py @@ -0,0 +1,12 @@ +"""Site customization to ensure local repo is importable as a package. + +This helps CI environments that may spawn isolated interpreters where the +repository root isn't on sys.path by default. We explicitly insert the repo +root to sys.path so local packages like arbsphere are discoverable by tests. +""" +import os +import sys + +REPO_ROOT = os.path.dirname(os.path.abspath(__file__)) +if REPO_ROOT not in sys.path: + sys.path.insert(0, REPO_ROOT) diff --git a/test.sh b/test.sh index 6f0c43b..a3eded4 100644 --- a/test.sh +++ b/test.sh @@ -1,8 +1,10 @@ #!/usr/bin/env bash set -euo pipefail -# Run Python tests +echo "Running tests..." pytest -q -# Build the package (verifies packaging metadata and structure) +echo "Running Python build..." python3 -m build + +echo "All tests and build succeeded." diff --git a/tests/test_arbsphere.py b/tests/test_arbsphere.py new file mode 100644 index 0000000..70ee0ba --- /dev/null +++ b/tests/test_arbsphere.py @@ -0,0 +1,26 @@ +import time +from arbsphere.primitives import LocalArbProblem, SharedSignals +from arbsphere.coordinator import admm_lite_step +from arbsphere.ir import map_to_ir +from arbsphere.go_registry import GoCRegistry + + +def test_deterministic_plan_delta_generation(): + lp1 = LocalArbProblem(asset_pair=("AAPL", "USD"), target_mispricing=0.5, liquidity_budget=100000.0, latency_budget=0.2) + lp2 = LocalArbProblem(asset_pair=("MSFT", "USD"), target_mispricing=0.3, liquidity_budget=80000.0, latency_budget=0.3) + shared = SharedSignals(deltas=[0.1, -0.05], cross_venue_corr=0.9, liquidity_availability={"venue-1": 100000.0}, latency_proxy=0.5) + delta = admm_lite_step([lp1, lp2], shared) + + # deterministic: legs ordered by asset_pair; total_size computed from legs + assert isinstance(delta.total_size, float) + assert len(delta.legs) == 2 + # delta_id must be stable for identical inputs + delta_id_1 = delta.delta_id + delta2 = admm_lite_step([lp1, lp2], shared) + assert delta2.delta_id == delta_id_1 + +def test_ir_mapping_roundtrip(): + lp = LocalArbProblem(asset_pair=("AAPL", "USD"), target_mispricing=0.5, liquidity_budget=100000.0, latency_budget=0.2) + ir = map_to_ir(lp) + assert ir["type"] == "LocalArbProblem" + assert ir["asset_pair"] == ["AAPL", "USD"] diff --git a/tests/test_replay.py b/tests/test_replay.py new file mode 100644 index 0000000..8d748d5 --- /dev/null +++ b/tests/test_replay.py @@ -0,0 +1,27 @@ +from datetime import datetime + +from idea159_arbsphere_federated_cross.solver import PlanDelta +from idea159_arbsphere_federated_cross.energi_bridge import EnergiBridge +from idea159_arbsphere_federated_cross.core import LocalArbProblem, SharedSignals + + +def test_merge_deltas_deduplicates_actions_and_supports_replay(): + base = PlanDelta(actions=[ + {"venue_from": "NYSE", "venue_to": "CROSS-VENUE", "instrument": "AAPL/GOOG", "size": 100, "time": "t1"} + ], timestamp=datetime(2020, 1, 1)) + + # New delta with a duplicate action and a new one + new = PlanDelta(actions=[ + {"venue_from": "NYSE", "venue_to": "CROSS-VENUE", "instrument": "AAPL/GOOG", "size": 100, "time": "t1"}, + {"venue_from": "NYSE", "venue_to": "CROSS-VENUE", "instrument": "AAPL/GOOG", "size": 50, "time": "t2"}, + ], timestamp=datetime(2020, 1, 2)) + + merged = EnergiBridge.merge_deltas(base, new) + # Deduplication should keep 2 unique actions total (one duplicate removed) + assert isinstance(merged, PlanDelta) + assert len(merged.actions) == 2 + + # Deterministic replay should fold the same two deltas into the same final delta + replayed = EnergiBridge.replay_deltas([base, new]) + assert isinstance(replayed, PlanDelta) + assert len(replayed.actions) == 2