idea149-deltax-forge-cross/tests/test_deltax.py

71 lines
2.8 KiB
Python

import time
from deltax_forge_cross.dsl import Asset, MarketSignal, StrategyDelta, PlanDelta, AuditLogEntry
from deltax_forge_cross.core.local_venue_solver import LocalVenueSolver
from deltax_forge_cross.core.central_curator import CentralCurator
from deltax_forge_cross.adapters.price_feed_adapter import translate_market_signals
from deltax_forge_cross.adapters.venue_execution_adapter import translate_plan_delta
from deltax_forge_cross.logging import RunLog
from deltax_forge_cross.ledger import LedgerSigner, sign_and_append
def test_dsl_creation():
a1 = Asset(symbol="AAPL", kind="Equity")
a2 = Asset(symbol="MSFT", kind="Equity")
ms = MarketSignal(symbol="AAPL", price=150.0, liquidity=0.9, latency_proxy=0.01)
strategy = StrategyDelta(
delta_budget=10.0,
vega_budget=2.0,
gamma_budget=1.0,
pnl_target=1.0,
assets=[a1, a2],
signals=[ms],
)
assert strategy.delta_budget == 10.0
assert len(strategy.assets) == 2
def test_local_and_central_coherence():
a1 = Asset(symbol="AAPL", kind="Equity")
a2 = Asset(symbol="MSFT", kind="Equity")
strategy = StrategyDelta(delta_budget=10.0, vega_budget=2.0, gamma_budget=1.0, pnl_target=1.0,
assets=[a1, a2])
solver1 = LocalVenueSolver("VENUE1", assets=[a1.symbol, a2.symbol])
solver2 = LocalVenueSolver("VENUE2", assets=[a1.symbol, a2.symbol])
d1 = solver1.compute_local_delta(strategy)
d2 = solver2.compute_local_delta(strategy)
import pytest
curator = CentralCurator(["VENUE1", "VENUE2"])
shared = curator.coordinate([d1, d2])
assert shared["AAPL"] == pytest.approx(5.0)
assert shared["MSFT"] == pytest.approx(5.0)
def test_log_replay():
log = RunLog("test_run.log")
log.log({"event": "start"})
# Replay should yield 1+ entries; we simply verify that it yields a dict-like payload
entries = list(RunLog.replay("test_run.log"))
assert len(entries) >= 1
def test_adapter_translation_and_governance_signing():
a1 = Asset(symbol="AAPL", kind="Equity")
strategy = StrategyDelta(delta_budget=10.0, vega_budget=2.0, gamma_budget=1.0, pnl_target=1.0,
assets=[a1])
# price feed translation
ms = MarketSignal(symbol="AAPL", price=150.0, liquidity=0.8, latency_proxy=0.02)
strategy.signals = [ms]
feeds = translate_market_signals(strategy)
assert "AAPL" in feeds
# plan delta translation
plan = PlanDelta(hedge_updates={"AAPL": 5.0}, authority="tester")
exec_map = translate_plan_delta(plan)
assert exec_map["AAPL"] == 5.0
# governance signing
signer = LedgerSigner("secretkey")
payload = plan.to_dict() if hasattr(plan, 'to_dict') else plan.__dict__
sig = signer.sign(str(payload))
assert signer.verify(str(payload), sig)