"""Simple data contracts used by NovaPlan MVP. - PlanDelta: delta between local and global plans. - SharedSchedule: aggregated schedule signals from agents. - ResourceUsage: energy, time, or other resource consumptions. - PrivacyBudget: basic DP-like budget for an agent (simulated). - AuditLog: lightweight log entries for governance. """ from __future__ import annotations from dataclasses import dataclass, asdict from typing import Dict, Any, List import json @dataclass class PlanDelta: agent_id: str delta: Dict[str, float] timestamp: float # Optional fields to support CRDT-like, partition-tolerant merges parent_version: int | None = None sequence: int | None = None # Optional contract and signature fields to support Contract-as-Code (CaC) contract_id: str | None = None signature: str | None = None def to_json(self) -> str: return json.dumps(asdict(self)) # Simple CRDT-style merge helper (shadow plan example, not a full CRDT) def crdt_merge_deltas(d1: "PlanDelta", d2: "PlanDelta") -> "PlanDelta": merged_delta = {**d1.delta, **d2.delta} merged_agent = d2.agent_id if d2.agent_id else d1.agent_id merged_ts = max(d1.timestamp, d2.timestamp) merged_parent = None if d1.parent_version is not None or d2.parent_version is not None: v1 = d1.parent_version if d1.parent_version is not None else 0 v2 = d2.parent_version if d2.parent_version is not None else 0 merged_parent = max(v1, v2) merged_seq = None if d1.sequence is not None or d2.sequence is not None: s1 = d1.sequence if d1.sequence is not None else 0 s2 = d2.sequence if d2.sequence is not None else 0 merged_seq = max(s1, s2) return PlanDelta(agent_id=merged_agent, delta=merged_delta, timestamp=merged_ts, parent_version=merged_parent, sequence=merged_seq) @dataclass class SharedSchedule: schedule: Dict[str, Any] timestamp: float @dataclass class ResourceUsage: agent_id: str resources: Dict[str, float] timestamp: float @dataclass class PrivacyBudget: agent_id: str budget: float timestamp: float @dataclass class AuditLog: 
entry_id: str message: str timestamp: float def serialize(obj: object) -> str: if hasattr(obj, "__dict__"): return json.dumps(obj.__dict__) return json.dumps(obj) # Lightweight contract registry for versioning and interoperability class ContractRegistry: _registry: Dict[str, int] = {} _schemas: Dict[str, Dict[str, Dict[str, Any]]] = {} @classmethod def register(cls, name: str, version: int) -> None: cls._registry[name] = int(version) @classmethod def version_of(cls, name: str, default: int | None = None) -> int | None: return cls._registry.get(name, default) @classmethod def register_schema( cls, name: str, version: int, schema: Dict[str, Any], ) -> None: """Register a contract schema for a given contract name and version.""" cls.register(name, version) cls._schemas.setdefault(name, {})[str(version)] = schema @classmethod def get_schema(cls, name: str, version: int) -> Dict[str, Any] | None: return cls._schemas.get(name, {}).get(str(version)) @classmethod def list_schemas(cls) -> List[Dict[str, Any]]: results: List[Dict[str, Any]] = [] for name, versions in cls._schemas.items(): for ver, schema in versions.items(): results.append({"name": name, "version": int(ver), "schema": schema}) return results @staticmethod def validate_against_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> bool: """Minimal validation: check required keys and basic type hints if provided.""" required = set(schema.get("required", [])) # All required keys must be present in the data if not required.issubset(set(data.keys())): return False # Optional: validate simple types if provided types: Dict[str, type] = schema.get("types", {}) for key, typ in types.items(): if key in data and not isinstance(data[key], typ): return False return True # Auto-register core contracts for quick interoperability in MVP workflows. 
# This ensures a minimal, versioned contract surface is available as soon as
# the module is imported, which benefits tooling and adapters that rely on
# contract versioning without requiring explicit setup code in downstream
# components.
for _name, _ver, _schema in [
    (
        "PlanDelta",
        1,
        {
            "required": ["agent_id", "delta", "timestamp"],
            "types": {"agent_id": str, "delta": dict, "timestamp": (int, float)},
        },
    ),
    (
        "SharedSchedule",
        1,
        {
            "required": ["schedule", "timestamp"],
            "types": {"schedule": dict, "timestamp": (int, float)},
        },
    ),
    (
        "ResourceUsage",
        1,
        {
            "required": ["agent_id", "resources", "timestamp"],
            "types": {"agent_id": str, "resources": dict, "timestamp": (int, float)},
        },
    ),
    (
        "PrivacyBudget",
        1,
        {
            "required": ["agent_id", "budget", "timestamp"],
            "types": {"agent_id": str, "budget": (int, float), "timestamp": (int, float)},
        },
    ),
    (
        "AuditLog",
        1,
        {
            "required": ["entry_id", "message", "timestamp"],
            "types": {"entry_id": str, "message": str, "timestamp": (int, float)},
        },
    ),
]:
    ContractRegistry.register_schema(_name, _ver, _schema)


