"""End-to-end test: two price-feed adapters feeding one central coordinator."""

import os
import sys

# Make the repository root (the parent of this file's directory) importable so
# the test can run without the package being installed.
BASE = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if BASE not in sys.path:
    sys.path.insert(0, BASE)

# NOTE(review): BrokerAdapter, LocalArbProblem and SharedSignals are not used
# by the code visible here; they are kept because other parts of the project
# may rely on this module importing them.
from crossvenue_arbx.adapters import PriceFeedAdapter, BrokerAdapter
from crossvenue_arbx.coordinator import CentralCoordinator
from crossvenue_arbx.core import LocalArbProblem, SharedSignals


def _action_sort_key(action):
    """Return the (from_venue, to_venue, asset) tuple used to order actions."""
    return (
        action.get("from_venue"),
        action.get("to_venue"),
        action.get("asset"),
    )


def test_end_to_end_two_venues_basic_reconciliation():
    """Drive two venues through ingest and reconcile, checking result shapes.

    The test steps each adapter once, overrides the AAPL price delta on the
    emitted signals, ingests both into a single coordinator, and finally
    calls ``reconcile()``. It asserts only on the types of the returned
    objects and on the ordering of the reconciled actions.
    """
    # Simple two-venue setup with two assets to validate the end-to-end flow.
    venue_a = PriceFeedAdapter("VenueA", ["AAPL", "MSFT"])
    venue_b = PriceFeedAdapter("VenueB", ["AAPL", "MSFT"])
    coord = CentralCoordinator()

    # Scene 1: VenueA emits a delta. The AAPL delta is overwritten with a
    # fixed value so the outcome does not depend on whatever the adapter
    # generated (the original author cites hash nondeterminism).
    problem_a, signals_a = venue_a.step()
    signals_a.price_delta_by_asset["AAPL"] = 0.01
    planA = coord.ingest_local(problem_a, signals_a)
    assert planA is not None
    # Only the container type is checked; an empty action list is accepted.
    assert isinstance(planA.delta_actions, list)

    # Scene 2: VenueB emits a delta, again with a forced AAPL value.
    problem_b, signals_b = venue_b.step()
    signals_b.price_delta_by_asset["AAPL"] = 0.012
    planB = coord.ingest_local(problem_b, signals_b)
    # The second ingest may or may not yield a plan; if it does, it must be
    # the same type as the first one.
    assert planB is None or isinstance(planB, type(planA))

    # Scene 3: reconcile across the gathered deltas. planA is known to be
    # non-None at this point, so the checks below always apply.
    recon = coord.reconcile()
    assert recon is None or isinstance(recon, type(planA))
    if recon is not None:
        # The merged actions must already be in (from_venue, to_venue, asset)
        # order, i.e. sorting them again is a no-op.
        actions = recon.delta_actions
        assert actions == sorted(actions, key=_action_sort_key)