# idea69-tradeseal-federated-.../tradeseal/contract_registry.py
#
# 94 lines
# 3.1 KiB
# Python

from __future__ import annotations
from typing import Dict, List, Optional
from dataclasses import dataclass, field
from tradeseal.governance import GovernanceLedger
@dataclass
class ContractVersion:
    """One versioned entry of a data contract.

    A plain record: none of the fields is validated or copied on construction.
    """

    # Version label of this entry, e.g. "v1.0".
    version: str
    # Identifier attached to this particular version, e.g. "FIX-DS-1".
    contract_id: str
    # Schema description, stored exactly as supplied by the caller.
    payload_schema: Dict
@dataclass
class Contract:
    """A named contract together with every version registered for it.

    ``versions`` keeps insertion order; nothing in this class sorts or
    de-duplicates the entries.
    """

    # Name under which the contract is known.
    name: str
    # Version history in insertion order; each instance gets its own list.
    versions: List[ContractVersion] = field(default_factory=list)

    def add_version(self, version: ContractVersion) -> None:
        """Append ``version`` to the end of the version history."""
        self.versions.append(version)

    def latest(self) -> Optional[ContractVersion]:
        """Return the last appended version, or ``None`` when there are none.

        "Latest" means last appended, not highest version label: versions are
        assumed to be added in increasing order.
        """
        history = self.versions
        return history[-1] if history else None
class ContractRegistry:
    """In-memory registry of versioned data contracts, keyed by contract name.

    A lightweight "Graph-of-Contracts" registry for versioned data contracts
    and adapters. Every name maps to a single ``Contract`` whose versions are
    kept in registration order. Duplicate version labels are not rejected.
    """

    def __init__(self) -> None:
        # contract name -> Contract holding every version registered under it
        self._contracts: Dict[str, Contract] = {}

    def add_contract_version(
        self,
        name: str,
        version: str,
        contract_id: str,
        payload_schema: Dict,
    ) -> ContractVersion:
        """Register a new version under ``name`` and return the created entry.

        The first registration for a name creates its ``Contract``; later
        registrations are appended after the versions already present.
        """
        existing = self._contracts.get(name)
        entry = ContractVersion(
            version=version,
            contract_id=contract_id,
            payload_schema=payload_schema,
        )
        if existing is not None:
            existing.add_version(entry)
            return entry
        self._contracts[name] = Contract(name=name, versions=[entry])
        return entry

    def get_latest_contract(self, name: str) -> Optional[ContractVersion]:
        """Return the last-registered version of ``name``, or ``None``.

        ``None`` is returned both for an unknown name and for a contract that
        has no versions.
        """
        contract = self._contracts.get(name)
        return contract.latest() if contract is not None else None

    def get_contract_version(self, name: str, version: str) -> Optional[ContractVersion]:
        """Return the first entry of ``name`` labelled ``version``, or ``None``.

        When the same label was registered more than once, the earliest
        registration wins.
        """
        contract = self._contracts.get(name)
        if not contract:
            return None
        matches = (entry for entry in contract.versions if entry.version == version)
        return next(matches, None)

    def list_contracts(self) -> Dict[str, List[str]]:
        """Return a new mapping of contract name to its version labels, in order."""
        listing: Dict[str, List[str]] = {}
        for name, contract in self._contracts.items():
            listing[name] = [entry.version for entry in contract.versions]
        return listing

    def seed_two_versions_for_demo(self) -> None:
        """Register the sample contracts used for the MVP demo.

        Adds two versions of ``FIXFeedContract`` followed by one version of
        ``BrokerRESTContract``. Calling this again appends the same versions
        a second time.
        """
        # (name, version, contract_id, schema fields), in registration order.
        samples = (
            (
                "FIXFeedContract",
                "v1.0",
                "FIX-DS-1",
                ["order_id", "venue", "symbol", "side", "quantity", "price"],
            ),
            (
                "FIXFeedContract",
                "v1.1",
                "FIX-DS-2",
                ["order_id", "venue", "symbol", "side", "quantity", "price", "timestamp"],
            ),
            (
                "BrokerRESTContract",
                "v0.9",
                "REST-DS-0.9",
                ["order_id", "venue", "symbol", "side", "quantity"],
            ),
        )
        for name, version, contract_id, field_names in samples:
            self.add_contract_version(
                name=name,
                version=version,
                contract_id=contract_id,
                payload_schema={"type": "order", "fields": field_names},
            )

    def generate_attestation_for_contract(
        self,
        ledger: GovernanceLedger,
        name: str,
        version: str,
        data: dict,
    ) -> dict:
        """Have ``ledger`` attest ``data`` tagged with a contract name and version.

        The registry itself is not consulted: ``name`` and ``version`` are
        passed through without checking that such a contract was registered.
        Returns whatever ``ledger.attest`` returns.
        """
        return ledger.attest({"contract": name, "version": version, "payload": data})