146 lines
4.2 KiB
Python
146 lines
4.2 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, asdict
|
|
from typing import Any, Dict, List
|
|
import json
|
|
import time
|
|
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
|
|
from cryptography.hazmat.primitives import serialization
|
|
|
|
|
|
# Basic IR blocks for cross-domain interoperability (CatOpt-inspired)
|
|
|
|
@dataclass
|
|
class FoundationModelBlock:
|
|
id: str
|
|
domain: str
|
|
model_type: str
|
|
device_targets: List[str]
|
|
max_energy: float
|
|
latency_budget: float
|
|
|
|
@dataclass
|
|
class PlanningPolicyBlock:
|
|
safety_budget: float
|
|
performance_budget: float
|
|
drift_tolerance: float
|
|
|
|
@dataclass
|
|
class LocalProblemBlock:
|
|
id: str
|
|
domain: str
|
|
description: str
|
|
version: int = 1
|
|
|
|
@dataclass
|
|
class SharedVariablesBlock:
|
|
variables: Dict[str, Any]
|
|
version: int = 1
|
|
|
|
@dataclass
|
|
class PlanDeltaBlock:
|
|
delta: Dict[str, Any]
|
|
timestamp: float
|
|
version: int = 1
|
|
|
|
@dataclass
|
|
class DualVariablesBlock:
|
|
lambda_energy: float
|
|
lambda_privacy: float
|
|
|
|
@dataclass
|
|
class PrivacyBudgetBlock:
|
|
lambda_privacy: float
|
|
lambda_energy: float = 0.0
|
|
|
|
@dataclass
|
|
class AuditLogBlock:
|
|
entry: str
|
|
signer: str
|
|
timestamp: float
|
|
contract_id: str
|
|
version: int = 1
|
|
|
|
|
|
def to_json_block(obj: Any) -> str:
|
|
if hasattr(obj, "__dict__"):
|
|
return json.dumps(asdict(obj), default=str)
|
|
return json.dumps(obj, default=str)
|
|
|
|
|
|
class GoCRegistry:
|
|
"""Lightweight in-memory registry for adapters and contracts with per-message metadata.
|
|
|
|
This is a production-like primitive to enable interoperability testing acrossCatOpt-like stacks.
|
|
It provides signing for messages to enable replay-protected logging.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self._private_key = Ed25519PrivateKey.generate()
|
|
self.public_key = self._private_key.public_key()
|
|
self.adapters: Dict[str, Dict[str, Any]] = {}
|
|
self.contracts: Dict[str, Dict[str, Any]] = {}
|
|
self.entries: List[Dict[str, Any]] = []
|
|
|
|
def sign(self, message: bytes) -> bytes:
|
|
return self._private_key.sign(message)
|
|
|
|
def verify(self, message: bytes, signature: bytes) -> bool:
|
|
try:
|
|
self.public_key.verify(signature, message)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
# Adapter management
|
|
def register_adapter(self, adapter_id: str, supported_domains: List[str]) -> Dict[str, Any]:
|
|
rec = {
|
|
"adapter_id": adapter_id,
|
|
"supported_domains": supported_domains,
|
|
"registered_at": time.time(),
|
|
"contract_version": 1,
|
|
}
|
|
self.adapters[adapter_id] = rec
|
|
self._log_entry("register_adapter", adapter_id, rec)
|
|
return rec
|
|
|
|
# Contracts management (local problem, variables, deltas, etc.)
|
|
def publish_contract(self, contract_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
|
|
rec = {"contract_id": contract_id, "payload": payload, "published_at": time.time()}
|
|
self.contracts[contract_id] = rec
|
|
self._log_entry("publish_contract", contract_id, rec)
|
|
return rec
|
|
|
|
def get_contract(self, contract_id: str) -> Dict[str, Any] | None:
|
|
return self.contracts.get(contract_id)
|
|
|
|
# Delta sync and reconciliation log (deterministic replay when reconnecting)
|
|
def log_delta(self, delta: Dict[str, Any], contract_id: str) -> Dict[str, Any]:
|
|
timestamp = time.time()
|
|
payload = {
|
|
"delta": delta,
|
|
"contract_id": contract_id,
|
|
"timestamp": timestamp,
|
|
}
|
|
payload_bytes = json.dumps(payload, sort_keys=True).encode()
|
|
signature = self.sign(payload_bytes).hex()
|
|
entry = {
|
|
"type": "PlanDelta",
|
|
"payload": payload,
|
|
"signature": signature,
|
|
}
|
|
self.entries.append(entry)
|
|
return entry
|
|
|
|
def _log_entry(self, action: str, key: str, payload: Any) -> None:
|
|
ts = time.time()
|
|
self.entries.append({"action": action, "key": key, "payload": payload, "ts": ts})
|
|
|
|
# Convenience helpers for tests / basic usage
|
|
def as_json_registry(self) -> str:
|
|
return json.dumps({
|
|
"adapters": self.adapters,
|
|
"contracts": self.contracts,
|
|
"entries": self.entries,
|
|
}, default=str)
|