"""Simple data contracts used by NovaPlan MVP. - PlanDelta: delta between local and global plans. - SharedSchedule: aggregated schedule signals from agents. - ResourceUsage: energy, time, or other resource consumptions. - PrivacyBudget: basic DP-like budget for an agent (simulated). - AuditLog: lightweight log entries for governance. """ from __future__ import annotations from dataclasses import dataclass, asdict from typing import Dict, Any, List import json @dataclass class PlanDelta: agent_id: str delta: Dict[str, float] timestamp: float def to_json(self) -> str: return json.dumps(asdict(self)) @dataclass class SharedSchedule: schedule: Dict[str, Any] timestamp: float @dataclass class ResourceUsage: agent_id: str resources: Dict[str, float] timestamp: float @dataclass class PrivacyBudget: agent_id: str budget: float timestamp: float @dataclass class AuditLog: entry_id: str message: str timestamp: float def serialize(obj: object) -> str: if hasattr(obj, "__dict__"): return json.dumps(obj.__dict__) return json.dumps(obj) # Lightweight contract registry for versioning and interoperability class ContractRegistry: _registry: Dict[str, int] = {} _schemas: Dict[str, Dict[str, Dict[str, Any]]] = {} @classmethod def register(cls, name: str, version: int) -> None: cls._registry[name] = int(version) @classmethod def version_of(cls, name: str, default: int | None = None) -> int | None: return cls._registry.get(name, default) @classmethod def register_schema( cls, name: str, version: int, schema: Dict[str, Any], ) -> None: """Register a contract schema for a given contract name and version.""" cls.register(name, version) cls._schemas.setdefault(name, {})[str(version)] = schema @classmethod def get_schema(cls, name: str, version: int) -> Dict[str, Any] | None: return cls._schemas.get(name, {}).get(str(version)) @classmethod def list_schemas(cls) -> List[Dict[str, Any]]: results: List[Dict[str, Any]] = [] for name, versions in cls._schemas.items(): for ver, schema in versions.items(): results.append({"name": name, "version": int(ver), "schema": schema}) return results @staticmethod def validate_against_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> bool: """Minimal validation: check required keys and basic type hints if provided.""" required = set(schema.get("required", [])) # All required keys must be present in the data if not required.issubset(set(data.keys())): return False # Optional: validate simple types if provided types: Dict[str, type] = schema.get("types", {}) for key, typ in types.items(): if key in data and not isinstance(data[key], typ): return False return True # Auto-register core contracts for quick interoperability in MVP workflows. # This ensures a minimal, versioned contract surface is available as soon as # the module is imported, which benefits tooling and adapters that rely on # contract versioning without requiring explicit setup code in downstream # components. for _name, _ver, _schema in [ ("PlanDelta", 1, {"required": ["agent_id", "delta", "timestamp"], "types": {"agent_id": str, "delta": dict, "timestamp": (int, float)}}), ("SharedSchedule", 1, {"required": ["schedule", "timestamp"], "types": {"schedule": dict, "timestamp": (int, float)}}), ("ResourceUsage", 1, {"required": ["agent_id", "resources", "timestamp"], "types": {"agent_id": str, "resources": dict, "timestamp": (int, float)}}), ("PrivacyBudget", 1, {"required": ["agent_id", "budget", "timestamp"], "types": {"agent_id": str, "budget": (int, float), "timestamp": (int, float)}}), ("AuditLog", 1, {"required": ["entry_id", "message", "timestamp"], "types": {"entry_id": str, "message": str, "timestamp": (int, float)}}), ]: ContractRegistry.register_schema(_name, _ver, _schema)