"""NovaPlan Contracts (data contracts for MVP).

This module defines lightweight data contracts used by the NovaPlan MVP,
including PlanDelta, PrivacyBudget, AuditLog, and a few placeholder
structures that adapters and tests can rely on.
"""

from __future__ import annotations

import hashlib
import hmac
import json
import time
from dataclasses import asdict, dataclass, field
from typing import Any, Dict, Optional


# Lightweight in-memory signer registry for MVP provenance
class SignerStore:
    """In-memory signer key registry for MVP provenance.

    Keys are held in a class-level dict, so every caller of this class sees
    the same registrations. Nothing is persisted.
    """

    _keys: Dict[str, str] = {}

    @classmethod
    def register(cls, signer_id: str, key: str) -> None:
        """Associate ``key`` with ``signer_id``, replacing any earlier key."""
        cls._keys[signer_id] = key

    @classmethod
    def get_key(cls, signer_id: str) -> Optional[str]:
        """Return the key registered for ``signer_id``, or ``None`` if absent."""
        return cls._keys.get(signer_id)


class GoCRegistry:
    """Graph-of-Contracts registry (in-memory MVP).

    Stores signed CaC contracts and a delta history per contract to enable
    provenance and deterministic replay in offline scenarios.
    """

    _contracts: Dict[str, CaCContract] = {}
    _signatures: Dict[str, str] = {}
    _signers: Dict[str, str] = {}
    _deltas: Dict[str, list] = {}

    @classmethod
    def register_signed_contract(
        cls, contract: CaCContract, signer: str, signature: str
    ) -> None:
        """Record ``contract`` with its ``signature`` and the ``signer`` id.

        A later registration under the same ``contract_id`` overwrites the
        earlier one.
        """
        cls._contracts[contract.contract_id] = contract
        cls._signatures[contract.contract_id] = signature
        # Keep the signer id too, so provenance can say who signed.
        cls._signers[contract.contract_id] = signer

    @classmethod
    def get_signed_contract(cls, contract_id: str) -> Optional[Dict[str, Any]]:
        """Return ``{"contract": ..., "signature": ...}`` or ``None`` if unknown."""
        contract = cls._contracts.get(contract_id)
        if contract is None:
            return None
        return {
            "contract": contract,
            "signature": cls._signatures.get(contract_id),
        }

    @classmethod
    def get_signer(cls, contract_id: str) -> Optional[str]:
        """Return the signer id recorded for ``contract_id``, or ``None``."""
        return cls._signers.get(contract_id)

    @classmethod
    def push_delta(cls, contract_id: str, delta: PlanDelta) -> None:
        """Append ``delta`` to the history kept for ``contract_id``."""
        cls._deltas.setdefault(contract_id, []).append(delta)

    @classmethod
    def get_provenance(cls, contract_id: str) -> list:
        """Return a shallow copy of the delta history for ``contract_id``.

        The list is a copy, but the PlanDelta objects inside it are the same
        instances that were pushed.
        """
        return list(cls._deltas.get(contract_id, []))


def _delta_sign_digest(delta: PlanDelta, key: str) -> str:
    """Return the hex SHA-256 digest of the delta's signed fields plus ``key``.

    The ``signature`` field itself is excluded from the payload. Every other
    PlanDelta field is covered, so changing any of them after signing
    invalidates the signature.
    """
    payload = {
        "agent_id": delta.agent_id,
        "delta": delta.delta,
        "timestamp": delta.timestamp,
        "contract_id": delta.contract_id,
        "parent_version": delta.parent_version,
        "sequence": delta.sequence,
        "nonce": delta.nonce,
    }
    payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
    # NOTE: this is SHA-256 over (payload || key), not an HMAC. The
    # construction is kept unchanged so previously issued digests stay valid.
    return hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest()


def sign_plan_delta(delta: PlanDelta, signer_id: str) -> PlanDelta:
    """Sign a PlanDelta using the signer's key from SignerStore.

    This is a lightweight MVP signer; in a real system this would be a proper
    crypto signature using a key management service.

    The delta is modified in place and the same object is returned. When no
    key is registered for ``signer_id`` the signature is reset to ``None``.
    """
    key = SignerStore.get_key(signer_id)
    if not key:
        # If no key registered, leave signature as None
        delta.signature = None
        return delta
    delta.signature = _delta_sign_digest(delta, key)
    return delta


def verify_plan_delta_signature(delta: PlanDelta) -> bool:
    """Verify the PlanDelta signature if the signer key is known.

    The key is looked up under ``delta.agent_id``, so verification only
    succeeds when the delta was signed with the key registered for its own
    agent id.

    Returns:
        True when the stored signature matches the recomputed digest; False
        when the signature is missing, is not a string, the key is unknown,
        or the digest differs.
    """
    signature = delta.signature
    if not isinstance(signature, str):
        return False
    key = SignerStore.get_key(delta.agent_id)
    if not key:
        return False
    expected = _delta_sign_digest(delta, key)
    # Compare as bytes in constant time; bytes also tolerate non-ASCII input.
    return hmac.compare_digest(signature.encode("utf-8"), expected.encode("utf-8"))


