"""Local provenance ledger MVP: signed blocks, a Merkle audit log, delta sync."""

import hashlib
import hmac
import json
import time
from typing import Any, Dict, List, Optional

# Simple, self-contained MVP: local provenance ledger with a Merkle audit log

HASH_ALGO = hashlib.sha256
# In a real product, use a proper KMS/PKI; kept here for MVP.
SIGNING_KEY = b"demo-secret-key"


def _serialize(obj: Any) -> bytes:
    """Return a canonical UTF-8 JSON encoding of *obj*.

    Keys are sorted and separators are compact so that equal objects always
    produce identical bytes, which is what hashing and signing rely on.
    """
    return json.dumps(obj, sort_keys=True, separators=(",", ":")).encode("utf-8")


def _hash_block(block: Dict[str, Any]) -> str:
    """Return the hex digest of the canonical serialization of *block*."""
    return HASH_ALGO(_serialize(block)).hexdigest()


def _sign(data: bytes) -> str:
    """Return the hex HMAC of *data* under ``SIGNING_KEY``.

    Simple HMAC-based signature for MVP (not a full cryptographic signature
    scheme): anyone holding the key can both sign and verify.
    """
    return hmac.new(SIGNING_KEY, data, HASH_ALGO).hexdigest()


class LocalProvenanceBlock:
    """A single provenance record describing one action performed with a tool."""

    def __init__(
        self,
        author: str,
        tool: str,
        action: str,
        metadata: Dict[str, Any],
        license_: str,
    ):
        """Create an unsigned block stamped with the current time.

        Args:
            author: Who performed the action.
            tool: Name of the tool that produced the event.
            action: What happened, e.g. "create", "modify".
            metadata: Free-form details; must be JSON-serializable to be
                hashed or signed.
            license_: License identifier attached to the record.
        """
        self.author = author
        self.tool = tool
        self.action = action  # e.g., "create", "modify"
        self.metadata = metadata
        self.license = license_
        self.timestamp = time.time()
        # The id is derived from author/tool/action/timestamp only; metadata
        # and license do not contribute to it.
        self.block_id = hashlib.sha256(
            f"{author}:{tool}:{action}:{self.timestamp}".encode("utf-8")
        ).hexdigest()
        # Stays None until attach_signature() is called on this block.
        self.signature: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the block as a plain dict (``metadata`` is not copied)."""
        return {
            "block_id": self.block_id,
            "author": self.author,
            "tool": self.tool,
            "action": self.action,
            "metadata": self.metadata,
            "license": self.license,
            "timestamp": self.timestamp,
            "signature": self.signature,
        }

    def __repr__(self) -> str:
        return f"LocalProvenanceBlock(id={self.block_id})"


class MerkleAuditLog:
    """Append-only list of block dicts with a Merkle root over their hashes."""

    def __init__(self):
        self.blocks: List[Dict[str, Any]] = []
        # Root of an empty log is the empty string.
        self.merkle_root: str = ""

    def append(self, block: LocalProvenanceBlock) -> None:
        """Store *block* as a dict and refresh the Merkle root.

        The block is stored exactly as ``block.to_dict()`` returns it; this
        method does not sign it. Call ``attach_signature(block)`` beforehand
        if the stored record should carry a signature.
        """
        self.blocks.append(block.to_dict())
        # Recompute the whole tree on each append for simplicity.
        self.merkle_root = self._compute_hash_chain()[0]

    def _compute_hash_chain(self) -> List[str]:
        """Return a one-element list holding the Merkle root of ``self.blocks``.

        Returns ``[""]`` for an empty log. For a single block the root is that
        block's leaf hash.
        """
        level = [_hash_block(entry) for entry in self.blocks]
        if not level:
            return [""]
        # Simple binary Merkle; an unpaired trailing node is paired with itself.
        # NOTE(review): leaves and internal nodes are hashed without distinct
        # prefixes; kept as-is so previously computed roots remain valid.
        while len(level) > 1:
            parents = []
            for i in range(0, len(level), 2):
                left = level[i]
                right = level[i + 1] if i + 1 < len(level) else left
                parents.append(HASH_ALGO((left + right).encode("utf-8")).hexdigest())
            level = parents
        return level

    def get_root(self) -> str:
        """Return the most recently computed Merkle root."""
        return self.merkle_root

    def to_dict(self) -> Dict[str, Any]:
        """Return the root, block count and the block list itself (not a copy)."""
        return {
            "root": self.merkle_root,
            "count": len(self.blocks),
            "blocks": self.blocks,
        }


class DeltaSync:
    """Tracks how much of a log has been shared and merges incoming deltas."""

    def __init__(self, log: MerkleAuditLog):
        self.log = log
        # Index of the first block not yet handed out by create_delta().
        self.synced_index = 0

    def create_delta(self) -> Dict[str, Any]:
        """Return the blocks added since the last call and advance the cursor.

        The returned dict has keys ``start_index``, ``count``, ``blocks`` and
        ``root`` (the log's current root).
        """
        start = self.synced_index
        pending = self.log.blocks[start:]
        self.synced_index = len(self.log.blocks)
        return {
            "start_index": start,
            "count": len(pending),
            "blocks": pending,
            "root": self.log.get_root(),
        }

    def apply_delta(self, delta: Dict[str, Any]) -> None:
        """Merge the blocks of *delta* into the local log.

        Blocks whose canonical serialization is already present are skipped,
        so applying the same delta twice is harmless. The local Merkle root is
        then recomputed from the blocks actually stored; the ``root`` carried
        by the delta is NOT adopted, because it describes the sender's log and
        would be wrong whenever the local log holds anything else. Use
        ``is_in_sync_with(delta["root"])`` afterwards to compare.

        In real use, this would also verify delta provenance (signatures)
        before accepting blocks; the MVP does not.
        """
        incoming = delta.get("blocks") or []  # tolerate a missing or null list
        known = {_hash_block(entry) for entry in self.log.blocks}
        for entry in incoming:
            digest = _hash_block(entry)
            if digest in known:
                continue
            known.add(digest)
            self.log.blocks.append(entry)
        self.log.merkle_root = self.log._compute_hash_chain()[0]

    def is_in_sync_with(self, other_root: str) -> bool:
        """Return True when the local root equals *other_root*."""
        return self.log.get_root() == other_root


class Adapter:
    """Base class for tool adapters that emit provenance blocks."""

    def __init__(self, author: str, license_: str = "CC-BY-4.0"):
        self.author = author
        self.license = license_

    def emit(self) -> LocalProvenanceBlock:
        """Produce a block for the tool; subclasses must override."""
        raise NotImplementedError


class BlenderAdapter(Adapter):
    """Adapter emitting a sample Blender asset-creation event."""

    def emit(self) -> LocalProvenanceBlock:
        """Return an unsigned ``create_asset`` block for Blender."""
        return LocalProvenanceBlock(
            author=self.author,
            tool="Blender",
            action="create_asset",
            metadata={"asset_type": "3d_model", "scene": "SampleScene"},
            license_=self.license,
        )


class FigmaAdapter(Adapter):
    """Adapter emitting a sample Figma design-update event."""

    def emit(self) -> LocalProvenanceBlock:
        """Return an unsigned ``update_design`` block for Figma."""
        return LocalProvenanceBlock(
            author=self.author,
            tool="Figma",
            action="update_design",
            metadata={"frame": "HeroSection", "pages": ["Landing", "Docs"]},
            license_=self.license,
        )


def attach_signature(block: LocalProvenanceBlock) -> None:
    """Sign *block* in place, storing the result in ``block.signature``.

    The signature covers the canonical serialization of every field except
    ``signature`` itself, so re-signing an unchanged block yields the same
    value.
    """
    payload = block.to_dict()
    # Sign the block serialization excluding signature itself
    payload.pop("signature", None)
    block.signature = _sign(_serialize(payload))