novaplan-decentralized-priv.../nova_plan/contracts.py

341 lines
11 KiB
Python

"""NovaPlan Contracts (data contracts for MVP).
This module defines lightweight data contracts used by the NovaPlan MVP,
including PlanDelta, PrivacyBudget, AuditLog, and a few placeholder
structures that adapters and tests can rely on.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Dict, Any, Optional
import json
import hashlib
import time
# Lightweight in-memory signer registry for MVP provenance
class SignerStore:
"""In-memory signer key registry for MVP provenance."""
_keys: Dict[str, str] = {}
@classmethod
def register(cls, signer_id: str, key: str) -> None:
cls._keys[signer_id] = key
@classmethod
def get_key(cls, signer_id: str) -> Optional[str]:
return cls._keys.get(signer_id)
class GoCRegistry:
"""Graph-of-Contracts registry (in-memory MVP).
Stores signed CaC contracts and a delta history per contract to enable
provenance and deterministic replay in offline scenarios.
"""
_contracts: Dict[str, CaCContract] = {}
_signatures: Dict[str, str] = {}
_deltas: Dict[str, list] = {}
@classmethod
def register_signed_contract(cls, contract: CaCContract, signer: str, signature: str) -> None:
cls._contracts[contract.contract_id] = contract
cls._signatures[contract.contract_id] = signature
@classmethod
def get_signed_contract(cls, contract_id: str) -> Optional[Dict[str, Any]]:
c = cls._contracts.get(contract_id)
if not c:
return None
return {
"contract": c,
"signature": cls._signatures.get(contract_id),
}
@classmethod
def push_delta(cls, contract_id: str, delta: PlanDelta) -> None:
cls._deltas.setdefault(contract_id, []).append(delta)
@classmethod
def get_provenance(cls, contract_id: str) -> list:
return list(cls._deltas.get(contract_id, []))
def _delta_sign_digest(delta: PlanDelta, key: str) -> str:
payload = {
"agent_id": delta.agent_id,
"delta": delta.delta,
"timestamp": delta.timestamp,
"contract_id": delta.contract_id,
"parent_version": delta.parent_version,
"sequence": delta.sequence,
"nonce": delta.nonce,
}
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
return hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest()
def sign_plan_delta(delta: PlanDelta, signer_id: str) -> PlanDelta:
"""Sign a PlanDelta using the signer's key from SignerStore.
This is a lightweight MVP signer; in a real system this would be a proper
crypto signature using a key management service.
"""
key = SignerStore.get_key(signer_id)
if not key:
# If no key registered, leave signature as None
delta.signature = None
return delta
delta.signature = _delta_sign_digest(delta, key)
return delta
def verify_plan_delta_signature(delta: PlanDelta) -> bool:
"""Verify the PlanDelta signature if signer key is known."""
if delta.signature is None:
return False
key = SignerStore.get_key(delta.agent_id)
if not key:
return False
expected = _delta_sign_digest(delta, key)
return delta.signature == expected
@dataclass
class PlanDelta:
"""Delta exchanged between agents describing local changes.
Attributes:
agent_id: Identifier of the originating agent.
delta: A dictionary representing changed local variables (name -> value).
timestamp: UNIX timestamp when the delta was created.
contract_id: Optional contract identifier for governance/traceability.
parent_version: Optional CRDT-style parent version (for merge semantics).
sequence: Optional sequence number for ordering.
signature: Optional signature or provenance tag for the delta.
"""
agent_id: str
delta: Dict[str, float] = field(default_factory=dict)
timestamp: float = field(default_factory=lambda: time.time())
contract_id: str = "default"
parent_version: Optional[int] = None
sequence: Optional[int] = None
nonce: Optional[int] = None
signature: Optional[str] = None
def to_json(self) -> str:
"""Serialize this PlanDelta to a deterministic JSON string.
This method is intentionally lightweight and stable for MVP tests
and external observers. The structure mirrors the dataclass fields
with a sorted JSON payload for replay and signing workflows.
"""
payload = {
"agent_id": self.agent_id,
"delta": self.delta,
"timestamp": self.timestamp,
"contract_id": self.contract_id,
"parent_version": self.parent_version,
"sequence": self.sequence,
"nonce": self.nonce,
"signature": self.signature,
}
return json.dumps(payload, sort_keys=True)
@dataclass
class CaCContract:
"""Contract-as-Code (CaC) minimal representation."""
contract_id: str
version: int
content: Dict[str, Any]
@dataclass
class SignedCaCContract:
"""Signed CaCContract wrapper."""
contract: CaCContract
signature: str
def to_json(self) -> str:
payload = {
"contract_id": self.contract.contract_id,
"version": self.contract.version,
"content": self.contract.content,
"signature": self.signature,
}
return json.dumps(payload)
class CaCRegistry:
"""In-memory registry for CaCContract instances."""
_store: Dict[str, CaCContract] = {}
@classmethod
def register(cls, contract: CaCContract) -> None:
cls._store[contract.contract_id] = contract
@classmethod
def get(cls, contract_id: str) -> Optional[CaCContract]:
return cls._store.get(contract_id)
def sign_ca_contract(contract: CaCContract, key: str) -> SignedCaCContract:
"""Very lightweight signing for MVP. Returns a SignedCaCContract."""
# Create a deterministic digest of contract data combined with a key.
payload = {
"contract_id": contract.contract_id,
"version": contract.version,
"content": contract.content,
}
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
digest = hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest()
return SignedCaCContract(contract=contract, signature=digest)
def crdt_merge_deltas(d1: PlanDelta, d2: PlanDelta) -> PlanDelta:
"""Merge two PlanDelta objects with simple last-writer-wins semantics for keys."""
merged_delta = dict(d1.delta)
merged_delta.update(d2.delta)
# Timestamp goes to the latest
ts = max(d1.timestamp, d2.timestamp)
# Choose the second delta's identity as the merged agent/contract for simplicity
merged = PlanDelta(
agent_id=d2.agent_id,
delta=merged_delta,
timestamp=ts,
contract_id=d2.contract_id or d1.contract_id,
parent_version=None,
sequence=None,
signature=None,
)
return merged
class ContractRegistry:
"""Lightweight registry surface to satisfy tests.
Exposes a minimal API:
- register_schema(name, version, schema)
- get_schema(name, version)
- validate_against_schema(payload, schema)
"""
_schemas: Dict[tuple, Dict[str, Any]] = {
("PlanDelta", 1): {
"required": ["agent_id", "delta", "timestamp"],
"types": {
"agent_id": str,
"delta": dict,
"timestamp": (int, float),
},
}
}
@classmethod
def get_schema(cls, name: str, version: int) -> Optional[Dict[str, Any]]:
return cls._schemas.get((name, version))
@classmethod
def validate_against_schema(cls, payload: Dict[str, Any], schema: Dict[str, Any]) -> bool:
# Basic required-field check
required = schema.get("required", [])
for field in required:
if field not in payload:
return False
# Type checking (best-effort)
types_map = schema.get("types", {})
for field, expected in types_map.items():
if field in payload:
val = payload[field]
if isinstance(expected, tuple):
if not isinstance(val, expected):
return False
else:
if not isinstance(val, expected):
return False
return True
# ---------------------------------------------------------------------------
# Simple API surface for Contract-as-Code (CaC) operations (MVP)
# ---------------------------------------------------------------------------
def register_contract(contract_id: str, version: int, content: Dict[str, Any], signer_id: Optional[str] = None) -> CaCContract:
"""Register a CaC contract and optionally sign and publish provenance.
If signer_id is provided and a matching key is registered in SignerStore,
this will also produce a signed contract artifact and register provenance
in the GoCRegistry for traceability.
"""
contract = CaCContract(contract_id=contract_id, version=version, content=content)
CaCRegistry.register(contract)
if signer_id is not None:
signed = sign_ca_contract(contract, signer_id)
GoCRegistry.register_signed_contract(contract, signer=signer_id, signature=signed.signature)
return contract
def push_signal(contract_id: str, delta: PlanDelta, signer_id: str) -> PlanDelta:
"""Sign and push a PlanDelta into the GoC provenance registry for a contract."""
signed = sign_plan_delta(delta, signer_id)
signed.contract_id = contract_id
GoCRegistry.push_delta(contract_id, signed)
return signed
def propose_delta(contract_id: str, delta: PlanDelta, signer_id: str) -> PlanDelta:
"""Convenience: sign a delta and push it for a contract (idempotent path)."""
signed = sign_plan_delta(delta, signer_id)
signed.contract_id = contract_id
GoCRegistry.push_delta(contract_id, signed)
return signed
def get_provenance(contract_id: str) -> list:
"""Return the provenance (delta history) for a given contract."""
return GoCRegistry.get_provenance(contract_id)
@dataclass
class PrivacyBudget:
"""Simple privacy budget block to accompany signals.
This is intentionally small for MVP purposes. It can carry information
about the remaining privacy budget for a stream and an expiry timestamp.
"""
signal: Dict[str, float] = field(default_factory=dict)
budget: float = 0.0
expiry: float | None = None
@dataclass
class AuditLog:
"""Auditable log entry for governance and provenance."""
entry: str
signer: str
timestamp: float = field(default_factory=lambda: time.time())
contract_id: str = "default"
@dataclass
class SharedSchedule:
"""Placeholder shared schedule object for MVP.
In a fuller implementation this would carry scheduling constraints and
execution windows shared across agents.
"""
schedule: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ResourceUsage:
"""Lightweight resource usage record."""
resources: Dict[str, float] = field(default_factory=dict)
timestamp: float = field(default_factory=lambda: time.time())