257 lines
8.3 KiB
Python
257 lines
8.3 KiB
Python
import json
|
|
import hashlib
|
|
import time
|
|
from typing import List, Dict, Any, Optional
|
|
import hmac
|
|
|
|
# Simple, self-contained MVP: local provenance ledger with a Merkle audit log
|
|
# Hash constructor shared by block hashing, HMAC signing and the Merkle tree.
HASH_ALGO = hashlib.sha256
|
|
# HMAC key used by _sign(). NOTE(review): this is a hard-coded demo value, so
# anyone who can read the source can produce matching signatures.
SIGNING_KEY = b"demo-secret-key" # In a real product, use a proper KMS/PKI; kept here for MVP.
|
|
|
|
|
|
def _serialize(obj: Any) -> bytes:
    """Encode *obj* as canonical JSON bytes.

    Keys are sorted and no whitespace is emitted, so equal objects always
    produce the same byte string. The text is encoded as UTF-8.
    """
    canonical_text = json.dumps(obj, separators=(",", ":"), sort_keys=True)
    return canonical_text.encode("utf-8")
|
|
|
|
|
|
def _hash_block(block: Dict[str, Any]) -> str:
    """Return the hex digest of *block*'s canonical JSON serialization."""
    return HASH_ALGO(_serialize(block)).hexdigest()
|
|
|
|
|
|
def _sign(data: bytes) -> str:
    """Return the hex HMAC of *data* keyed with the module signing key.

    This is a shared-secret MAC for the MVP, not a public-key signature.
    """
    mac = hmac.new(SIGNING_KEY, msg=data, digestmod=HASH_ALGO)
    return mac.hexdigest()
