"""NovaPlan Contracts (data contracts for MVP). This module defines lightweight data contracts used by the NovaPlan MVP, including PlanDelta, PrivacyBudget, AuditLog, and a few placeholder structures that adapters and tests can rely on. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Dict, Any, Optional import json import hashlib import time @dataclass class PlanDelta: """Delta exchanged between agents describing local changes. Attributes: agent_id: Identifier of the originating agent. delta: A dictionary representing changed local variables (name -> value). timestamp: UNIX timestamp when the delta was created. contract_id: Optional contract identifier for governance/traceability. parent_version: Optional CRDT-style parent version (for merge semantics). sequence: Optional sequence number for ordering. signature: Optional signature or provenance tag for the delta. """ agent_id: str delta: Dict[str, float] = field(default_factory=dict) timestamp: float = field(default_factory=lambda: time.time()) contract_id: str = "default" parent_version: Optional[int] = None sequence: Optional[int] = None signature: Optional[str] = None def to_json(self) -> str: """Serialize this PlanDelta to a deterministic JSON string. This method is intentionally lightweight and stable for MVP tests and external observers. The structure mirrors the dataclass fields with a sorted JSON payload for replay and signing workflows. """ payload = { "agent_id": self.agent_id, "delta": self.delta, "timestamp": self.timestamp, "contract_id": self.contract_id, "parent_version": self.parent_version, "sequence": self.sequence, "signature": self.signature, } return json.dumps(payload, sort_keys=True) @dataclass class CaCContract: """Contract-as-Code (CaC) minimal representation.""" contract_id: str version: int content: Dict[str, Any] @dataclass class SignedCaCContract: """Signed CaCContract wrapper.""" contract: CaCContract signature: str def to_json(self) -> str: payload = { "contract_id": self.contract.contract_id, "version": self.contract.version, "content": self.contract.content, "signature": self.signature, } return json.dumps(payload) class CaCRegistry: """In-memory registry for CaCContract instances.""" _store: Dict[str, CaCContract] = {} @classmethod def register(cls, contract: CaCContract) -> None: cls._store[contract.contract_id] = contract @classmethod def get(cls, contract_id: str) -> Optional[CaCContract]: return cls._store.get(contract_id) def sign_ca_contract(contract: CaCContract, key: str) -> SignedCaCContract: """Very lightweight signing for MVP. Returns a SignedCaCContract.""" # Create a deterministic digest of contract data combined with a key. payload = { "contract_id": contract.contract_id, "version": contract.version, "content": contract.content, } payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8") digest = hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest() return SignedCaCContract(contract=contract, signature=digest) def crdt_merge_deltas(d1: PlanDelta, d2: PlanDelta) -> PlanDelta: """Merge two PlanDelta objects with simple last-writer-wins semantics for keys.""" merged_delta = dict(d1.delta) merged_delta.update(d2.delta) # Timestamp goes to the latest ts = max(d1.timestamp, d2.timestamp) # Choose the second delta's identity as the merged agent/contract for simplicity merged = PlanDelta( agent_id=d2.agent_id, delta=merged_delta, timestamp=ts, contract_id=d2.contract_id or d1.contract_id, parent_version=None, sequence=None, signature=None, ) return merged class ContractRegistry: """Lightweight registry surface to satisfy tests. Exposes a minimal API: - register_schema(name, version, schema) - get_schema(name, version) - validate_against_schema(payload, schema) """ _schemas: Dict[tuple, Dict[str, Any]] = { ("PlanDelta", 1): { "required": ["agent_id", "delta", "timestamp"], "types": { "agent_id": str, "delta": dict, "timestamp": (int, float), }, } } @classmethod def get_schema(cls, name: str, version: int) -> Optional[Dict[str, Any]]: return cls._schemas.get((name, version)) @classmethod def validate_against_schema(cls, payload: Dict[str, Any], schema: Dict[str, Any]) -> bool: # Basic required-field check required = schema.get("required", []) for field in required: if field not in payload: return False # Type checking (best-effort) types_map = schema.get("types", {}) for field, expected in types_map.items(): if field in payload: val = payload[field] if isinstance(expected, tuple): if not isinstance(val, expected): return False else: if not isinstance(val, expected): return False return True @dataclass class PrivacyBudget: """Simple privacy budget block to accompany signals. This is intentionally small for MVP purposes. It can carry information about the remaining privacy budget for a stream and an expiry timestamp. """ signal: Dict[str, float] = field(default_factory=dict) budget: float = 0.0 expiry: float | None = None @dataclass class AuditLog: """Auditable log entry for governance and provenance.""" entry: str signer: str timestamp: float = field(default_factory=lambda: time.time()) contract_id: str = "default" @dataclass class SharedSchedule: """Placeholder shared schedule object for MVP. In a fuller implementation this would carry scheduling constraints and execution windows shared across agents. """ schedule: Dict[str, Any] = field(default_factory=dict) @dataclass class ResourceUsage: """Lightweight resource usage record.""" resources: Dict[str, float] = field(default_factory=dict) timestamp: float = field(default_factory=lambda: time.time())