"""Simple data contracts used by NovaPlan MVP. - PlanDelta: delta between local and global plans. - SharedSchedule: aggregated schedule signals from agents. - ResourceUsage: energy, time, or other resource consumptions. - PrivacyBudget: basic DP-like budget for an agent (simulated). - AuditLog: lightweight log entries for governance. """ from __future__ import annotations from dataclasses import dataclass, asdict from typing import Dict, Any, List import json @dataclass class PlanDelta: agent_id: str delta: Dict[str, float] timestamp: float def to_json(self) -> str: return json.dumps(asdict(self)) @dataclass class SharedSchedule: schedule: Dict[str, Any] timestamp: float @dataclass class ResourceUsage: agent_id: str resources: Dict[str, float] timestamp: float @dataclass class PrivacyBudget: agent_id: str budget: float timestamp: float @dataclass class AuditLog: entry_id: str message: str timestamp: float def serialize(obj: object) -> str: if hasattr(obj, "__dict__"): return json.dumps(obj.__dict__) return json.dumps(obj) # Lightweight contract registry for versioning and interoperability class ContractRegistry: _registry: Dict[str, int] = {} _schemas: Dict[str, Dict[str, Dict[str, Any]]] = {} @classmethod def register(cls, name: str, version: int) -> None: cls._registry[name] = int(version) @classmethod def version_of(cls, name: str, default: int | None = None) -> int | None: return cls._registry.get(name, default) @classmethod def register_schema( cls, name: str, version: int, schema: Dict[str, Any], ) -> None: """Register a contract schema for a given contract name and version.""" cls.register(name, version) cls._schemas.setdefault(name, {})[str(version)] = schema @classmethod def get_schema(cls, name: str, version: int) -> Dict[str, Any] | None: return cls._schemas.get(name, {}).get(str(version)) @classmethod def list_schemas(cls) -> List[Dict[str, Any]]: results: List[Dict[str, Any]] = [] for name, versions in cls._schemas.items(): for ver, schema in versions.items(): results.append({"name": name, "version": int(ver), "schema": schema}) return results @staticmethod def validate_against_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> bool: """Minimal validation: check required keys and basic type hints if provided.""" required = set(schema.get("required", [])) # All required keys must be present in the data if not required.issubset(set(data.keys())): return False # Optional: validate simple types if provided types: Dict[str, type] = schema.get("types", {}) for key, typ in types.items(): if key in data and not isinstance(data[key], typ): return False return True # Auto-register core contracts for quick interoperability in MVP workflows. # This ensures a minimal, versioned contract surface is available as soon as # the module is imported, which benefits tooling and adapters that rely on # contract versioning without requiring explicit setup code in downstream # components. for _name, _ver, _schema in [ ("PlanDelta", 1, {"required": ["agent_id", "delta", "timestamp"], "types": {"agent_id": str, "delta": dict, "timestamp": (int, float)}}), ("SharedSchedule", 1, {"required": ["schedule", "timestamp"], "types": {"schedule": dict, "timestamp": (int, float)}}), ("ResourceUsage", 1, {"required": ["agent_id", "resources", "timestamp"], "types": {"agent_id": str, "resources": dict, "timestamp": (int, float)}}), ("PrivacyBudget", 1, {"required": ["agent_id", "budget", "timestamp"], "types": {"agent_id": str, "budget": (int, float), "timestamp": (int, float)}}), ("AuditLog", 1, {"required": ["entry_id", "message", "timestamp"], "types": {"entry_id": str, "message": str, "timestamp": (int, float)}}), ]: ContractRegistry.register_schema(_name, _ver, _schema) # Lightweight Adapter Registry (Graph-of-Contracts for adapters) class AdapterRegistry: """Minimal registry to track adapter versions and their schemas. This mirrors the contract registry pattern but for adapter software units (e.g., rover HabitatAdapter, etc.). It enables plugging in vendor adapters while keeping a versioned contract surface for interoperability tooling. """ _registry: Dict[str, int] = {} _schemas: Dict[str, Dict[str, Dict[str, Any]]] = {} @classmethod def register_adapter(cls, name: str, version: int) -> None: cls._registry[name] = int(version) @classmethod def version_of(cls, name: str, default: int | None = None) -> int | None: return cls._registry.get(name, default) @classmethod def register_schema( cls, name: str, version: int, schema: Dict[str, Any], ) -> None: cls.register_adapter(name, version) cls._schemas.setdefault(name, {})[str(version)] = schema @classmethod def get_schema(cls, name: str, version: int) -> Dict[str, Any] | None: return cls._schemas.get(name, {}).get(str(version)) @classmethod def list_schemas(cls) -> List[Dict[str, Any]]: results: List[Dict[str, Any]] = [] for name, versions in cls._schemas.items(): for ver, schema in versions.items(): results.append({"name": name, "version": int(ver), "schema": schema}) return results @staticmethod def validate_against_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> bool: required = set(schema.get("required", [])) if not required.issubset(set(data.keys())): return False types: Dict[str, type] = schema.get("types", {}) for key, typ in types.items(): if key in data and not isinstance(data[key], typ): return False return True # Pre-register a couple of MVP adapter schemas to illustrate interoperability. AdapterRegistry.register_schema( name="RoverAdapter", version=1, schema={ "required": ["adapter_id", "status"], "types": {"adapter_id": str, "status": dict}, }, ) AdapterRegistry.register_schema( name="HabitatAdapter", version=1, schema={ "required": ["module_id", "status"], "types": {"module_id": str, "status": dict}, }, )