From 6426494c04ba765ce283d446457cb08a6ff02baa Mon Sep 17 00:00:00 2001 From: agent-a6e6ec231c5f7801 Date: Sun, 19 Apr 2026 18:49:24 +0200 Subject: [PATCH] build(agent): new-agents#a6e6ec iteration --- nova_plan/contracts.py | 86 ++++++++++++++++++++++++++++++++++++++ tests/test_ca_contracts.py | 47 +++++++++++++++++++++ 2 files changed, 133 insertions(+) create mode 100644 tests/test_ca_contracts.py diff --git a/nova_plan/contracts.py b/nova_plan/contracts.py index 623b0ed..cca94d0 100644 --- a/nova_plan/contracts.py +++ b/nova_plan/contracts.py @@ -13,6 +13,92 @@ import json import hashlib import time +# Lightweight in-memory signer registry for MVP provenance +class SignerStore: + """In-memory signer key registry for MVP provenance.""" + _keys: Dict[str, str] = {} + + @classmethod + def register(cls, signer_id: str, key: str) -> None: + cls._keys[signer_id] = key + + @classmethod + def get_key(cls, signer_id: str) -> Optional[str]: + return cls._keys.get(signer_id) + + +class GoCRegistry: + """Graph-of-Contracts registry (in-memory MVP). + + Stores signed CaC contracts and a delta history per contract to enable + provenance and deterministic replay in offline scenarios. + """ + _contracts: Dict[str, CaCContract] = {} + _signatures: Dict[str, str] = {} + _deltas: Dict[str, list] = {} + + @classmethod + def register_signed_contract(cls, contract: CaCContract, signer: str, signature: str) -> None: + cls._contracts[contract.contract_id] = contract + cls._signatures[contract.contract_id] = signature + + @classmethod + def get_signed_contract(cls, contract_id: str) -> Optional[Dict[str, Any]]: + c = cls._contracts.get(contract_id) + if not c: + return None + return { + "contract": c, + "signature": cls._signatures.get(contract_id), + } + + @classmethod + def push_delta(cls, contract_id: str, delta: PlanDelta) -> None: + cls._deltas.setdefault(contract_id, []).append(delta) + + @classmethod + def get_provenance(cls, contract_id: str) -> list: + return list(cls._deltas.get(contract_id, [])) + + +def _delta_sign_digest(delta: PlanDelta, key: str) -> str: + payload = { + "agent_id": delta.agent_id, + "delta": delta.delta, + "timestamp": delta.timestamp, + "contract_id": delta.contract_id, + "parent_version": delta.parent_version, + "sequence": delta.sequence, + } + payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8") + return hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest() + + +def sign_plan_delta(delta: PlanDelta, signer_id: str) -> PlanDelta: + """Sign a PlanDelta using the signer's key from SignerStore. + + This is a lightweight MVP signer; in a real system this would be a proper + crypto signature using a key management service. + """ + key = SignerStore.get_key(signer_id) + if not key: + # If no key registered, leave signature as None + delta.signature = None + return delta + delta.signature = _delta_sign_digest(delta, key) + return delta + + +def verify_plan_delta_signature(delta: PlanDelta) -> bool: + """Verify the PlanDelta signature if signer key is known.""" + if delta.signature is None: + return False + key = SignerStore.get_key(delta.agent_id) + if not key: + return False + expected = _delta_sign_digest(delta, key) + return delta.signature == expected + @dataclass class PlanDelta: diff --git a/tests/test_ca_contracts.py b/tests/test_ca_contracts.py new file mode 100644 index 0000000..57133d0 --- /dev/null +++ b/tests/test_ca_contracts.py @@ -0,0 +1,47 @@ +import pytest + +from nova_plan.contracts import ( + PlanDelta, + CaCContract, + sign_ca_contract, + GoCRegistry, + SignerStore, + sign_plan_delta, + verify_plan_delta_signature, +) + + +def test_ca_contract_sign_and_registry_roundtrip(): + # register signer key + SignerStore.register("alice", "alice-key-123") + + contract = CaCContract(contract_id="ic-contract-1", version=1, content={"name": "Test"}) + signed = sign_ca_contract(contract, "alice-key-123") + # store in GoC registry + GoCRegistry.register_signed_contract(contract, signer="alice", signature=signed.signature) + assert GoCRegistry.get_signed_contract("ic-contract-1")["signature"] == signed.signature + + +def test_plan_delta_sign_and_verify(): + SignerStore.register("alice", "alice-key-123") + delta = PlanDelta(agent_id="alice", delta={"task": 1}, contract_id="ic-contract-1") + signed_delta = sign_plan_delta(delta, signer_id="alice") + assert signed_delta.signature is not None + assert verify_plan_delta_signature(signed_delta) + + # push to provenance registry + GoCRegistry.push_delta("ic-contract-1", signed_delta) + prov = GoCRegistry.get_provenance("ic-contract-1") + assert len(prov) >= 1 and prov[-1].agent_id == "alice" + + # tamper should fail verification + tampered = PlanDelta( + agent_id="alice", + delta={"task": 999}, + contract_id="ic-contract-1", + timestamp=signed_delta.timestamp, + signature=signed_delta.signature, + ) + # mutate the delta content after signing should invalidate signature + tampered.delta["task"] = 1000 + assert not verify_plan_delta_signature(tampered)