import hashlib import json from typing import List def _hash_bytes(data: bytes) -> str: return hashlib.sha256(data).hexdigest() def merkle_root(digests: List[str]) -> str: if not digests: return "" level = [bytes.fromhex(d) for d in digests] while len(level) > 1: next_level = [] for i in range(0, len(level), 2): left = level[i] right = level[i+1] if i+1 < len(level) else level[i] next_level.append(hashlib.sha256(left + right).digest()) level = next_level return level[0].hex() def merkle_path_for_index(digests: List[str], index: int) -> dict: """Compute Merkle root and the Merkle path for a given leaf index. Returns a dict with keys: - root: the Merkle root for the given leaves - path: list of sibling digests (hex) from leaf to root """ if not digests: return {"root": "", "path": []} # convert hex digests to bytes for hashing level = [bytes.fromhex(d) for d in digests] idx = index path = [] while len(level) > 1: # determine sibling index for current level sib_index = idx ^ 1 # toggle last bit -> sibling index if 0 <= sib_index < len(level): path.append(level[sib_index].hex()) else: # no sibling, duplicate the current node path.append(level[idx].hex()) # build next level next_level = [] for i in range(0, len(level), 2): left = level[i] right = level[i+1] if i+1 < len(level) else level[i] next_level.append(hashlib.sha256(left + right).digest()) level = next_level idx = idx // 2 root = level[0].hex() return {"root": root, "path": path} def verify_delta_root(delta_entries: List[dict]) -> bool: """Verify that a sequence of delta entries shares a consistent Merkle root. Each delta entry is expected to contain a top-level "digest" field, which is the SHA-256 digest of the serialized entry payload. The function computes the Merkle root over all provided digests and ensures that every delta entry that carries a "delta_root" field agrees with the computed root. If no delta_root is present, the function returns True once a root can be derived from the available digests. This helper is intended for cross-node verification when exchanging delta blocks. """ if not delta_entries: return True # Collect all available digests in the delta entries digests = [e.get("digest") for e in delta_entries if e.get("digest") is not None] if not digests: return True computed_root = merkle_root(digests) # If entries provide an explicit delta_root, ensure consistency across them for e in delta_entries: explicit = e.get("delta_root") if explicit is not None and explicit != computed_root: return False return True class DeltaLog: def __init__(self): self.entries = [] # each entry is a dict with digest and payload self.anchor = None # optional cloud/ground anchor for global verifiability def add_entry(self, entry: dict) -> str: # entry must be serializable, and we store a digest for Merkle payload_bytes = json.dumps(entry, sort_keys=True).encode('utf-8') digest = hashlib.sha256(payload_bytes).hexdigest() self.entries.append({ "digest": digest, "payload": entry, }) return digest def anchor_root(self, anchor: str) -> None: """Record an optional anchor for the current delta log. This does not alter existing entries; it simply stores a reference to a trusted anchor (e.g., ground control) to tie the local log to an external verifiable state. """ self.anchor = anchor def delta_from_index(self, index: int) -> List[dict]: # Return full LedgerEntry dictionaries augmented with Merkle proofs. # This enables recipients to reconstruct and verify the delta with # compact proofs, while preserving compatibility with existing code. leaves = [e["digest"] for e in self.entries] result = [] for i in range(index, len(self.entries)): entry = self.entries[i] # Compute Merkle path for this leaf with respect to all leaves path_info = merkle_path_for_index(leaves, i) # The payload is the LedgerEntry dict previously stored in 'payload' full_entry = entry["payload"].copy() # Attach meta for verification without altering the payload structure full_entry.update({ "digest": entry["digest"], "delta_root": path_info["root"], "merkle_path": path_info["path"], }) result.append(full_entry) return result def root(self) -> str: digests = [e["digest"] for e in self.entries] return merkle_root(digests)