signalvault-verifiable-priv.../signalvault_verifiable_priv.../privacy.py

88 lines
3.0 KiB
Python

from __future__ import annotations
from typing import List, Dict
import hashlib
class Signer:
"""Simple cryptographic signer placeholder.
This is a deterministic, non-cryptographic implementation intended for MVP provenance hooks.
"""
@staticmethod
def sign(payload: str, key: str) -> str:
h = hashlib.sha256()
h.update(payload.encode("utf-8"))
h.update(key.encode("utf-8"))
return h.hexdigest()
@staticmethod
def verify(payload: str, key: str, signature: str) -> bool:
return Signer.sign(payload, key) == signature
class SecureAggregator:
"""Placeholder secure-aggregation primitive.
In a real system this would perform cryptographic aggregation (e.g., via MPC).
Here we simply sum numeric signals and average others defensively.
"""
@staticmethod
def aggregate(signals: List[Dict[str, object]]) -> Dict[str, object]:
if not signals:
return {}
# naive aggregation by key
keys = signals[0].keys()
result: Dict[str, object] = {}
for k in keys:
try:
vals = [float(s.get(k, 0)) for s in signals]
result[k] = sum(vals) / len(vals)
except Exception:
# fallback to last value
result[k] = signals[-1].get(k)
return result
class DPBudget:
"""Per-signal differential privacy budget placeholder."""
def __init__(self, total_budget: float = 1.0) -> None:
self.total_budget = float(total_budget)
self.used = 0.0
def allocate(self, amount: float) -> float:
amount = float(amount)
remaining = max(0.0, self.total_budget - self.used)
take = min(remaining, amount)
self.used += take
return take
def sign_audit_log(audit_log: object, signer_id: str, key: str) -> object:
"""Return a new AuditLog with a computed signature, chaining signer and payload.
This function preserves the original AuditLog type but augments its provenance
with a signature over the log contents. It is a thin MVP hook for verifiable provenance.
"""
# Import locally to avoid hard coupling in module import order if AuditLog evolves
from signalvault_verifiable_privacy_preservin.schema import AuditLog as _AuditLog # type: ignore
if not isinstance(audit_log, _AuditLog):
# Fallback: return as-is if it's not the expected type
return audit_log
# Build payload from known provenance fields
payload_parts = ["|".join(audit_log.entries), str(audit_log.timestamp), audit_log.contract_id or "", signer_id]
payload = "|".join([p for p in payload_parts if p is not None and p != ""])
signature = Signer.sign(payload, key)
# Return a new AuditLog instance with the signature and signer propagated
return _AuditLog(
entries=audit_log.entries,
signer=signer_id,
timestamp=audit_log.timestamp,
contract_id=audit_log.contract_id,
parent_hash=audit_log.parent_hash,
signature=signature,
)