|
|
|
|
|
|
class LocalProvenanceBlock:
    """One recorded provenance step: who did what, and with which tool.

    The core fields (author, tool, action, metadata, license) are always
    present. The extended fields -- prompt, model_version, seed, parameters,
    sources, outputs -- are optional and only appear in the dictionary form
    when they are not ``None``, which keeps older callers working unchanged.
    """

    def __init__(
        self,
        author: str,
        tool: str,
        action: str,
        metadata: Dict[str, Any],
        license_: str,
        prompt: Optional[str] = None,
        model_version: Optional[str] = None,
        seed: Optional[Any] = None,
        parameters: Optional[Dict[str, Any]] = None,
        sources: Optional[List[str]] = None,
        outputs: Optional[Dict[str, Any]] = None,
    ):
        # Core description of the step.
        self.author = author
        self.tool = tool
        self.action = action  # e.g., "create", "modify"
        self.metadata = metadata
        self.license = license_

        # Optional extended context.
        self.prompt = prompt
        self.model_version = model_version
        self.seed = seed
        self.parameters = parameters
        self.sources = sources
        self.outputs = outputs

        # The identifier is a SHA-256 over author, tool, action and the
        # creation time, so it is fixed at construction.
        self.timestamp = time.time()
        identity = f"{author}:{tool}:{action}:{self.timestamp}"
        self.block_id = hashlib.sha256(identity.encode("utf-8")).hexdigest()

        # Left empty here; filled in when the block is signed.
        self.signature: Optional[str] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return the block as a plain dict, omitting unset optional fields."""
        payload: Dict[str, Any] = {
            "block_id": self.block_id,
            "author": self.author,
            "tool": self.tool,
            "action": self.action,
            "metadata": self.metadata,
            "license": self.license,
            "timestamp": self.timestamp,
            "signature": self.signature,
        }
        optional_fields = (
            "prompt",
            "model_version",
            "seed",
            "parameters",
            "sources",
            "outputs",
        )
        for field_name in optional_fields:
            value = getattr(self, field_name)
            if value is not None:
                payload[field_name] = value
        return payload

    def __repr__(self) -> str:
        return "LocalProvenanceBlock(id={})".format(self.block_id)
|
|
|
|
|
|
class MerkleAuditLog:
    """Append-only list of block dicts summarised by a binary Merkle root."""

    def __init__(self):
        self.blocks: List[Dict[str, Any]] = []
        self.merkle_root: str = ""

    def append(self, block: LocalProvenanceBlock) -> None:
        """Sign *block* if it is unsigned, store its dict form, refresh the root."""
        if block.signature is None:
            attach_signature(block)
        self.blocks.append(block.to_dict())

        # The whole tree is rebuilt on every append for simplicity.
        chain = self._compute_hash_chain()
        if isinstance(chain, (list, tuple)):
            self.merkle_root = chain[0]
        else:
            self.merkle_root = chain

    def _compute_hash_chain(self) -> List[str]:
        """Return a one-element list holding the Merkle root (``[""]`` if empty)."""
        level = [HASH_ALGO(_serialize(entry)).hexdigest() for entry in self.blocks]
        if not level:
            return [""]

        while len(level) > 1:
            # An odd level is padded by pairing its last node with itself.
            if len(level) % 2:
                level = level + [level[-1]]
            level = [
                HASH_ALGO((left + right).encode("utf-8")).hexdigest()
                for left, right in zip(level[0::2], level[1::2])
            ]
        return level

    def get_root(self) -> str:
        """Return the most recently stored Merkle root."""
        return self.merkle_root

    def to_dict(self) -> Dict[str, Any]:
        """Return the root, the block count and the block list itself."""
        return {
            "root": self.merkle_root,
            "count": len(self.blocks),
            "blocks": self.blocks,
        }
|
|
|
|
|
|
class DeltaSync:
    """Exchange incremental updates ("deltas") for a ``MerkleAuditLog``.

    ``create_delta`` packages the blocks appended since the previous call;
    ``apply_delta`` merges a delta received from a peer into the local log.
    """

    def __init__(self, log: "MerkleAuditLog"):
        self.log = log
        # Index into ``log.blocks`` of the first block not yet put in a delta.
        self.synced_index = 0

    def create_delta(self) -> Dict[str, Any]:
        """Return the blocks added since the last call and advance the cursor.

        Returns:
            A dict with ``start_index``, ``count``, ``blocks`` and the log's
            current ``root``.
        """
        start = self.synced_index
        pending = self.log.blocks[start:]
        self.synced_index = len(self.log.blocks)
        return {
            "start_index": start,
            "count": len(pending),
            "blocks": pending,
            "root": self.log.get_root(),
        }

    def apply_delta(self, delta: Dict[str, Any]) -> None:
        """Merge the blocks of *delta* into the local log.

        Blocks already present locally are skipped. The Merkle root is then
        recomputed from the local blocks rather than copied from the delta:
        a copied root would make ``is_in_sync_with`` report success even when
        the local block list differs from the sender's (missed blocks,
        skipped duplicates, or a forged ``root`` value). To check
        convergence, call ``is_in_sync_with(delta["root"])`` afterwards.

        Args:
            delta: Mapping as produced by ``create_delta``. A missing or
                ``None`` ``"blocks"`` entry is treated as an empty list.
        """
        for entry in delta.get("blocks") or []:
            if entry not in self.log.blocks:
                self.log.blocks.append(entry)
        # The root always describes what is actually stored locally.
        self.log.merkle_root = self.log._compute_hash_chain()[0]

    def is_in_sync_with(self, other_root: str) -> bool:
        """Return True when the local Merkle root equals *other_root*."""
        return self.log.get_root() == other_root
|
|
|
|
|
|
class Adapter:
    """Base class for tool adapters; stores the author and license to use."""

    def __init__(self, author: str, license_: str = "CC-BY-4.0"):
        self.author, self.license = author, license_

    def emit(self) -> LocalProvenanceBlock:
        """Produce a provenance block; the base class does not implement this."""
        raise NotImplementedError
|
|
|
|
|
|
class BlenderAdapter(Adapter):
    """Adapter that emits a sample Blender asset-creation block."""

    def emit(self) -> LocalProvenanceBlock:
        """Return a ``create_asset`` block for a 3D model in ``SampleScene``."""
        asset_info = {"asset_type": "3d_model", "scene": "SampleScene"}
        return LocalProvenanceBlock(
            author=self.author,
            tool="Blender",
            action="create_asset",
            metadata=asset_info,
            license_=self.license,
        )
|
|
|
|
|
|
class FigmaAdapter(Adapter):
    """Adapter that emits a sample Figma design-update block."""

    def emit(self) -> LocalProvenanceBlock:
        """Return an ``update_design`` block for the ``HeroSection`` frame."""
        design_info = {"frame": "HeroSection", "pages": ["Landing", "Docs"]}
        return LocalProvenanceBlock(
            author=self.author,
            tool="Figma",
            action="update_design",
            metadata=design_info,
            license_=self.license,
        )
|
|
|
|
|
|
def attach_signature(block: LocalProvenanceBlock) -> None:
    """Compute the signature of *block* and store it on ``block.signature``.

    The signed payload is the block's dict form without the ``signature``
    entry, so the signature never covers itself.
    """
    unsigned_fields = {
        key: value
        for key, value in block.to_dict().items()
        if key != "signature"
    }
    block.signature = _sign(_serialize(unsigned_fields))
|
|
|
|
|
|
class LicenseContract:
    """A simple license contract artifact for provenance governance."""

    def __init__(
        self,
        contract_id: str,
        terms: str,
        version: int = 1,
        signer: Optional[str] = None,
        timestamp: Optional[float] = None,
    ):
        self.contract_id = contract_id
        self.terms = terms
        self.version = version
        self.signer = signer
        # Fall back to the current time only when no timestamp was supplied.
        if timestamp is None:
            timestamp = time.time()
        self.timestamp = timestamp

    def to_dict(self) -> Dict[str, Any]:
        """Return the contract's fields as a plain dict."""
        field_names = ("contract_id", "terms", "version", "signer", "timestamp")
        return {name: getattr(self, name) for name in field_names}

    def __repr__(self) -> str:
        return "LicenseContract(id={}, v{})".format(self.contract_id, self.version)
