# idea159-arbsphere-federated.../idea159_arbsphere_federated.../goc_registry.py
#
# 61 lines
# 1.7 KiB
# Python

from __future__ import annotations

from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
@dataclass
class GoCAdapter:
    """Registry record describing a single adapter.

    Instances are plain data holders; equality and ``repr`` come from
    ``@dataclass``.
    """

    # Identifier of the adapter.
    name: str
    # Free-form, human-readable summary of the adapter.
    description: str
    # Adapter version, kept as an opaque string.
    version: str
    # Moment at which the adapter was registered.
    registered_at: datetime
@dataclass
class GoCSchema:
    """Registry record describing one message schema tied to an adapter.

    Instances are plain data holders; equality and ``repr`` come from
    ``@dataclass``.
    """

    # Name of the adapter this schema is associated with.
    adapter_name: str
    # Label of the message kind the schema describes.
    message_type: str
    # Integer schema version.
    version: int
    # Arbitrary additional key/value information about the schema.
    metadata: Dict[str, Any]
    # Moment at which the schema was registered.
    registered_at: datetime
class GraphOfContracts:
    """Lightweight in-memory registry describing adapters and data schemas.

    Adapters are stored by name in ``adapters``; registering a name that is
    already present replaces the earlier entry. Schemas are appended to
    ``schemas`` in registration order, without de-duplication and without
    checking that the referenced adapter has been registered.
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        self.adapters: Dict[str, GoCAdapter] = {}
        self.schemas: List[GoCSchema] = []

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive ``datetime``.

        ``datetime.utcnow()`` is deprecated since Python 3.12. Taking an
        aware UTC timestamp and dropping its ``tzinfo`` yields the same
        naive value, so existing consumers of ``registered_at`` are
        unaffected.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    # Adapter lifecycle
    def register_adapter(self, name: str, description: str, version: str) -> GoCAdapter:
        """Create an adapter record, store it under ``name`` and return it.

        An existing adapter with the same name is overwritten.
        """
        adapter = GoCAdapter(
            name=name,
            description=description,
            version=version,
            registered_at=self._utc_now(),
        )
        self.adapters[name] = adapter
        return adapter

    def get_adapter(self, name: str) -> Optional[GoCAdapter]:
        """Return the adapter registered under ``name``, or ``None`` if absent."""
        return self.adapters.get(name)

    # Schema registry
    def register_schema(
        self,
        adapter_name: str,
        message_type: str,
        version: int,
        metadata: Optional[Dict[str, Any]] = None,
    ) -> GoCSchema:
        """Create a schema record, append it to ``schemas`` and return it.

        ``metadata`` defaults to an empty dict. A shallow copy is stored, so
        adding or removing keys on the caller's dict afterwards does not
        alter the registered schema.
        """
        # Compare against None explicitly: an empty dict passed by the caller
        # is handled the same way as a populated one (copied, never aliased).
        stored_metadata: Dict[str, Any] = dict(metadata) if metadata is not None else {}
        schema = GoCSchema(
            adapter_name=adapter_name,
            message_type=message_type,
            version=version,
            metadata=stored_metadata,
            registered_at=self._utc_now(),
        )
        self.schemas.append(schema)
        return schema

    def to_dict(self) -> Dict[str, Any]:
        """Return the registry contents as nested dicts and lists.

        The result has an ``"adapters"`` mapping (name -> adapter fields) and
        a ``"schemas"`` list (one dict of fields per schema). Records are
        converted with ``dataclasses.asdict``, so ``registered_at`` values
        remain ``datetime`` objects rather than strings.
        """
        return {
            "adapters": {name: asdict(adapter) for name, adapter in self.adapters.items()},
            "schemas": [asdict(schema) for schema in self.schemas],
        }
# Public API of this module: the names exported by a star-import.
__all__ = ["GraphOfContracts", "GoCAdapter", "GoCSchema"]