35 lines
1.0 KiB
Python
35 lines
1.0 KiB
Python
"""Lightweight in-memory contract registry for CatOpt bridge.
|
|
|
|
This module exposes a simple Registry API to store versioned contracts
|
|
used by the CatOpt bridge for interoperability.
|
|
"""
|
|
|
|
from typing import Any, Dict, Optional
|
|
|
|
|
|
class ContractRegistry:
|
|
def __init__(self) -> None:
|
|
# Map contract name -> {"version": str, "schema": Any}
|
|
self._contracts: Dict[str, Dict[str, Any]] = {}
|
|
|
|
def register(self, name: str, version: str, schema: Any) -> None:
|
|
self._contracts[name] = {"version": version, "schema": schema}
|
|
|
|
def get(self, name: str) -> Optional[Dict[str, Any]]:
|
|
return self._contracts.get(name)
|
|
|
|
def all(self) -> Dict[str, Dict[str, Any]]:
|
|
return dict(self._contracts)
|
|
|
|
|
|
# Public singleton registry instance used by the bridge
|
|
REGISTRY = ContractRegistry()
|
|
|
|
|
|
def register_contract(name: str, version: str, schema: Any) -> None:
|
|
REGISTRY.register(name, version, schema)
|
|
|
|
|
|
def get_contract(name: str) -> Optional[Dict[str, Any]]:
|
|
return REGISTRY.get(name)
|