# openbench-privacy-preservin.../openbench_privacy_preservin.../contracts.py

# 53 lines
# 1.4 KiB
# Python

from __future__ import annotations
import uuid
from dataclasses import dataclass, asdict
from typing import Dict, Any
from .core import KPIRecord
@dataclass
class DataContract:
    """A minimal, versioned data contract for KPI payloads.

    This is intentionally lightweight for MVP purposes. Each contract
    defines a unique id, a version, a JSON schema-like payload description,
    and privacy control flags used by aggregators.
    """

    # Unique identifier; ``make_new`` fills it with a random UUID4 hex string.
    contract_id: str
    # Contract version; ``make_new`` always starts at 1.
    version: int
    # JSON schema-like description of the payload.
    schema: Dict[str, Any]
    # Privacy control flags.
    privacy_flags: Dict[str, Any]
    # Optional free-text description.
    description: str = ""

    def to_dict(self) -> Dict[str, Any]:
        """Return the contract's fields as a plain dictionary.

        Delegates to ``dataclasses.asdict``, so the result is keyed by field
        name.
        """
        return asdict(self)

    @classmethod
    def make_new(
        cls,
        schema: Dict[str, Any],
        privacy_flags: Dict[str, Any],
        description: str = "",
    ) -> DataContract:
        """Create a version-1 contract with a freshly generated id.

        This is a classmethod rather than a staticmethod so that calling it
        on a subclass builds an instance of that subclass instead of the
        base ``DataContract``.

        Args:
            schema: JSON schema-like description of the payload. Stored by
                reference, not copied.
            privacy_flags: Privacy control flags. Stored by reference, not
                copied.
            description: Optional free-text description.

        Returns:
            A new instance of ``cls`` whose ``contract_id`` is
            ``uuid.uuid4().hex`` and whose ``version`` is 1.
        """
        return cls(
            contract_id=uuid.uuid4().hex,
            version=1,
            schema=schema,
            privacy_flags=privacy_flags,
            description=description,
        )
class ContractRegistry:
    """In-memory registry for data contracts, keyed by ``contract_id``."""

    def __init__(self) -> None:
        """Start with an empty registry."""
        self._contracts: Dict[str, DataContract] = {}

    def register(self, contract: DataContract) -> None:
        """Store *contract* under its ``contract_id``.

        An existing entry with the same id is replaced.
        """
        key = contract.contract_id
        self._contracts.update({key: contract})

    def get(self, contract_id: str) -> DataContract | None:
        """Return the contract registered under *contract_id*, or ``None``."""
        try:
            return self._contracts[contract_id]
        except KeyError:
            return None

    def all(self) -> Dict[str, DataContract]:
        """Return a shallow copy of the id-to-contract mapping.

        Mutating the returned dict does not affect the registry.
        """
        return self._contracts.copy()