from __future__ import annotations

import json
import time
from dataclasses import asdict, dataclass, is_dataclass
from typing import Any, Dict, List

from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey


# Basic IR blocks for cross-domain interoperability (CatOpt-inspired).
# The blocks below are plain data records; this module attaches no
# validation or behavior to their fields.


@dataclass
class FoundationModelBlock:
    """Record describing a foundation model and its budgets."""

    id: str
    domain: str
    model_type: str
    device_targets: List[str]
    max_energy: float
    latency_budget: float


@dataclass
class PlanningPolicyBlock:
    """Record holding safety/performance budgets and a drift tolerance."""

    safety_budget: float
    performance_budget: float
    drift_tolerance: float


@dataclass
class LocalProblemBlock:
    """Record identifying a local problem; ``version`` defaults to 1."""

    id: str
    domain: str
    description: str
    version: int = 1


@dataclass
class SharedVariablesBlock:
    """Record wrapping a free-form variable mapping; ``version`` defaults to 1."""

    variables: Dict[str, Any]
    version: int = 1


@dataclass
class PlanDeltaBlock:
    """Record wrapping a free-form delta mapping plus a timestamp."""

    delta: Dict[str, Any]
    timestamp: float
    version: int = 1


@dataclass
class DualVariablesBlock:
    """Record holding the energy and privacy multipliers (both required)."""

    lambda_energy: float
    lambda_privacy: float


@dataclass
class PrivacyBudgetBlock:
    """Record holding a privacy multiplier and an optional energy multiplier."""

    lambda_privacy: float
    # Unlike DualVariablesBlock, the energy term is optional here.
    lambda_energy: float = 0.0


@dataclass
class AuditLogBlock:
    """Record for one audit-log entry; ``version`` defaults to 1."""

    entry: str
    signer: str
    timestamp: float
    contract_id: str
    version: int = 1


def to_json_block(obj: Any) -> str:
    """Serialize *obj* to a JSON string.

    Dataclass instances are converted with ``dataclasses.asdict`` first, so
    nested dataclasses become nested JSON objects. Every other value is
    handed to ``json.dumps`` unchanged.

    In both cases ``default=str`` is used, so values the ``json`` module
    cannot encode are emitted as their ``str()`` form instead of raising.

    Args:
        obj: A dataclass instance or any other value.

    Returns:
        The JSON text.
    """
    # Check for a dataclass *instance* explicitly. Probing for ``__dict__``
    # is wrong in both directions: functions and ordinary objects have one
    # (and make ``asdict`` raise TypeError), while slotted dataclasses do not.
    # Dataclass types (as opposed to instances) are also rejected by
    # ``asdict``, hence the ``isinstance(obj, type)`` guard.
    if is_dataclass(obj) and not isinstance(obj, type):
        return json.dumps(asdict(obj), default=str)
    return json.dumps(obj, default=str)


