# novaplan-decentralized-priv.../nova_plan/contracts.py
#
# 114 lines
# 4.0 KiB
# Python

"""Simple data contracts used by NovaPlan MVP.
- PlanDelta: delta between local and global plans.
- SharedSchedule: aggregated schedule signals from agents.
- ResourceUsage: energy, time, or other resource consumption.
- PrivacyBudget: basic DP-like budget for an agent (simulated).
- AuditLog: lightweight log entries for governance.
"""
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, Any, List
import json
@dataclass
class PlanDelta:
    """Delta between a local plan and the global plan.

    The contract is a plain dataclass; :meth:`to_json` renders it as a JSON
    object whose keys are the field names in declaration order.
    """

    # Presumably the identifier of the agent that produced the delta.
    agent_id: str
    # Named numeric changes; the meaning of the keys is not defined here.
    delta: Dict[str, float]
    # Time associated with the delta; unit/epoch not specified in this module.
    timestamp: float

    def to_json(self) -> str:
        """Return this contract serialized as a JSON object string."""
        as_mapping = asdict(self)
        return json.dumps(as_mapping)
@dataclass
class SharedSchedule:
    """Aggregated schedule signals from agents."""

    # Schedule payload; its key/value schema is not defined in this module.
    schedule: Dict[str, Any]
    # Time associated with the schedule; unit/epoch not specified here.
    timestamp: float

    def to_json(self) -> str:
        """Return this contract serialized as a JSON object string.

        Raises:
            TypeError: if ``schedule`` holds a value ``json.dumps`` cannot
                encode.
        """
        return json.dumps(asdict(self))
@dataclass
class ResourceUsage:
    """Energy, time, or other resource consumption reported for an agent."""

    # Presumably the identifier of the agent the usage belongs to.
    agent_id: str
    # Resource name -> consumed amount; units are not defined in this module.
    resources: Dict[str, float]
    # Time associated with the report; unit/epoch not specified here.
    timestamp: float

    def to_json(self) -> str:
        """Return this contract serialized as a JSON object string."""
        return json.dumps(asdict(self))
@dataclass
class PrivacyBudget:
    """Basic DP-like (simulated) privacy budget for an agent."""

    # Presumably the identifier of the agent that owns the budget.
    agent_id: str
    # Budget amount; how it is consumed is not defined in this module.
    budget: float
    # Time associated with the budget value; unit/epoch not specified here.
    timestamp: float

    def to_json(self) -> str:
        """Return this contract serialized as a JSON object string."""
        return json.dumps(asdict(self))
@dataclass
class AuditLog:
    """Lightweight log entry for governance."""

    # Identifier of this log entry; uniqueness is not enforced here.
    entry_id: str
    # Free-form text of the entry.
    message: str
    # Time associated with the entry; unit/epoch not specified here.
    timestamp: float

    def to_json(self) -> str:
        """Return this contract serialized as a JSON object string."""
        return json.dumps(asdict(self))
def serialize(obj: object) -> str:
    """Serialize *obj* to a JSON string.

    Dataclass instances are converted with ``dataclasses.asdict`` so that
    nested dataclasses are expanded recursively. Other objects exposing
    ``__dict__`` are serialized from that attribute dict, and anything else
    is handed to ``json.dumps`` unchanged.

    Raises:
        TypeError: if the resulting structure contains a value that
            ``json.dumps`` cannot encode.
    """
    # Local import: the module header only brings in ``dataclass``/``asdict``.
    from dataclasses import is_dataclass

    # ``is_dataclass`` is also true for dataclass *types*; only instances can
    # be passed to ``asdict``.
    if is_dataclass(obj) and not isinstance(obj, type):
        return json.dumps(asdict(obj))
    if hasattr(obj, "__dict__"):
        return json.dumps(obj.__dict__)
    return json.dumps(obj)
