# idea69-tradeseal-federated-.../tests/test_tradeseal.py
#
# 71 lines
# 2.7 KiB
# Python

import pytest
from tradeseal.core import LocalOrder, SharedSignals, PlanDelta, AuditLog, Policy
from tradeseal.federation import FederationEngine
from tradeseal.governance import GovernanceLedger
from tradeseal.contract_registry import ContractRegistry
def test_basic_primitives_and_delta_sync():
    """Smoke-test the result types of ``reconcile`` and ``delta_sync``.

    One AAPL buy order is placed on each of two venues, the engine reconciles
    them under a policy with large limits, and the resulting delta is then
    merged with itself. Only the types of the results are asserted; the
    contents of the updates are not inspected.
    """
    # One order list per venue, keyed by venue name.
    venue_orders = {
        "V1": [
            LocalOrder(
                order_id="v1-1",
                venue="V1",
                symbol="AAPL",
                side="buy",
                quantity=100,
                price=150.0,
            ),
        ],
        "V2": [
            LocalOrder(
                order_id="v2-1",
                venue="V2",
                symbol="AAPL",
                side="buy",
                quantity=50,
                price=151.0,
            ),
        ],
    }

    engine = FederationEngine(
        Policy(max_net_exposure=100000.0, max_position_per_symbol=100000.0)
    )

    # Per-venue signals, supplied because reconcile() takes them as its
    # second argument.
    venue_signals = {
        "V1": SharedSignals(venue="V1", risk_metrics={"net_exposure": 1000.0}),
        "V2": SharedSignals(venue="V2", risk_metrics={"net_exposure": 500.0}),
    }

    # The engine receives a fresh dict holding the same two entries.
    reconciled = engine.reconcile(venue_orders, dict(venue_signals))
    assert isinstance(reconciled, PlanDelta)
    assert isinstance(reconciled.updates, list)

    # Merge the delta with itself and check the merged result's type.
    merged = engine.delta_sync([reconciled, reconciled])
    assert isinstance(merged, PlanDelta)

    # The merged delta may be empty; when it is not, its first update must
    # be a plain dict.
    if len(merged.updates) != 0:
        assert isinstance(merged.updates[0], dict)
def test_governance_attestation_and_verify():
    """Check that a ledger accepts its own attestation and rejects a tampered one."""
    ledger = GovernanceLedger(b"test-secret-key-1234")

    entry = ledger.attest(
        {"action": "attest", "subject": "trade-seal", "payload": {"x": 1}}
    )
    # An untouched entry must verify.
    assert ledger.verify(entry) is True

    # Copy the entry and overwrite its signature with 64 "0" characters;
    # the altered copy must be rejected.
    tampered = dict(entry, signature="00" * 32)
    assert ledger.verify(tampered) is False
def test_contract_registry_and_attestation():
    """Check latest-contract lookup and that a contract attestation verifies.

    The registry is populated through ``seed_two_versions_for_demo``; the
    latest ``FIXFeedContract`` is expected to report version ``"v1.1"``. An
    attestation generated for that version must be a dict that the same
    ledger verifies.
    """
    contract_name = "FIXFeedContract"

    ledger = GovernanceLedger(b"test-secret-key-xyz-9876")
    registry = ContractRegistry()
    registry.seed_two_versions_for_demo()

    newest = registry.get_latest_contract(contract_name)
    assert newest is not None
    assert newest.version == "v1.1"

    # Attest the newest version through the ledger created above.
    attestation = registry.generate_attestation_for_contract(
        ledger,
        name=contract_name,
        version=newest.version,
        data={"example_field": "value"},
    )

    # The attestation must be a dict and must pass the ledger's own check.
    assert isinstance(attestation, dict)
    assert ledger.verify(attestation) is True