101 lines
3.7 KiB
Python
101 lines
3.7 KiB
Python
import copy
import hashlib
import json
from typing import List
|
|
|
|
def _hash_bytes(data: bytes) -> str:
    """Return the SHA-256 digest of *data* as a hexadecimal string."""
    hasher = hashlib.sha256()
    hasher.update(data)
    return hasher.hexdigest()
|
|
|
|
def merkle_root(digests: List[str]) -> str:
    """Fold a list of hex-encoded leaf digests into a single Merkle root.

    Each level is reduced by hashing adjacent pairs with SHA-256 over the
    concatenated raw bytes. When a level holds an odd number of nodes, the
    last node is paired with itself.

    Returns the root as a hex string, or "" when *digests* is empty.
    """
    if not digests:
        return ""

    nodes = [bytes.fromhex(hex_digest) for hex_digest in digests]
    while len(nodes) > 1:
        if len(nodes) % 2:
            # Odd level: repeat the trailing node so every node has a partner.
            nodes.append(nodes[-1])
        nodes = [
            hashlib.sha256(nodes[k] + nodes[k + 1]).digest()
            for k in range(0, len(nodes), 2)
        ]
    return nodes[0].hex()
|
|
|
|
def merkle_path_for_index(digests: List[str], index: int) -> dict:
    """Compute the Merkle root and the sibling path for one leaf.

    The tree is built bottom-up by hashing adjacent pairs with SHA-256 over
    the concatenated raw bytes. When a level holds an odd number of nodes,
    the last node is paired with itself, and that node's own digest is what
    gets recorded as its "sibling" in the path.

    Args:
        digests: Hex-encoded leaf digests, in leaf order.
        index: Zero-based position of the leaf whose path is wanted.

    Returns:
        A dict with keys:
        - root: the Merkle root (hex) for the given leaves
        - path: list of sibling digests (hex) from leaf level up to the root
        For empty *digests* the result is {"root": "", "path": []}
        regardless of *index*.

    Raises:
        IndexError: if *digests* is non-empty and *index* is not in
            range(len(digests)).
        ValueError: if a digest is not valid hexadecimal.
    """
    if not digests:
        return {"root": "", "path": []}
    # Reject out-of-range positions up front: a negative or too-large index
    # would otherwise yield a path that does not belong to any real leaf.
    if not 0 <= index < len(digests):
        raise IndexError(
            "leaf index %r out of range for %d leaves" % (index, len(digests))
        )

    # Convert hex digests to bytes for hashing.
    level = [bytes.fromhex(d) for d in digests]
    idx = index
    path = []
    while len(level) > 1:
        # Toggling the lowest bit gives the partner within the current pair.
        sib_index = idx ^ 1
        # No right-hand partner on an odd level: the node is paired with itself.
        sibling = level[sib_index] if sib_index < len(level) else level[idx]
        path.append(sibling.hex())

        # Build the next level from adjacent pairs.
        level = [
            hashlib.sha256(
                level[i] + (level[i + 1] if i + 1 < len(level) else level[i])
            ).digest()
            for i in range(0, len(level), 2)
        ]
        idx //= 2

    return {"root": level[0].hex(), "path": path}
|
|
|
|
class DeltaLog:
    """Append-only log of JSON-serializable entries with Merkle proofs.

    Every added entry is hashed (SHA-256 over its key-sorted JSON encoding)
    and the resulting digests serve as the leaves of a Merkle tree over the
    whole log.
    """

    def __init__(self):
        # Each record is {"digest": <hex SHA-256>, "payload": <entry snapshot>}.
        self.entries = []
        # Optional cloud/ground anchor for global verifiability; None until
        # anchor_root() is called.
        self.anchor = None

    def add_entry(self, entry: dict) -> str:
        """Append *entry* to the log and return its hex digest.

        The digest is SHA-256 over ``json.dumps(entry, sort_keys=True)``
        encoded as UTF-8, so *entry* must be JSON-serializable
        (``json.dumps`` raises ``TypeError`` otherwise).
        """
        payload_bytes = json.dumps(entry, sort_keys=True).encode("utf-8")
        digest = hashlib.sha256(payload_bytes).hexdigest()
        self.entries.append({
            "digest": digest,
            # Store a snapshot: if the caller mutates its dict afterwards,
            # the logged payload must still match the digest computed above.
            "payload": copy.deepcopy(entry),
        })
        return digest

    def anchor_root(self, anchor: str) -> None:
        """Record an optional anchor for the current delta log.

        This does not alter existing entries; it simply stores a reference
        to a trusted anchor (e.g., ground control) to tie the local log to
        an external verifiable state.
        """
        self.anchor = anchor

    def delta_from_index(self, index: int) -> List[dict]:
        """Return the entries from *index* onward, each with a Merkle proof.

        Each returned dict is a copy of the stored payload with three keys
        added: "digest" (the entry's leaf digest), "delta_root" (the Merkle
        root over all leaves currently in the log) and "merkle_path" (the
        sibling path for that leaf). Payload keys with those same names are
        overwritten in the returned dict.

        An *index* at or beyond the end of the log yields an empty list.

        Raises:
            ValueError: if *index* is negative. A negative start would
                otherwise emit tail entries twice with invalid proofs.
        """
        if index < 0:
            raise ValueError("index must be non-negative, got %r" % (index,))

        leaves = [record["digest"] for record in self.entries]
        result = []
        for position, record in enumerate(self.entries[index:], start=index):
            # Proof is computed with respect to all leaves, not just the delta.
            path_info = merkle_path_for_index(leaves, position)
            # Deep copy so recipients cannot mutate the log's nested state.
            full_entry = copy.deepcopy(record["payload"])
            # Attach verification metadata alongside the payload fields.
            full_entry.update({
                "digest": record["digest"],
                "delta_root": path_info["root"],
                "merkle_path": path_info["path"],
            })
            result.append(full_entry)
        return result

    def root(self) -> str:
        """Return the Merkle root (hex) over the digests of all entries."""
        digests = [record["digest"] for record in self.entries]
        return merkle_root(digests)
|