341 lines
11 KiB
Python
341 lines
11 KiB
Python
"""NovaPlan Contracts (data contracts for MVP).
|
|
|
|
This module defines lightweight data contracts used by the NovaPlan MVP,
|
|
including PlanDelta, PrivacyBudget, AuditLog, and a few placeholder
|
|
structures that adapters and tests can rely on.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import Dict, Any, Optional
|
|
import json
|
|
import hashlib
|
|
import time
|
|
|
|
# Lightweight in-memory signer registry for MVP provenance
|
|
class SignerStore:
|
|
"""In-memory signer key registry for MVP provenance."""
|
|
_keys: Dict[str, str] = {}
|
|
|
|
@classmethod
|
|
def register(cls, signer_id: str, key: str) -> None:
|
|
cls._keys[signer_id] = key
|
|
|
|
@classmethod
|
|
def get_key(cls, signer_id: str) -> Optional[str]:
|
|
return cls._keys.get(signer_id)
|
|
|
|
|
|
class GoCRegistry:
|
|
"""Graph-of-Contracts registry (in-memory MVP).
|
|
|
|
Stores signed CaC contracts and a delta history per contract to enable
|
|
provenance and deterministic replay in offline scenarios.
|
|
"""
|
|
_contracts: Dict[str, CaCContract] = {}
|
|
_signatures: Dict[str, str] = {}
|
|
_deltas: Dict[str, list] = {}
|
|
|
|
@classmethod
|
|
def register_signed_contract(cls, contract: CaCContract, signer: str, signature: str) -> None:
|
|
cls._contracts[contract.contract_id] = contract
|
|
cls._signatures[contract.contract_id] = signature
|
|
|
|
@classmethod
|
|
def get_signed_contract(cls, contract_id: str) -> Optional[Dict[str, Any]]:
|
|
c = cls._contracts.get(contract_id)
|
|
if not c:
|
|
return None
|
|
return {
|
|
"contract": c,
|
|
"signature": cls._signatures.get(contract_id),
|
|
}
|
|
|
|
@classmethod
|
|
def push_delta(cls, contract_id: str, delta: PlanDelta) -> None:
|
|
cls._deltas.setdefault(contract_id, []).append(delta)
|
|
|
|
@classmethod
|
|
def get_provenance(cls, contract_id: str) -> list:
|
|
return list(cls._deltas.get(contract_id, []))
|
|
|
|
|
|
def _delta_sign_digest(delta: PlanDelta, key: str) -> str:
|
|
payload = {
|
|
"agent_id": delta.agent_id,
|
|
"delta": delta.delta,
|
|
"timestamp": delta.timestamp,
|
|
"contract_id": delta.contract_id,
|
|
"parent_version": delta.parent_version,
|
|
"sequence": delta.sequence,
|
|
"nonce": delta.nonce,
|
|
}
|
|
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
|
|
return hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest()
|
|
|
|
|
|
def sign_plan_delta(delta: PlanDelta, signer_id: str) -> PlanDelta:
|
|
"""Sign a PlanDelta using the signer's key from SignerStore.
|
|
|
|
This is a lightweight MVP signer; in a real system this would be a proper
|
|
crypto signature using a key management service.
|
|
"""
|
|
key = SignerStore.get_key(signer_id)
|
|
if not key:
|
|
# If no key registered, leave signature as None
|
|
delta.signature = None
|
|
return delta
|
|
delta.signature = _delta_sign_digest(delta, key)
|
|
return delta
|
|
|
|
|
|
def verify_plan_delta_signature(delta: PlanDelta) -> bool:
|
|
"""Verify the PlanDelta signature if signer key is known."""
|
|
if delta.signature is None:
|
|
return False
|
|
key = SignerStore.get_key(delta.agent_id)
|
|
if not key:
|
|
return False
|
|
expected = _delta_sign_digest(delta, key)
|
|
return delta.signature == expected
|
|
|
|
|
|
@dataclass
|
|
class PlanDelta:
|
|
"""Delta exchanged between agents describing local changes.
|
|
|
|
Attributes:
|
|
agent_id: Identifier of the originating agent.
|
|
delta: A dictionary representing changed local variables (name -> value).
|
|
timestamp: UNIX timestamp when the delta was created.
|
|
contract_id: Optional contract identifier for governance/traceability.
|
|
parent_version: Optional CRDT-style parent version (for merge semantics).
|
|
sequence: Optional sequence number for ordering.
|
|
signature: Optional signature or provenance tag for the delta.
|
|
"""
|
|
|
|
agent_id: str
|
|
delta: Dict[str, float] = field(default_factory=dict)
|
|
timestamp: float = field(default_factory=lambda: time.time())
|
|
contract_id: str = "default"
|
|
parent_version: Optional[int] = None
|
|
sequence: Optional[int] = None
|
|
nonce: Optional[int] = None
|
|
signature: Optional[str] = None
|
|
|
|
def to_json(self) -> str:
|
|
"""Serialize this PlanDelta to a deterministic JSON string.
|
|
|
|
This method is intentionally lightweight and stable for MVP tests
|
|
and external observers. The structure mirrors the dataclass fields
|
|
with a sorted JSON payload for replay and signing workflows.
|
|
"""
|
|
payload = {
|
|
"agent_id": self.agent_id,
|
|
"delta": self.delta,
|
|
"timestamp": self.timestamp,
|
|
"contract_id": self.contract_id,
|
|
"parent_version": self.parent_version,
|
|
"sequence": self.sequence,
|
|
"nonce": self.nonce,
|
|
"signature": self.signature,
|
|
}
|
|
return json.dumps(payload, sort_keys=True)
|
|
|
|
|
|
@dataclass
|
|
class CaCContract:
|
|
"""Contract-as-Code (CaC) minimal representation."""
|
|
|
|
contract_id: str
|
|
version: int
|
|
content: Dict[str, Any]
|
|
|
|
|
|
@dataclass
|
|
class SignedCaCContract:
|
|
"""Signed CaCContract wrapper."""
|
|
|
|
contract: CaCContract
|
|
signature: str
|
|
|
|
def to_json(self) -> str:
|
|
payload = {
|
|
"contract_id": self.contract.contract_id,
|
|
"version": self.contract.version,
|
|
"content": self.contract.content,
|
|
"signature": self.signature,
|
|
}
|
|
return json.dumps(payload)
|
|
|
|
|
|
class CaCRegistry:
|
|
"""In-memory registry for CaCContract instances."""
|
|
_store: Dict[str, CaCContract] = {}
|
|
|
|
@classmethod
|
|
def register(cls, contract: CaCContract) -> None:
|
|
cls._store[contract.contract_id] = contract
|
|
|
|
@classmethod
|
|
def get(cls, contract_id: str) -> Optional[CaCContract]:
|
|
return cls._store.get(contract_id)
|
|
|
|
|
|
def sign_ca_contract(contract: CaCContract, key: str) -> SignedCaCContract:
|
|
"""Very lightweight signing for MVP. Returns a SignedCaCContract."""
|
|
# Create a deterministic digest of contract data combined with a key.
|
|
payload = {
|
|
"contract_id": contract.contract_id,
|
|
"version": contract.version,
|
|
"content": contract.content,
|
|
}
|
|
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
|
|
digest = hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest()
|
|
return SignedCaCContract(contract=contract, signature=digest)
|
|
|
|
|
|
def crdt_merge_deltas(d1: PlanDelta, d2: PlanDelta) -> PlanDelta:
|
|
"""Merge two PlanDelta objects with simple last-writer-wins semantics for keys."""
|
|
merged_delta = dict(d1.delta)
|
|
merged_delta.update(d2.delta)
|
|
# Timestamp goes to the latest
|
|
ts = max(d1.timestamp, d2.timestamp)
|
|
# Choose the second delta's identity as the merged agent/contract for simplicity
|
|
merged = PlanDelta(
|
|
agent_id=d2.agent_id,
|
|
delta=merged_delta,
|
|
timestamp=ts,
|
|
contract_id=d2.contract_id or d1.contract_id,
|
|
parent_version=None,
|
|
sequence=None,
|
|
signature=None,
|
|
)
|
|
return merged
|
|
|
|
|
|
class ContractRegistry:
|
|
"""Lightweight registry surface to satisfy tests.
|
|
|
|
Exposes a minimal API:
|
|
- register_schema(name, version, schema)
|
|
- get_schema(name, version)
|
|
- validate_against_schema(payload, schema)
|
|
"""
|
|
|
|
_schemas: Dict[tuple, Dict[str, Any]] = {
|
|
("PlanDelta", 1): {
|
|
"required": ["agent_id", "delta", "timestamp"],
|
|
"types": {
|
|
"agent_id": str,
|
|
"delta": dict,
|
|
"timestamp": (int, float),
|
|
},
|
|
}
|
|
}
|
|
|
|
@classmethod
|
|
def get_schema(cls, name: str, version: int) -> Optional[Dict[str, Any]]:
|
|
return cls._schemas.get((name, version))
|
|
|
|
@classmethod
|
|
def validate_against_schema(cls, payload: Dict[str, Any], schema: Dict[str, Any]) -> bool:
|
|
# Basic required-field check
|
|
required = schema.get("required", [])
|
|
for field in required:
|
|
if field not in payload:
|
|
return False
|
|
# Type checking (best-effort)
|
|
types_map = schema.get("types", {})
|
|
for field, expected in types_map.items():
|
|
if field in payload:
|
|
val = payload[field]
|
|
if isinstance(expected, tuple):
|
|
if not isinstance(val, expected):
|
|
return False
|
|
else:
|
|
if not isinstance(val, expected):
|
|
return False
|
|
return True
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Simple API surface for Contract-as-Code (CaC) operations (MVP)
|
|
# ---------------------------------------------------------------------------
|
|
def register_contract(contract_id: str, version: int, content: Dict[str, Any], signer_id: Optional[str] = None) -> CaCContract:
|
|
"""Register a CaC contract and optionally sign and publish provenance.
|
|
|
|
If signer_id is provided and a matching key is registered in SignerStore,
|
|
this will also produce a signed contract artifact and register provenance
|
|
in the GoCRegistry for traceability.
|
|
"""
|
|
contract = CaCContract(contract_id=contract_id, version=version, content=content)
|
|
CaCRegistry.register(contract)
|
|
if signer_id is not None:
|
|
signed = sign_ca_contract(contract, signer_id)
|
|
GoCRegistry.register_signed_contract(contract, signer=signer_id, signature=signed.signature)
|
|
return contract
|
|
|
|
|
|
def push_signal(contract_id: str, delta: PlanDelta, signer_id: str) -> PlanDelta:
|
|
"""Sign and push a PlanDelta into the GoC provenance registry for a contract."""
|
|
signed = sign_plan_delta(delta, signer_id)
|
|
signed.contract_id = contract_id
|
|
GoCRegistry.push_delta(contract_id, signed)
|
|
return signed
|
|
|
|
|
|
def propose_delta(contract_id: str, delta: PlanDelta, signer_id: str) -> PlanDelta:
|
|
"""Convenience: sign a delta and push it for a contract (idempotent path)."""
|
|
signed = sign_plan_delta(delta, signer_id)
|
|
signed.contract_id = contract_id
|
|
GoCRegistry.push_delta(contract_id, signed)
|
|
return signed
|
|
|
|
|
|
def get_provenance(contract_id: str) -> list:
|
|
"""Return the provenance (delta history) for a given contract."""
|
|
return GoCRegistry.get_provenance(contract_id)
|
|
|
|
@dataclass
|
|
class PrivacyBudget:
|
|
"""Simple privacy budget block to accompany signals.
|
|
|
|
This is intentionally small for MVP purposes. It can carry information
|
|
about the remaining privacy budget for a stream and an expiry timestamp.
|
|
"""
|
|
|
|
signal: Dict[str, float] = field(default_factory=dict)
|
|
budget: float = 0.0
|
|
expiry: float | None = None
|
|
|
|
|
|
@dataclass
|
|
class AuditLog:
|
|
"""Auditable log entry for governance and provenance."""
|
|
|
|
entry: str
|
|
signer: str
|
|
timestamp: float = field(default_factory=lambda: time.time())
|
|
contract_id: str = "default"
|
|
|
|
|
|
@dataclass
|
|
class SharedSchedule:
|
|
"""Placeholder shared schedule object for MVP.
|
|
|
|
In a fuller implementation this would carry scheduling constraints and
|
|
execution windows shared across agents.
|
|
"""
|
|
|
|
schedule: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
@dataclass
|
|
class ResourceUsage:
|
|
"""Lightweight resource usage record."""
|
|
|
|
resources: Dict[str, float] = field(default_factory=dict)
|
|
timestamp: float = field(default_factory=lambda: time.time())
|