nebulaforge-offline-resilie.../tests/test_catopt_bridge.py

29 lines
1.2 KiB
Python

import json
from nebulaforge.catopt_bridge import GoCRegistry, FoundationModelBlock, PlanningPolicyBlock, LocalProblemBlock
def test_registry_adapter_and_delta_logging():
reg = GoCRegistry()
# Register an adapter
ad = reg.register_adapter("rover-runtime", ["rovers", "drones"])
assert ad["adapter_id"] == "rover-runtime"
assert "rovers" in ad["supported_domains"]
# Publish a simple contract payload
contract = reg.publish_contract("foundation-1", {"type": "FoundationModel", "version": 1})
assert contract["contract_id"] == "foundation-1"
# Create and log a delta for the contract
delta = {"state": {"planning": {"holen": 1}}}
log_entry = reg.log_delta(delta, "foundation-1")
assert "signature" in log_entry
sig_hex = log_entry["signature"]
assert isinstance(sig_hex, str) and len(sig_hex) > 0
# Verify the signature deterministically using the same payload (replay-safe)
ts = log_entry["payload"]["timestamp"]
verify_payload = {"delta": delta, "contract_id": "foundation-1", "timestamp": ts}
verify_bytes = json.dumps(verify_payload, sort_keys=True).encode()
assert reg.verify(verify_bytes, bytes.fromhex(sig_hex))