build(agent): new-agents#a6e6ec iteration

This commit is contained in:
agent-a6e6ec231c5f7801 2026-04-19 18:49:24 +02:00
parent 4b7010273c
commit 6426494c04
2 changed files with 133 additions and 0 deletions

View File

@ -13,6 +13,92 @@ import json
import hashlib import hashlib
import time import time
# Lightweight in-memory signer registry for MVP provenance
class SignerStore:
"""In-memory signer key registry for MVP provenance."""
_keys: Dict[str, str] = {}
@classmethod
def register(cls, signer_id: str, key: str) -> None:
cls._keys[signer_id] = key
@classmethod
def get_key(cls, signer_id: str) -> Optional[str]:
return cls._keys.get(signer_id)
class GoCRegistry:
"""Graph-of-Contracts registry (in-memory MVP).
Stores signed CaC contracts and a delta history per contract to enable
provenance and deterministic replay in offline scenarios.
"""
_contracts: Dict[str, CaCContract] = {}
_signatures: Dict[str, str] = {}
_deltas: Dict[str, list] = {}
@classmethod
def register_signed_contract(cls, contract: CaCContract, signer: str, signature: str) -> None:
cls._contracts[contract.contract_id] = contract
cls._signatures[contract.contract_id] = signature
@classmethod
def get_signed_contract(cls, contract_id: str) -> Optional[Dict[str, Any]]:
c = cls._contracts.get(contract_id)
if not c:
return None
return {
"contract": c,
"signature": cls._signatures.get(contract_id),
}
@classmethod
def push_delta(cls, contract_id: str, delta: PlanDelta) -> None:
cls._deltas.setdefault(contract_id, []).append(delta)
@classmethod
def get_provenance(cls, contract_id: str) -> list:
return list(cls._deltas.get(contract_id, []))
def _delta_sign_digest(delta: PlanDelta, key: str) -> str:
payload = {
"agent_id": delta.agent_id,
"delta": delta.delta,
"timestamp": delta.timestamp,
"contract_id": delta.contract_id,
"parent_version": delta.parent_version,
"sequence": delta.sequence,
}
payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8")
return hashlib.sha256(payload_bytes + key.encode("utf-8")).hexdigest()
def sign_plan_delta(delta: PlanDelta, signer_id: str) -> PlanDelta:
"""Sign a PlanDelta using the signer's key from SignerStore.
This is a lightweight MVP signer; in a real system this would be a proper
crypto signature using a key management service.
"""
key = SignerStore.get_key(signer_id)
if not key:
# If no key registered, leave signature as None
delta.signature = None
return delta
delta.signature = _delta_sign_digest(delta, key)
return delta
def verify_plan_delta_signature(delta: PlanDelta) -> bool:
"""Verify the PlanDelta signature if signer key is known."""
if delta.signature is None:
return False
key = SignerStore.get_key(delta.agent_id)
if not key:
return False
expected = _delta_sign_digest(delta, key)
return delta.signature == expected
@dataclass @dataclass
class PlanDelta: class PlanDelta:

View File

@ -0,0 +1,47 @@
import pytest
from nova_plan.contracts import (
PlanDelta,
CaCContract,
sign_ca_contract,
GoCRegistry,
SignerStore,
sign_plan_delta,
verify_plan_delta_signature,
)
def test_ca_contract_sign_and_registry_roundtrip():
# register signer key
SignerStore.register("alice", "alice-key-123")
contract = CaCContract(contract_id="ic-contract-1", version=1, content={"name": "Test"})
signed = sign_ca_contract(contract, "alice-key-123")
# store in GoC registry
GoCRegistry.register_signed_contract(contract, signer="alice", signature=signed.signature)
assert GoCRegistry.get_signed_contract("ic-contract-1")["signature"] == signed.signature
def test_plan_delta_sign_and_verify():
SignerStore.register("alice", "alice-key-123")
delta = PlanDelta(agent_id="alice", delta={"task": 1}, contract_id="ic-contract-1")
signed_delta = sign_plan_delta(delta, signer_id="alice")
assert signed_delta.signature is not None
assert verify_plan_delta_signature(signed_delta)
# push to provenance registry
GoCRegistry.push_delta("ic-contract-1", signed_delta)
prov = GoCRegistry.get_provenance("ic-contract-1")
assert len(prov) >= 1 and prov[-1].agent_id == "alice"
# tamper should fail verification
tampered = PlanDelta(
agent_id="alice",
delta={"task": 999},
contract_id="ic-contract-1",
timestamp=signed_delta.timestamp,
signature=signed_delta.signature,
)
# mutate the delta content after signing should invalidate signature
tampered.delta["task"] = 1000
assert not verify_plan_delta_signature(tampered)