nebulaforge-offline-resilie.../nebulaforge/catopt_bridge.py

146 lines
4.2 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Any, Dict, List
import json
import time
from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
from cryptography.hazmat.primitives import serialization
# Basic IR blocks for cross-domain interoperability (CatOpt-inspired)
@dataclass
class FoundationModelBlock:
id: str
domain: str
model_type: str
device_targets: List[str]
max_energy: float
latency_budget: float
@dataclass
class PlanningPolicyBlock:
safety_budget: float
performance_budget: float
drift_tolerance: float
@dataclass
class LocalProblemBlock:
id: str
domain: str
description: str
version: int = 1
@dataclass
class SharedVariablesBlock:
variables: Dict[str, Any]
version: int = 1
@dataclass
class PlanDeltaBlock:
delta: Dict[str, Any]
timestamp: float
version: int = 1
@dataclass
class DualVariablesBlock:
lambda_energy: float
lambda_privacy: float
@dataclass
class PrivacyBudgetBlock:
lambda_privacy: float
lambda_energy: float = 0.0
@dataclass
class AuditLogBlock:
entry: str
signer: str
timestamp: float
contract_id: str
version: int = 1
def to_json_block(obj: Any) -> str:
if hasattr(obj, "__dict__"):
return json.dumps(asdict(obj), default=str)
return json.dumps(obj, default=str)
class GoCRegistry:
"""Lightweight in-memory registry for adapters and contracts with per-message metadata.
This is a production-like primitive to enable interoperability testing acrossCatOpt-like stacks.
It provides signing for messages to enable replay-protected logging.
"""
def __init__(self) -> None:
self._private_key = Ed25519PrivateKey.generate()
self.public_key = self._private_key.public_key()
self.adapters: Dict[str, Dict[str, Any]] = {}
self.contracts: Dict[str, Dict[str, Any]] = {}
self.entries: List[Dict[str, Any]] = []
def sign(self, message: bytes) -> bytes:
return self._private_key.sign(message)
def verify(self, message: bytes, signature: bytes) -> bool:
try:
self.public_key.verify(signature, message)
return True
except Exception:
return False
# Adapter management
def register_adapter(self, adapter_id: str, supported_domains: List[str]) -> Dict[str, Any]:
rec = {
"adapter_id": adapter_id,
"supported_domains": supported_domains,
"registered_at": time.time(),
"contract_version": 1,
}
self.adapters[adapter_id] = rec
self._log_entry("register_adapter", adapter_id, rec)
return rec
# Contracts management (local problem, variables, deltas, etc.)
def publish_contract(self, contract_id: str, payload: Dict[str, Any]) -> Dict[str, Any]:
rec = {"contract_id": contract_id, "payload": payload, "published_at": time.time()}
self.contracts[contract_id] = rec
self._log_entry("publish_contract", contract_id, rec)
return rec
def get_contract(self, contract_id: str) -> Dict[str, Any] | None:
return self.contracts.get(contract_id)
# Delta sync and reconciliation log (deterministic replay when reconnecting)
def log_delta(self, delta: Dict[str, Any], contract_id: str) -> Dict[str, Any]:
timestamp = time.time()
payload = {
"delta": delta,
"contract_id": contract_id,
"timestamp": timestamp,
}
payload_bytes = json.dumps(payload, sort_keys=True).encode()
signature = self.sign(payload_bytes).hex()
entry = {
"type": "PlanDelta",
"payload": payload,
"signature": signature,
}
self.entries.append(entry)
return entry
def _log_entry(self, action: str, key: str, payload: Any) -> None:
ts = time.time()
self.entries.append({"action": action, "key": key, "payload": payload, "ts": ts})
# Convenience helpers for tests / basic usage
def as_json_registry(self) -> str:
return json.dumps({
"adapters": self.adapters,
"contracts": self.contracts,
"entries": self.entries,
}, default=str)