88 lines
3.0 KiB
Python
88 lines
3.0 KiB
Python
from __future__ import annotations
|
|
from typing import List, Dict
|
|
import hashlib
|
|
|
|
|
|
class Signer:
    """Simple signer placeholder for MVP provenance hooks.

    This is a deterministic, non-cryptographic implementation: a signature is
    the SHA-256 hex digest of the UTF-8 payload bytes followed by the UTF-8
    key bytes. It is not an HMAC and must not be treated as a real MAC.
    """

    @staticmethod
    def sign(payload: str, key: str) -> str:
        """Return the hex SHA-256 digest of ``payload`` followed by ``key``.

        Args:
            payload: Text to sign; encoded as UTF-8.
            key: Signing key; encoded as UTF-8 and appended to the payload.

        Returns:
            A 64-character lowercase hexadecimal digest.
        """
        # NOTE(review): payload and key are fed to the hash back-to-back with
        # no delimiter, so ("ab", "c") and ("a", "bc") produce the same
        # signature. Left unchanged so previously issued signatures still
        # verify.
        digest = hashlib.sha256()
        digest.update(payload.encode("utf-8"))
        digest.update(key.encode("utf-8"))
        return digest.hexdigest()

    @staticmethod
    def verify(payload: str, key: str, signature: str) -> bool:
        """Return True if ``signature`` matches ``sign(payload, key)``.

        The comparison is done in constant time so that the position of the
        first mismatching character is not leaked through timing.

        Args:
            payload: Text that was signed.
            key: Key that was used for signing.
            signature: Candidate hex digest to check.

        Returns:
            True on an exact match, False otherwise (including when
            ``signature`` is not an ASCII string).
        """
        # Local import: keeps this block self-contained without touching the
        # module's import section.
        import hmac

        expected = Signer.sign(payload, key)
        # compare_digest() raises TypeError for non-ASCII str input; the
        # expected digest is pure ASCII hex, so anything else cannot match.
        if not isinstance(signature, str) or not signature.isascii():
            return False
        return hmac.compare_digest(expected, signature)
|
|
|
|
|
|
class SecureAggregator:
    """Placeholder secure-aggregation primitive.

    In a real system this would perform cryptographic aggregation (e.g., via MPC).
    Here we simply average values that can be converted to float and fall back
    to the last signal's value for everything else.
    """

    @staticmethod
    def aggregate(signals: List[Dict[str, object]]) -> Dict[str, object]:
        """Aggregate a list of signal dicts key by key.

        Args:
            signals: Signal dictionaries to combine. May be empty.

        Returns:
            A dict with one entry per key seen in any signal, in first-seen
            order. If every value for a key converts with ``float()`` the
            entry is the mean over all signals, where a signal missing the
            key contributes 0. Otherwise the entry is the value from the last
            signal (``None`` if the last signal lacks the key).
        """
        if not signals:
            return {}

        # Union of keys across all signals, preserving first-seen order, so a
        # key that first appears in a later signal is not silently dropped.
        keys = dict.fromkeys(k for signal in signals for k in signal)

        result: Dict[str, object] = {}
        for k in keys:
            try:
                vals = [float(s.get(k, 0)) for s in signals]
            except (TypeError, ValueError, OverflowError):
                # Not numeric across the board: fall back to the last value.
                result[k] = signals[-1].get(k)
            else:
                result[k] = sum(vals) / len(vals)
        return result
|
|
|
|
|
|
class DPBudget:
    """Per-signal differential privacy budget placeholder.

    Tracks a fixed total budget and the amount handed out so far.
    """

    def __init__(self, total_budget: float = 1.0) -> None:
        """Create a budget with ``total_budget`` available and nothing used."""
        self.total_budget = float(total_budget)
        self.used = 0.0

    def allocate(self, amount: float) -> float:
        """Reserve up to ``amount`` from the remaining budget.

        Args:
            amount: Requested budget; must be a non-negative number.

        Returns:
            The amount actually granted: ``amount`` if enough budget remains,
            otherwise whatever is left (possibly 0.0).

        Raises:
            ValueError: If ``amount`` is negative or NaN. A negative request
                would otherwise decrease ``used`` and hand budget back.
        """
        amount = float(amount)
        # Written as "not >=" so NaN (for which every comparison is False)
        # is rejected together with negative values.
        if not amount >= 0.0:
            raise ValueError(f"amount must be a non-negative number, got {amount!r}")
        remaining = max(0.0, self.total_budget - self.used)
        take = min(remaining, amount)
        self.used += take
        return take
|
|
|
|
|
|
def sign_audit_log(audit_log: object, signer_id: str, key: str) -> object:
    """Return a signed copy of ``audit_log``; pass other objects through.

    When ``audit_log`` is an ``AuditLog``, a new ``AuditLog`` is built with the
    same entries, timestamp, contract id and parent hash, with ``signer`` set
    to ``signer_id`` and ``signature`` set to ``Signer.sign(payload, key)``.
    The payload is the "|"-joined entries, the stringified timestamp, the
    contract id and the signer id, joined with "|" after dropping empty parts.
    ``parent_hash`` is copied to the result but is not part of the signed
    payload.

    Any object that is not an ``AuditLog`` is returned unchanged.
    """
    # Imported here rather than at module level to avoid hard coupling in
    # module import order if AuditLog evolves.
    from signalvault_verifiable_privacy_preservin.schema import AuditLog as _AuditLog  # type: ignore

    if isinstance(audit_log, _AuditLog):
        # Known provenance fields, in the order they are signed.
        fields = (
            "|".join(audit_log.entries),
            str(audit_log.timestamp),
            audit_log.contract_id or "",
            signer_id,
        )
        message = "|".join(
            part for part in fields if not (part is None or part == "")
        )
        digest = Signer.sign(message, key)

        # Fresh instance carrying the signer and signature.
        return _AuditLog(
            entries=audit_log.entries,
            signer=signer_id,
            timestamp=audit_log.timestamp,
            contract_id=audit_log.contract_id,
            parent_hash=audit_log.parent_hash,
            signature=digest,
        )

    # Not the expected type: hand it back untouched.
    return audit_log
|