"""Lightweight in-memory registry describing adapters and data schemas."""

from __future__ import annotations

from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Dict, List, Optional, Any


@dataclass
class GoCAdapter:
    """Record describing one adapter held by the registry.

    Attributes:
        name: Adapter identifier.
        description: Free-form description of the adapter.
        version: Adapter version string.
        registered_at: Timestamp supplied when the record is created.
    """

    name: str
    description: str
    version: str
    registered_at: datetime


@dataclass
class GoCSchema:
    """Record describing one message schema published by an adapter.

    Attributes:
        adapter_name: Name of the adapter the schema belongs to.
        message_type: Identifier of the message kind the schema covers.
        version: Integer schema version.
        metadata: Arbitrary string-keyed metadata attached to the schema.
        registered_at: Timestamp supplied when the record is created.
    """

    adapter_name: str
    message_type: str
    version: int
    metadata: Dict[str, Any]
    registered_at: datetime


class GraphOfContracts:
    """Lightweight in-memory registry describing adapters and data schemas.

    Adapters are stored in a dict keyed by name, so registering a name a
    second time replaces the earlier record. Schemas are appended to a list
    in registration order.
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        self.adapters: Dict[str, GoCAdapter] = {}
        self.schemas: List[GoCSchema] = []

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        Produces the same value as ``datetime.utcnow()`` without calling
        that deprecated API; the tzinfo is dropped so stored timestamps
        stay naive, as they were before.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    # Adapter lifecycle
    def register_adapter(
        self, name: str, description: str, version: str
    ) -> GoCAdapter:
        """Create an adapter record, store it under ``name`` and return it.

        An existing record with the same name is overwritten.
        """
        adapter = GoCAdapter(
            name=name,
            description=description,
            version=version,
            registered_at=self._utc_now(),
        )
        self.adapters[name] = adapter
        return adapter

    def get_adapter(self, name: str) -> Optional[GoCAdapter]:
        """Return the adapter registered under ``name``, or ``None``."""
        return self.adapters.get(name)

    # Schema registry
    def register_schema(
        self,
        adapter_name: str,
        message_type: str,
        version: int,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> GoCSchema:
        """Create a schema record, append it to the registry and return it.

        ``adapter_name`` is stored as given; it is not checked against the
        registered adapters. A missing or empty ``metadata`` becomes a new
        empty dict.
        """
        schema = GoCSchema(
            adapter_name=adapter_name,
            message_type=message_type,
            version=version,
            # Shallow-copy so later changes to the caller's dict do not
            # silently alter the stored record.
            metadata=dict(metadata) if metadata else {},
            registered_at=self._utc_now(),
        )
        self.schemas.append(schema)
        return schema

    def to_dict(self) -> Dict[str, Any]:
        """Return the registry contents as plain dicts and lists.

        Records are converted with ``dataclasses.asdict``; ``registered_at``
        values remain ``datetime`` objects.
        """
        return {
            "adapters": {k: asdict(v) for k, v in self.adapters.items()},
            "schemas": [asdict(s) for s in self.schemas],
        }


# Public names exported by a star-import of this module.
__all__ = ["GraphOfContracts", "GoCAdapter", "GoCSchema"]