# Lightweight Adapter Registry (Graph-of-Contracts for adapters)
class AdapterRegistry:
    """Minimal registry to track adapter versions and their schemas.

    This mirrors the contract registry pattern but for adapter software units
    (e.g., rover HabitatAdapter, etc.). It enables plugging in vendor adapters
    while keeping a versioned contract surface for interoperability tooling.

    State lives in class attributes, so it is shared by every user of the
    class. Schemas are stored per adapter name under a normalised version key,
    ``str(int(version))``, matching the ``int(version)`` normalisation applied
    by :meth:`register_adapter`.
    """

    _registry: Dict[str, int] = {}
    _schemas: Dict[str, Dict[str, Dict[str, Any]]] = {}

    @staticmethod
    def _version_key(version: Any) -> str:
        """Return the storage key for ``version`` (same int() rule as register_adapter)."""
        return str(int(version))

    @classmethod
    def register_adapter(cls, name: str, version: int) -> None:
        """Record ``version`` (coerced with ``int``) as the version of ``name``."""
        cls._registry[name] = int(version)

    @classmethod
    def version_of(cls, name: str, default: int | None = None) -> int | None:
        """Return the recorded version of ``name``, or ``default`` if unknown."""
        return cls._registry.get(name, default)

    @classmethod
    def register_schema(
        cls,
        name: str,
        version: int,
        schema: Dict[str, Any],
    ) -> None:
        """Register an adapter schema for a given adapter name and version."""
        cls.register_adapter(name, version)
        # Key with the same int() normalisation as register_adapter(); a raw
        # str(version) key such as "2.0" would be unreachable via
        # get_schema(name, 2) and would break int(ver) in list_schemas().
        cls._schemas.setdefault(name, {})[cls._version_key(version)] = schema

    @classmethod
    def get_schema(cls, name: str, version: int) -> Dict[str, Any] | None:
        """Return the schema stored for ``name``/``version``, or None."""
        try:
            key = cls._version_key(version)
        except (TypeError, ValueError):
            # A version that cannot be an int can never have been registered.
            return None
        return cls._schemas.get(name, {}).get(key)

    @classmethod
    def list_schemas(cls) -> List[Dict[str, Any]]:
        """Return one ``{"name", "version", "schema"}`` dict per stored schema."""
        return [
            {"name": name, "version": int(ver), "schema": schema}
            for name, versions in cls._schemas.items()
            for ver, schema in versions.items()
        ]

    @staticmethod
    def validate_against_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> bool:
        """Check required keys and, where given, simple ``isinstance`` types.

        ``schema["required"]`` lists keys that must be present in ``data``;
        ``schema["types"]`` maps keys to a type (or tuple of types) checked
        with ``isinstance`` for keys that are present. Both are optional.
        """
        required = set(schema.get("required", []))
        if not required.issubset(data.keys()):
            return False
        types: Dict[str, Any] = schema.get("types", {})
        return all(
            isinstance(data[key], typ)
            for key, typ in types.items()
            if key in data
        )


# Pre-register a couple of MVP adapter schemas to illustrate interoperability.
AdapterRegistry.register_schema(
    name="RoverAdapter",
    version=1,
    schema={
        "required": ["adapter_id", "status"],
        "types": {"adapter_id": str, "status": dict},
    },
)


# ---------------- CaC (Contract-as-Code) primitives -----------------
@dataclass
class CaCContract:
    """A Contract-as-Code record: id, version, content and optional signature."""

    contract_id: str
    version: int
    content: Dict[str, Any]
    signature: str | None = None

    def to_json(self) -> str:
        """Return the four contract fields as a JSON object string."""
        exported = ("contract_id", "version", "content", "signature")
        return json.dumps({field: getattr(self, field) for field in exported})


def sign_ca_contract(contract: CaCContract, key: str) -> CaCContract:
    """Compute a signature for ``contract`` and store it on the contract.

    The signature is the hex SHA-256 digest of the encoded ``key`` followed by
    the sorted-key JSON encoding of ``contract.content``. Only ``content`` is
    covered; ``contract_id`` and ``version`` are not part of the digest.

    The contract is modified in place and the same object is returned.

    NOTE(review): this is a key-prefixed plain hash, not an HMAC. Switching to
    ``hmac`` would change every produced signature, so it is only flagged here.
    """
    # Local imports keep this helper self-contained.
    import hashlib
    import json

    canonical_content = json.dumps(contract.content, sort_keys=True).encode()
    hasher = hashlib.sha256(key.encode())
    hasher.update(canonical_content)
    contract.signature = hasher.hexdigest()
    return contract


class CaCRegistry:
    """Class-level store of CaC contracts keyed by ``contract_id``."""

    _contracts: Dict[str, CaCContract] = {}

    @classmethod
    def register(cls, contract: CaCContract) -> None:
        """Store ``contract`` under its id, replacing any previous entry."""
        cls._contracts.update({contract.contract_id: contract})

    @classmethod
    def get(cls, contract_id: str) -> CaCContract | None:
        """Return the contract stored under ``contract_id``, or None."""
        return cls._contracts.get(contract_id, None)


AdapterRegistry.register_schema(
    name="HabitatAdapter",
    version=1,
    schema={
        "required": ["module_id", "status"],
        "types": {"module_id": str, "status": dict},
    },
)