@dataclass
class PlanDelta:
    """Delta exchanged between agents describing local changes.

    Attributes:
        agent_id: Identifier of the originating agent.
        delta: A dictionary representing changed local variables (name -> value).
        timestamp: UNIX timestamp when the delta was created.
        contract_id: Optional contract identifier for governance/traceability.
        parent_version: Optional CRDT-style parent version (for merge semantics).
        sequence: Optional sequence number for ordering.
        nonce: Optional integer included in the signed payload.
        signature: Optional signature or provenance tag for the delta.
    """

    agent_id: str
    delta: Dict[str, float] = field(default_factory=dict)
    timestamp: float = field(default_factory=lambda: time.time())
    contract_id: str = "default"
    parent_version: Optional[int] = None
    sequence: Optional[int] = None
    nonce: Optional[int] = None
    signature: Optional[str] = None

    def to_json(self) -> str:
        """Serialize this PlanDelta to a deterministic JSON string.

        This method is intentionally lightweight and stable for MVP tests and
        external observers. The structure mirrors the dataclass fields with a
        sorted JSON payload for replay and signing workflows.
        """
        payload = {
            "agent_id": self.agent_id,
            "delta": self.delta,
            "timestamp": self.timestamp,
            "contract_id": self.contract_id,
            "parent_version": self.parent_version,
            "sequence": self.sequence,
            "nonce": self.nonce,
            "signature": self.signature,
        }
        return json.dumps(payload, sort_keys=True)


@dataclass
class CaCContract:
    """Contract-as-Code (CaC) minimal representation."""

    contract_id: str
    version: int
    content: Dict[str, Any]


@dataclass
class SignedCaCContract:
    """Signed CaCContract wrapper."""

    contract: CaCContract
    signature: str

    def to_json(self) -> str:
        """Serialize the contract fields and signature to a JSON string.

        Keys are emitted in insertion order (not sorted).
        """
        payload = {
            "contract_id": self.contract.contract_id,
            "version": self.contract.version,
            "content": self.contract.content,
            "signature": self.signature,
        }
        return json.dumps(payload)


class CaCRegistry:
    """In-memory registry for CaCContract instances."""

    _store: Dict[str, CaCContract] = {}

    @classmethod
    def register(cls, contract: CaCContract) -> None:
        """Store ``contract`` under its ``contract_id``, replacing any previous one."""
        cls._store[contract.contract_id] = contract

    @classmethod
    def get(cls, contract_id: str) -> Optional[CaCContract]:
        """Return the contract stored under ``contract_id``, or ``None``."""
        return cls._store.get(contract_id)


def sign_ca_contract(contract: CaCContract, key: str) -> SignedCaCContract:
    """Very lightweight signing for MVP. Returns a SignedCaCContract.

    Args:
        contract: The contract to sign.
        key: The signing key material (not a signer id).
    """
    # Create a deterministic digest of contract data combined with a key.
    payload = {
        "contract_id": contract.contract_id,
        "version": contract.version,
        "content": contract.content,
    }
    payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
    digest = hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest()
    return SignedCaCContract(contract=contract, signature=digest)


def crdt_merge_deltas(d1: PlanDelta, d2: PlanDelta) -> PlanDelta:
    """Merge two PlanDelta objects into a new, unsigned PlanDelta.

    Keys present in both deltas take ``d2``'s value (argument order decides,
    not the timestamps). The merged timestamp is the later of the two, and the
    merged agent/contract identity comes from ``d2``, falling back to
    ``d1.contract_id`` when ``d2.contract_id`` is empty. Version, sequence and
    signature are reset to ``None``.
    """
    merged_delta = {**d1.delta, **d2.delta}
    return PlanDelta(
        agent_id=d2.agent_id,
        delta=merged_delta,
        timestamp=max(d1.timestamp, d2.timestamp),
        contract_id=d2.contract_id or d1.contract_id,
        parent_version=None,
        sequence=None,
        signature=None,
    )


class ContractRegistry:
    """Lightweight registry surface to satisfy tests.

    Exposes a minimal API:
    - register_schema(name, version, schema)
    - get_schema(name, version)
    - validate_against_schema(payload, schema)
    """

    _schemas: Dict[tuple, Dict[str, Any]] = {
        ("PlanDelta", 1): {
            "required": ["agent_id", "delta", "timestamp"],
            "types": {
                "agent_id": str,
                "delta": dict,
                "timestamp": (int, float),
            },
        }
    }

    @classmethod
    def register_schema(cls, name: str, version: int, schema: Dict[str, Any]) -> None:
        """Store ``schema`` under ``(name, version)``, replacing any previous one."""
        cls._schemas[(name, version)] = schema

    @classmethod
    def get_schema(cls, name: str, version: int) -> Optional[Dict[str, Any]]:
        """Return the schema stored under ``(name, version)``, or ``None``."""
        return cls._schemas.get((name, version))

    @classmethod
    def validate_against_schema(
        cls, payload: Dict[str, Any], schema: Dict[str, Any]
    ) -> bool:
        """Check ``payload`` against a schema's ``required`` and ``types`` entries.

        Returns False when a required key is missing or when a present key
        fails its ``isinstance`` check. Keys listed under ``types`` but absent
        from the payload are not checked. An expected type may be a single
        type or a tuple of types.
        """
        # Basic required-field check
        if any(name not in payload for name in schema.get("required", [])):
            return False
        # Type checking (best-effort); isinstance accepts a type or a tuple.
        return all(
            isinstance(payload[name], expected)
            for name, expected in schema.get("types", {}).items()
            if name in payload
        )


