94 lines
3.1 KiB
Python
94 lines
3.1 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Dict, List, Optional
|
|
from dataclasses import dataclass, field
|
|
|
|
from tradeseal.governance import GovernanceLedger
|
|
|
|
|
|
@dataclass
|
|
class ContractVersion:
|
|
version: str
|
|
contract_id: str
|
|
payload_schema: Dict
|
|
|
|
|
|
@dataclass
|
|
class Contract:
|
|
name: str
|
|
versions: List[ContractVersion] = field(default_factory=list)
|
|
|
|
def add_version(self, version: ContractVersion) -> None:
|
|
self.versions.append(version)
|
|
|
|
def latest(self) -> Optional[ContractVersion]:
|
|
if not self.versions:
|
|
return None
|
|
# assume versions are appended in increasing order; return last
|
|
return self.versions[-1]
|
|
|
|
|
|
class ContractRegistry:
|
|
"""A lightweight Graph-of-Contracts registry for versioned data contracts and adapters."""
|
|
|
|
def __init__(self):
|
|
self._contracts: Dict[str, Contract] = {}
|
|
|
|
def add_contract_version(self, name: str, version: str, contract_id: str, payload_schema: Dict) -> ContractVersion:
|
|
c = self._contracts.get(name)
|
|
cv = ContractVersion(version=version, contract_id=contract_id, payload_schema=payload_schema)
|
|
if c is None:
|
|
c = Contract(name=name, versions=[cv])
|
|
self._contracts[name] = c
|
|
else:
|
|
c.add_version(cv)
|
|
return cv
|
|
|
|
def get_latest_contract(self, name: str) -> Optional[ContractVersion]:
|
|
c = self._contracts.get(name)
|
|
if c is None:
|
|
return None
|
|
return c.latest()
|
|
|
|
def get_contract_version(self, name: str, version: str) -> Optional[ContractVersion]:
|
|
c = self._contracts.get(name)
|
|
if not c:
|
|
return None
|
|
for v in c.versions:
|
|
if v.version == version:
|
|
return v
|
|
return None
|
|
|
|
def list_contracts(self) -> Dict[str, List[str]]:
|
|
return {name: [v.version for v in c.versions] for name, c in self._contracts.items()}
|
|
|
|
def seed_two_versions_for_demo(self) -> None:
|
|
# Seed sample contracts (two versioned contracts for MVP)
|
|
self.add_contract_version(
|
|
name="FIXFeedContract",
|
|
version="v1.0",
|
|
contract_id="FIX-DS-1",
|
|
payload_schema={"type": "order", "fields": ["order_id", "venue", "symbol", "side", "quantity", "price"]},
|
|
)
|
|
self.add_contract_version(
|
|
name="FIXFeedContract",
|
|
version="v1.1",
|
|
contract_id="FIX-DS-2",
|
|
payload_schema={"type": "order", "fields": ["order_id", "venue", "symbol", "side", "quantity", "price", "timestamp"]},
|
|
)
|
|
|
|
self.add_contract_version(
|
|
name="BrokerRESTContract",
|
|
version="v0.9",
|
|
contract_id="REST-DS-0.9",
|
|
payload_schema={"type": "order", "fields": ["order_id", "venue", "symbol", "side", "quantity"]},
|
|
)
|
|
|
|
def generate_attestation_for_contract(self, ledger: GovernanceLedger, name: str, version: str, data: dict) -> dict:
|
|
payload = {
|
|
"contract": name,
|
|
"version": version,
|
|
"payload": data,
|
|
}
|
|
return ledger.attest(payload)
|