|
|
|
|
|
|
def _serialize_contract(c: LicenseContract) -> bytes:
    """Return the canonical byte serialization of contract *c*."""
    contract_fields = c.to_dict()
    return _serialize(contract_fields)
|
|
|
|
|
|
def sign_contract(contract: LicenseContract) -> str:
    """Return the hex signature over *contract*'s serialized fields."""
    payload = _serialize_contract(contract)
    return _sign(payload)
|
|
|
|
|
|
class SchemaRegistry:
    """Lightweight in-process schema registry for prompts and contracts."""

    def __init__(self) -> None:
        self._registry: Dict[str, Dict[str, Any]] = {}

    def register_schema(self, name: str, schema: Dict[str, Any]) -> None:
        """Store *schema* under *name*, replacing any earlier entry."""
        self._registry.update({name: schema})

    def get_schema(self, name: str) -> Dict[str, Any]:
        """Return the schema registered as *name*, or an empty dict if unknown."""
        try:
            return self._registry[name]
        except KeyError:
            return {}
|
|
|
|
|
|
class ContractMarketplace:
    """Tiny in-memory marketplace for licenses/contracts."""

    def __init__(self) -> None:
        self._contracts: Dict[str, LicenseContract] = {}

    def publish_contract(self, contract: LicenseContract) -> None:
        """Store *contract* under its ``contract_id``, replacing any earlier one."""
        key = contract.contract_id
        self._contracts[key] = contract

    def list_contracts(self) -> List[Dict[str, Any]]:
        """Return the dict form of every published contract."""
        return [published.to_dict() for published in self._contracts.values()]
|