62 lines
1.5 KiB
Python
62 lines
1.5 KiB
Python
"""Simple data contracts used by NovaPlan MVP.

- PlanDelta: delta between local and global plans.
- SharedSchedule: aggregated schedule signals from agents.
- ResourceUsage: consumption of energy, time, or other resources.
- PrivacyBudget: basic DP-like budget for an agent (simulated).
- AuditLog: lightweight log entries for governance.
"""
|
|
from __future__ import annotations
|
|
from dataclasses import dataclass, asdict
|
|
from typing import Dict, Any, List
|
|
import json
|
|
|
|
@dataclass
class PlanDelta:
    """Delta between an agent's local plan and the global plan.

    Fields, in constructor order:
        agent_id:  identifier of the agent that produced the delta.
        delta:     mapping from string keys to float values.
        timestamp: float time value supplied by the caller.
    """

    agent_id: str
    delta: Dict[str, float]
    timestamp: float

    def to_json(self) -> str:
        """Return this record as a JSON object string."""
        # asdict() converts the instance (recursively) into plain dicts,
        # which json can encode directly.
        as_mapping = asdict(self)
        return json.dumps(as_mapping)
|
|
|
|
@dataclass
class SharedSchedule:
    """Aggregated schedule signals from agents.

    Fields, in constructor order:
        schedule:  mapping from string keys to arbitrary values.
        timestamp: float time value supplied by the caller.
    """

    schedule: Dict[str, Any]
    timestamp: float
|
|
|
|
@dataclass
class ResourceUsage:
    """Resource consumption (energy, time, or other) reported for an agent.

    Fields, in constructor order:
        agent_id:  identifier of the agent the usage belongs to.
        resources: mapping from string keys to float amounts.
        timestamp: float time value supplied by the caller.
    """

    agent_id: str
    resources: Dict[str, float]
    timestamp: float
|
|
|
|
@dataclass
class PrivacyBudget:
    """Basic DP-like (simulated) privacy budget for an agent.

    Fields, in constructor order:
        agent_id:  identifier of the agent that owns the budget.
        budget:    float budget value.
        timestamp: float time value supplied by the caller.
    """

    agent_id: str
    budget: float
    timestamp: float
|
|
|
|
@dataclass
class AuditLog:
    """Lightweight log entry for governance.

    Fields, in constructor order:
        entry_id:  identifier of the log entry.
        message:   text of the entry.
        timestamp: float time value supplied by the caller.
    """

    entry_id: str
    message: str
    timestamp: float
|
|
|
|
def serialize(obj: object) -> str:
    """Serialize ``obj`` to a JSON string.

    Objects that expose ``__dict__`` are encoded from that attribute
    mapping; anything else is handed to ``json.dumps`` directly.  Dataclass
    instances met anywhere during encoding (nested inside another object's
    attributes, inside containers, or at the top level when the instance
    has no ``__dict__``) are converted with ``dataclasses.asdict``.

    Args:
        obj: The value to serialize.

    Returns:
        The JSON text for ``obj``.

    Raises:
        TypeError: If a value that is neither JSON-encodable nor a
            dataclass instance is encountered.
    """
    # Local import: the module header only imports `dataclass` and `asdict`.
    from dataclasses import is_dataclass

    def _encode_dataclass(value: object) -> object:
        # json.dumps calls this only for values it cannot encode natively.
        # is_dataclass() is also true for dataclass *types*, which asdict()
        # rejects, so classes are excluded explicitly.
        if is_dataclass(value) and not isinstance(value, type):
            return asdict(value)
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    payload = obj.__dict__ if hasattr(obj, "__dict__") else obj
    return json.dumps(payload, default=_encode_dataclass)
|
|
|
|
# Lightweight contract registry for versioning and interoperability
|
|
class ContractRegistry:
    """Class-level mapping from contract name to integer version.

    The mapping lives on the class itself, so entries are registered and
    looked up through classmethods without creating an instance.
    """

    _registry: Dict[str, int] = {}

    @classmethod
    def register(cls, name: str, version: int) -> None:
        """Record ``version`` (coerced with ``int``) under ``name``.

        An existing entry for ``name`` is overwritten.
        """
        coerced = int(version)
        cls._registry[name] = coerced

    @classmethod
    def version_of(cls, name: str, default: int | None = None) -> int | None:
        """Return the version stored for ``name``, or ``default`` if absent."""
        known = cls._registry
        if name in known:
            return known[name]
        return default
|