92 lines
4.1 KiB
Python
92 lines
4.1 KiB
Python
import time
|
|
import uuid
|
|
from typing import Dict, Any
|
|
from .contracts import LocalProblem, SharedVariables, PlanDelta, ConstraintSet, DeviceInfo
|
|
|
|
|
|
class GraphContractRegistry:
|
|
def __init__(self) -> None:
|
|
self._contracts: Dict[str, Dict[str, Any]] = {}
|
|
|
|
# Backward-compatible API: support both old and new signatures used by tests
|
|
def register_contract(self, contract_type: str, version: str, payload: Dict[str, Any]) -> None:
|
|
# New API: store by (type, version)
|
|
self._contracts[(contract_type, version)] = payload
|
|
|
|
def get_contract(self, contract_type: str, version: str) -> Dict[str, Any]:
|
|
return self._contracts.get((contract_type, version), {})
|
|
|
|
def conformance_check(self, contract: Dict[str, Any]) -> bool:
|
|
# Minimal conformance: ensure required keys exist
|
|
required = {"type", "payload"}
|
|
if not isinstance(contract, dict):
|
|
return False
|
|
return required.issubset(set(contract.keys()))
|
|
|
|
def register_adapter(self, adapter_type: str, version: str, payload: Dict[str, Any]) -> None:
|
|
# minimal adapter registry (store in a separate namespace)
|
|
if not hasattr(self, "_adapters"):
|
|
self._adapters = {}
|
|
self._adapters[(adapter_type, version)] = payload
|
|
|
|
def conformance_test(self, adapter_iface: Dict[str, Any], contract_schema: Dict[str, Any]) -> bool:
|
|
"""Check adapter-contract conformance.
|
|
|
|
This test is intentionally lenient to support both legacy payloads and
|
|
payload-with-metadata structures. Some code paths register contracts as
|
|
plain payload dicts (legacy) while newer paths store contracts under a
|
|
{"payload": ..., "metadata": ...} wrapper. Accept either form for
|
|
compatibility while ensuring the adapter is present as well.
|
|
"""
|
|
key = (adapter_iface.get("name"), adapter_iface.get("version"))
|
|
contract_key = (contract_schema.get("name"), contract_schema.get("version"))
|
|
adapters_ok = getattr(self, "_adapters", {}).get(key) is not None
|
|
entry = self._contracts.get(contract_key)
|
|
# Accept both legacy payload form and new wrapped form
|
|
contracts_ok = False
|
|
if isinstance(entry, dict):
|
|
# New style with payload/metadata
|
|
if "payload" in entry:
|
|
contracts_ok = True
|
|
else:
|
|
# treat dict-like legacy payload as present
|
|
contracts_ok = True if entry else False
|
|
else:
|
|
contracts_ok = entry is not None
|
|
return adapters_ok and contracts_ok
|
|
|
|
# Extended API: support metadata-aware contract registration without breaking
|
|
# existing callers. This enables per-message auditing and replay protection.
|
|
def register_contract_with_meta(self, contract_type: str, version: str, payload: Dict[str, Any], metadata: Dict[str, Any] | None = None) -> None:
|
|
if not hasattr(self, "_contracts"):
|
|
self._contracts = {}
|
|
self._contracts[(contract_type, version)] = {
|
|
"payload": payload,
|
|
"metadata": metadata or {},
|
|
}
|
|
|
|
def get_contract_with_meta(self, contract_type: str, version: str) -> Dict[str, Any]:
|
|
entry = self._contracts.get((contract_type, version))
|
|
if isinstance(entry, dict) and "payload" in entry:
|
|
return entry
|
|
# Fallback to legacy payload storage if present
|
|
# Support both: a raw payload dict, or None
|
|
if isinstance(entry, dict):
|
|
# Legacy form where the entry itself is the payload dict
|
|
return {"payload": entry, "metadata": {}}
|
|
return {"payload": entry, "metadata": {}}
|
|
|
|
|
|
class ContractRegistry:
|
|
"""Backward-compatible, simplified registry interface used by tests.
|
|
Maps (type_name, version) -> contract payload dict.
|
|
"""
|
|
def __init__(self) -> None:
|
|
self._store: Dict[tuple, Any] = {}
|
|
|
|
def register_contract(self, contract_type: str, version: str, payload: Any) -> None:
|
|
self._store[(contract_type, version)] = payload
|
|
|
|
def get_contract(self, contract_type: str, version: str) -> Any:
|
|
return self._store.get((contract_type, version))
|