# ---------------------------------------------------------------------------
# Simple API surface for Contract-as-Code (CaC) operations (MVP)
# ---------------------------------------------------------------------------


def register_contract(
    contract_id: str,
    version: int,
    content: Dict[str, Any],
    signer_id: Optional[str] = None,
) -> CaCContract:
    """Register a CaC contract and optionally sign and publish provenance.

    If signer_id is provided and a matching key is registered in SignerStore,
    this will also produce a signed contract artifact and register provenance
    in the GoCRegistry for traceability. Without a registered key the contract
    is stored in CaCRegistry only.

    Returns:
        The newly created CaCContract.
    """
    contract = CaCContract(contract_id=contract_id, version=version, content=content)
    CaCRegistry.register(contract)
    if signer_id is not None:
        # Sign with the signer's registered key, never with the id itself.
        key = SignerStore.get_key(signer_id)
        if key:
            signed = sign_ca_contract(contract, key)
            GoCRegistry.register_signed_contract(
                contract, signer=signer_id, signature=signed.signature
            )
    return contract


def push_signal(contract_id: str, delta: PlanDelta, signer_id: str) -> PlanDelta:
    """Sign and push a PlanDelta into the GoC provenance registry for a contract.

    The delta is modified in place: its ``contract_id`` is set to
    ``contract_id`` and its signature is (re)computed. The same object is
    appended to the contract's history and returned.
    """
    # Bind the contract id BEFORE signing: the digest covers contract_id, so
    # assigning it afterwards would invalidate the signature.
    delta.contract_id = contract_id
    signed = sign_plan_delta(delta, signer_id)
    GoCRegistry.push_delta(contract_id, signed)
    return signed


def propose_delta(contract_id: str, delta: PlanDelta, signer_id: str) -> PlanDelta:
    """Convenience: sign a delta and push it for a contract.

    Behaves exactly like ``push_signal``; every call appends one more entry to
    the contract's history.
    """
    return push_signal(contract_id, delta, signer_id)


def get_provenance(contract_id: str) -> list:
    """Return the provenance (delta history) for a given contract."""
    return GoCRegistry.get_provenance(contract_id)


@dataclass
class PrivacyBudget:
    """Simple privacy budget block to accompany signals.

    This is intentionally small for MVP purposes. It can carry information
    about the remaining privacy budget for a stream and an expiry timestamp.
    """

    signal: Dict[str, float] = field(default_factory=dict)
    budget: float = 0.0
    expiry: Optional[float] = None


@dataclass
class AuditLog:
    """Auditable log entry for governance and provenance."""

    entry: str
    signer: str
    timestamp: float = field(default_factory=lambda: time.time())
    contract_id: str = "default"


@dataclass
class SharedSchedule:
    """Placeholder shared schedule object for MVP.

    In a fuller implementation this would carry scheduling constraints and
    execution windows shared across agents.
    """

    schedule: Dict[str, Any] = field(default_factory=dict)


@dataclass
class ResourceUsage:
    """Lightweight resource usage record."""

    resources: Dict[str, float] = field(default_factory=dict)
    timestamp: float = field(default_factory=lambda: time.time())


@dataclass
class SharedVariables:
    """Canonical container for signals shared among agents.

    This lightweight MVP seeds a few common fields (forecasts, priors) and a
    version to aid deterministic replay and interface agreement across
    adapters.
    """

    forecasts: Dict[str, Any] = field(default_factory=dict)
    priors: Dict[str, Any] = field(default_factory=dict)
    version: int = 1

    def to_json(self) -> str:
        """Serialize all fields to a JSON string with sorted keys."""
        return json.dumps(asdict(self), sort_keys=True)


@dataclass
class DualVariables:
    """MVP representation of dual variables (Lagrange multipliers) for ADMM-like updates."""

    multipliers: Dict[str, float] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize all fields to a JSON string with sorted keys."""
        return json.dumps(asdict(self), sort_keys=True)


@dataclass
class PolicyBlock:
    """Simple policy block capturing safety and constraints for governance."""

    safety: bool = True
    constraints: Dict[str, Any] = field(default_factory=dict)

    def to_json(self) -> str:
        """Serialize all fields to a JSON string with sorted keys."""
        return json.dumps(asdict(self), sort_keys=True)