from __future__ import annotations from typing import List, Dict import hashlib class Signer: """Simple cryptographic signer placeholder. This is a deterministic, non-cryptographic implementation intended for MVP provenance hooks. """ @staticmethod def sign(payload: str, key: str) -> str: h = hashlib.sha256() h.update(payload.encode("utf-8")) h.update(key.encode("utf-8")) return h.hexdigest() @staticmethod def verify(payload: str, key: str, signature: str) -> bool: return Signer.sign(payload, key) == signature class SecureAggregator: """Placeholder secure-aggregation primitive. In a real system this would perform cryptographic aggregation (e.g., via MPC). Here we simply sum numeric signals and average others defensively. """ @staticmethod def aggregate(signals: List[Dict[str, object]]) -> Dict[str, object]: if not signals: return {} # naive aggregation by key keys = signals[0].keys() result: Dict[str, object] = {} for k in keys: try: vals = [float(s.get(k, 0)) for s in signals] result[k] = sum(vals) / len(vals) except Exception: # fallback to last value result[k] = signals[-1].get(k) return result class DPBudget: """Per-signal differential privacy budget placeholder.""" def __init__(self, total_budget: float = 1.0) -> None: self.total_budget = float(total_budget) self.used = 0.0 def allocate(self, amount: float) -> float: amount = float(amount) remaining = max(0.0, self.total_budget - self.used) take = min(remaining, amount) self.used += take return take def sign_audit_log(audit_log: object, signer_id: str, key: str) -> object: """Return a new AuditLog with a computed signature, chaining signer and payload. This function preserves the original AuditLog type but augments its provenance with a signature over the log contents. It is a thin MVP hook for verifiable provenance. """ # Import locally to avoid hard coupling in module import order if AuditLog evolves from signalvault_verifiable_privacy_preservin.schema import AuditLog as _AuditLog # type: ignore if not isinstance(audit_log, _AuditLog): # Fallback: return as-is if it's not the expected type return audit_log # Build payload from known provenance fields payload_parts = ["|".join(audit_log.entries), str(audit_log.timestamp), audit_log.contract_id or "", signer_id] payload = "|".join([p for p in payload_parts if p is not None and p != ""]) signature = Signer.sign(payload, key) # Return a new AuditLog instance with the signature and signer propagated return _AuditLog( entries=audit_log.entries, signer=signer_id, timestamp=audit_log.timestamp, contract_id=audit_log.contract_id, parent_hash=audit_log.parent_hash, signature=signature, )