# Lightweight contract registry for versioning and interoperability
class ContractRegistry:
    """Registry of contract versions and their schemas.

    All state lives in class-level dictionaries, so the registry is shared by
    every caller that uses this class; no instance is needed.

    Versions are normalised with ``int(version)`` everywhere, so ``1``,
    ``"1"`` and ``1.0`` all address the same schema entry.
    """

    # Contract name -> version passed to the most recent ``register`` call.
    _registry: Dict[str, int] = {}
    # Contract name -> {version as decimal string -> schema}.
    _schemas: Dict[str, Dict[str, Dict[str, Any]]] = {}

    @staticmethod
    def _version_key(version: Any) -> str:
        """Return the canonical ``_schemas`` key for *version*."""
        return str(int(version))

    @classmethod
    def register(cls, name: str, version: int) -> None:
        """Record *version* (coerced to ``int``) as the version of *name*."""
        cls._registry[name] = int(version)

    @classmethod
    def version_of(cls, name: str, default: int | None = None) -> int | None:
        """Return the registered version of *name*, or *default* if unknown."""
        return cls._registry.get(name, default)

    @classmethod
    def register_schema(
        cls,
        name: str,
        version: int,
        schema: Dict[str, Any],
    ) -> None:
        """Register a contract schema for a given contract name and version.

        Also records *version* as the current version of *name* (see
        :meth:`register`). Registering the same name/version again replaces
        the stored schema.
        """
        cls.register(name, version)
        # Use the same int normalisation as ``register`` so that lookups and
        # ``list_schemas`` (which parses the key back to int) always agree.
        cls._schemas.setdefault(name, {})[cls._version_key(version)] = schema

    @classmethod
    def get_schema(cls, name: str, version: int) -> Dict[str, Any] | None:
        """Return the schema stored for *name* at *version*, or ``None``."""
        try:
            key = cls._version_key(version)
        except (TypeError, ValueError, OverflowError):
            # A version that cannot be an int can never have been registered.
            return None
        return cls._schemas.get(name, {}).get(key)

    @classmethod
    def list_schemas(cls) -> List[Dict[str, Any]]:
        """Return one ``{"name", "version", "schema"}`` dict per stored schema."""
        return [
            {"name": name, "version": int(ver), "schema": schema}
            for name, versions in cls._schemas.items()
            for ver, schema in versions.items()
        ]

    @staticmethod
    def validate_against_schema(data: Dict[str, Any], schema: Dict[str, Any]) -> bool:
        """Minimal validation: check required keys and basic type hints if provided.

        ``schema["required"]`` lists keys that must be present in *data*;
        ``schema["types"]`` maps keys to a type (or tuple of types) checked
        with ``isinstance`` for keys that are present. Both entries are
        optional. Returns ``True`` when every check passes.
        """
        required = set(schema.get("required", []))
        # All required keys must be present in the data.
        if not required.issubset(data.keys()):
            return False
        # Type hints apply only to keys that are actually present.
        types: Dict[str, type] = schema.get("types", {})
        return all(
            isinstance(data[key], typ)
            for key, typ in types.items()
            if key in data
        )
# Auto-register core contracts for quick interoperability in MVP workflows.
# This ensures a minimal, versioned contract surface is available as soon as
# the module is imported, which benefits tooling and adapters that rely on
# contract versioning without requiring explicit setup code in downstream
# components.
# Each entry is (contract name, version, schema). The schema's "required" list
# mirrors the fields of the dataclass of the same name, and "types" gives the
# isinstance() targets used by ContractRegistry.validate_against_schema.
for _name, _ver, _schema in [
    (
        "PlanDelta",
        1,
        {
            "required": ["agent_id", "delta", "timestamp"],
            "types": {"agent_id": str, "delta": dict, "timestamp": (int, float)},
        },
    ),
    (
        "SharedSchedule",
        1,
        {
            "required": ["schedule", "timestamp"],
            "types": {"schedule": dict, "timestamp": (int, float)},
        },
    ),
    (
        "ResourceUsage",
        1,
        {
            "required": ["agent_id", "resources", "timestamp"],
            "types": {"agent_id": str, "resources": dict, "timestamp": (int, float)},
        },
    ),
    (
        "PrivacyBudget",
        1,
        {
            "required": ["agent_id", "budget", "timestamp"],
            "types": {"agent_id": str, "budget": (int, float), "timestamp": (int, float)},
        },
    ),
    (
        "AuditLog",
        1,
        {
            "required": ["entry_id", "message", "timestamp"],
            "types": {"entry_id": str, "message": str, "timestamp": (int, float)},
        },
    ),
]:
    ContractRegistry.register_schema(_name, _ver, _schema)