class GoCRegistry:
    """Lightweight in-memory registry for adapters and contracts.

    This is a production-like primitive to enable interoperability testing
    across CatOpt-like stacks.

    Each instance generates its own Ed25519 key pair. Entries appended by
    ``log_delta`` carry a signature over their payload; entries appended by
    ``register_adapter`` and ``publish_contract`` are logged unsigned.

    Attributes:
        public_key: Public half of the instance's signing key.
        adapters: Adapter records keyed by adapter id.
        contracts: Contract records keyed by contract id.
        entries: Append-only list of log entries, in insertion order.
    """

    def __init__(self) -> None:
        """Create an empty registry with a freshly generated signing key."""
        self._private_key = Ed25519PrivateKey.generate()
        self.public_key = self._private_key.public_key()
        self.adapters: Dict[str, Dict[str, Any]] = {}
        self.contracts: Dict[str, Dict[str, Any]] = {}
        self.entries: List[Dict[str, Any]] = []

    def sign(self, message: bytes) -> bytes:
        """Return the signature of *message* under this registry's private key."""
        return self._private_key.sign(message)

    def verify(self, message: bytes, signature: bytes) -> bool:
        """Report whether *signature* is valid for *message* under ``public_key``.

        Returns:
            True when verification succeeds; False when it raises for any
            reason (a bad signature or malformed arguments alike).
        """
        try:
            self.public_key.verify(signature, message)
            return True
        except Exception:
            # Deliberately broad: any failure is reported as "not verified"
            # rather than propagated to the caller.
            return False

    # Adapter management
    def register_adapter(
        self, adapter_id: str, supported_domains: List[str]
    ) -> Dict[str, Any]:
        """Store an adapter record, log the registration, and return the record.

        An existing record with the same *adapter_id* is overwritten.

        Args:
            adapter_id: Key under which the record is stored.
            supported_domains: Stored as given (not copied).

        Returns:
            The stored record; the same dict object is also referenced by
            the log entry.
        """
        rec = {
            "adapter_id": adapter_id,
            "supported_domains": supported_domains,
            "registered_at": time.time(),
            "contract_version": 1,
        }
        self.adapters[adapter_id] = rec
        self._log_entry("register_adapter", adapter_id, rec)
        return rec

    # Contracts management (local problem, variables, deltas, etc.)
    def publish_contract(
        self, contract_id: str, payload: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Store a contract record, log the publication, and return the record.

        An existing record with the same *contract_id* is overwritten.

        Args:
            contract_id: Key under which the record is stored.
            payload: Stored as given (not copied).

        Returns:
            The stored record; the same dict object is also referenced by
            the log entry.
        """
        rec = {
            "contract_id": contract_id,
            "payload": payload,
            "published_at": time.time(),
        }
        self.contracts[contract_id] = rec
        self._log_entry("publish_contract", contract_id, rec)
        return rec

    def get_contract(self, contract_id: str) -> Dict[str, Any] | None:
        """Return the record stored for *contract_id*, or None if absent."""
        return self.contracts.get(contract_id)

    # Delta sync and reconciliation log (deterministic replay when reconnecting)
    def log_delta(self, delta: Dict[str, Any], contract_id: str) -> Dict[str, Any]:
        """Append a signed ``PlanDelta`` entry to the log and return it.

        The signature covers the UTF-8 bytes of the payload serialized with
        ``json.dumps(..., sort_keys=True)``; a verifier must rebuild exactly
        those bytes.

        Args:
            delta: Delta content; must be JSON-serializable.
            contract_id: Identifier recorded alongside the delta.

        Returns:
            The appended entry, with keys ``type``, ``payload`` and
            ``signature`` (hex-encoded).

        Raises:
            TypeError: If *delta* contains values ``json.dumps`` cannot
                encode.
        """
        timestamp = time.time()
        payload = {
            "delta": delta,
            "contract_id": contract_id,
            "timestamp": timestamp,
        }
        # sort_keys gives a key-order-independent byte string to sign.
        payload_bytes = json.dumps(payload, sort_keys=True).encode()
        signature = self.sign(payload_bytes).hex()
        entry = {
            "type": "PlanDelta",
            "payload": payload,
            "signature": signature,
        }
        self.entries.append(entry)
        return entry

    def _log_entry(self, action: str, key: str, payload: Any) -> None:
        """Append an unsigned, timestamped entry describing *action* on *key*."""
        ts = time.time()
        self.entries.append(
            {"action": action, "key": key, "payload": payload, "ts": ts}
        )

    # Convenience helpers for tests / basic usage
    def as_json_registry(self) -> str:
        """Return adapters, contracts and log entries as one JSON document.

        Values the ``json`` module cannot encode are emitted as their
        ``str()`` form (``default=str``).
        """
        return json.dumps(
            {
                "adapters": self.adapters,
                "contracts": self.contracts,
                "entries": self.entries,
            },
            default=str,
        )