"""Contract primitives for OpenGrowth federated experiments. A tiny, production-friendly DSL-inspired layer that models local experiments, shared signals and plan deltas. This provides a foundation for a Graph-of-Contracts style federation layer without pulling in heavyweight dependencies. """ from __future__ import annotations from dataclasses import dataclass, field from datetime import datetime from typing import Any, Dict, Optional @dataclass class LocalExperiment: """Represents a local experiment definition. - name: human-friendly name of the experiment - variables: mapping of controllable variables (e.g., price, onboarding steps) - privacy_budget: optional privacy budget hint for local DP or aggregation limits - metadata: optional extra information """ name: str variables: Dict[str, Any] = field(default_factory=dict) privacy_budget: Optional[Dict[str, Any]] = None metadata: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: return { "name": self.name, "variables": self.variables, "privacy_budget": self.privacy_budget, "metadata": self.metadata, } @dataclass class SharedSignals: """Represents signals shared after local aggregation (aggregated metrics). - signals: a dictionary of metric_name -> aggregated_value - provenance: optional metadata about sources """ signals: Dict[str, Any] = field(default_factory=dict) provenance: Dict[str, Any] = field(default_factory=dict) def to_dict(self) -> Dict[str, Any]: return {"signals": self.signals, "provenance": self.provenance} @dataclass class PlanDelta: """Represents incremental updates from a local experiment. - delta: map of changed variables or outcomes - timestamp: ISO timestamp of the delta - note: optional human note """ delta: Dict[str, Any] = field(default_factory=dict) timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z") note: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return {"delta": self.delta, "timestamp": self.timestamp, "note": self.note} class GraphOfContracts: """Lightweight in-process registry for contracts. This is purposely simple: it stores versioned contracts by a key. It is designed to be extended with cryptographic signing and remote transport later. """ def __init__(self) -> None: self._contracts: Dict[str, Dict[str, Any]] = {} def register_contract(self, name: str, contract: Dict[str, Any], version: str = "1.0") -> None: self._contracts[name] = {"version": version, "contract": contract} def get_contract(self, name: str) -> Optional[Dict[str, Any]]: entry = self._contracts.get(name) if not entry: return None # return a shallow copy to avoid external mutation return {"version": entry["version"], "contract": dict(entry["contract"])} # --- Registry helpers for production-grade usage --- def list_contracts(self) -> Dict[str, Dict[str, Any]]: """Return a shallow copy of the entire registry for inspection. This enables lightweight auditing and debugging tooling to enumerate known contracts without mutating the registry. """ return {k: {"version": v["version"], "contract": dict(v["contract"])} for k, v in self._contracts.items()} def unregister_contract(self, name: str) -> bool: """Remove a contract by name from the registry. Returns True if the contract existed and was removed, False otherwise. This operation is non-destructive to other contracts. """ if name in self._contracts: del self._contracts[name] return True return False