71 lines
2.8 KiB
Python
71 lines
2.8 KiB
Python
import time
|
|
from deltax_forge_cross.dsl import Asset, MarketSignal, StrategyDelta, PlanDelta, AuditLogEntry
|
|
from deltax_forge_cross.core.local_venue_solver import LocalVenueSolver
|
|
from deltax_forge_cross.core.central_curator import CentralCurator
|
|
from deltax_forge_cross.adapters.price_feed_adapter import translate_market_signals
|
|
from deltax_forge_cross.adapters.venue_execution_adapter import translate_plan_delta
|
|
from deltax_forge_cross.logging import RunLog
|
|
from deltax_forge_cross.ledger import LedgerSigner, sign_and_append
|
|
|
|
|
|
def test_dsl_creation():
    """Build a StrategyDelta from two assets and one signal, then check it.

    Only ``delta_budget`` and the number of attached assets are asserted.
    """
    apple = Asset(symbol="AAPL", kind="Equity")
    microsoft = Asset(symbol="MSFT", kind="Equity")
    apple_signal = MarketSignal(
        symbol="AAPL", price=150.0, liquidity=0.9, latency_proxy=0.01
    )

    # Collect the constructor arguments first so the call itself stays short.
    strategy_fields = {
        "delta_budget": 10.0,
        "vega_budget": 2.0,
        "gamma_budget": 1.0,
        "pnl_target": 1.0,
        "assets": [apple, microsoft],
        "signals": [apple_signal],
    }
    strategy = StrategyDelta(**strategy_fields)

    assert strategy.delta_budget == 10.0
    assert len(strategy.assets) == 2
|
|
|
|
|
|
def test_local_and_central_coherence():
    """Coordinate the local deltas of two venues and check the shared result.

    Both venue solvers receive the same strategy and the same symbols; the
    curator's coordinated output is expected to be approximately 5.0 for
    each of AAPL and MSFT.
    """
    import pytest  # kept as a function-local import, as in the original test

    venue_names = ("VENUE1", "VENUE2")
    equities = [Asset(symbol=ticker, kind="Equity") for ticker in ("AAPL", "MSFT")]
    tickers = [equity.symbol for equity in equities]

    strategy = StrategyDelta(
        delta_budget=10.0,
        vega_budget=2.0,
        gamma_budget=1.0,
        pnl_target=1.0,
        assets=equities,
    )

    # Build every solver before computing any delta, and hand each solver its
    # own list of symbols so no list object is shared between them.
    solvers = [LocalVenueSolver(venue, assets=list(tickers)) for venue in venue_names]
    local_deltas = [solver.compute_local_delta(strategy) for solver in solvers]

    curator = CentralCurator(list(venue_names))
    shared = curator.coordinate(local_deltas)

    for ticker in ("AAPL", "MSFT"):
        assert shared[ticker] == pytest.approx(5.0)
|
|
|
|
|
|
def test_log_replay():
    """Log one event to a fresh file and check that replay yields entries.

    The log file lives in a throwaway temporary directory. Writing
    ``test_run.log`` into the current working directory (as this test used
    to) left an artifact behind after every run, and entries surviving from
    an earlier run could satisfy the ``>= 1`` check even if ``log()`` wrote
    nothing at all.
    """
    # Function-local stdlib imports keep this test self-contained.
    import os
    import shutil
    import tempfile

    scratch_dir = tempfile.mkdtemp(prefix="runlog_test_")
    try:
        log_path = os.path.join(scratch_dir, "test_run.log")

        log = RunLog(log_path)
        log.log({"event": "start"})

        # Replay should yield 1+ entries. Only the count is checked here;
        # the type of each replayed payload is not inspected.
        entries = list(RunLog.replay(log_path))
        assert len(entries) >= 1
    finally:
        # Best-effort cleanup: ignore errors in case the log file is still
        # held open when the directory is removed.
        shutil.rmtree(scratch_dir, ignore_errors=True)
|
|
|
|
|
|
def test_adapter_translation_and_governance_signing():
    """Exercise both adapters and a sign/verify round trip on a plan payload.

    Three independent checks run in sequence: market-signal translation,
    plan-delta translation, and signing followed by verification.
    """
    # --- price feed translation ---
    strategy = StrategyDelta(
        delta_budget=10.0,
        vega_budget=2.0,
        gamma_budget=1.0,
        pnl_target=1.0,
        assets=[Asset(symbol="AAPL", kind="Equity")],
    )
    # Signals are attached after construction, exactly as the original did.
    strategy.signals = [
        MarketSignal(symbol="AAPL", price=150.0, liquidity=0.8, latency_proxy=0.02)
    ]
    translated_feeds = translate_market_signals(strategy)
    assert "AAPL" in translated_feeds

    # --- plan delta translation ---
    hedge_plan = PlanDelta(hedge_updates={"AAPL": 5.0}, authority="tester")
    execution_map = translate_plan_delta(hedge_plan)
    assert execution_map["AAPL"] == 5.0

    # --- governance signing ---
    ledger_signer = LedgerSigner("secretkey")
    # Prefer the plan's own serializer when it has one; otherwise fall back
    # to its raw attribute dict.
    if hasattr(hedge_plan, 'to_dict'):
        payload = hedge_plan.to_dict()
    else:
        payload = hedge_plan.__dict__
    message = str(payload)
    signature = ledger_signer.sign(message)
    assert ledger_signer.verify(message, signature)
|