106 lines
3.8 KiB
Python
106 lines
3.8 KiB
Python
"""Contract primitives for OpenGrowth federated experiments.
|
|
|
|
A tiny, production-friendly DSL-inspired layer that models local experiments,
|
|
shared signals and plan deltas. This provides a foundation for a Graph-of-Contracts
|
|
style federation layer without pulling in heavyweight dependencies.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from typing import Any, Dict, Optional
|
|
|
|
|
|
@dataclass
|
|
class LocalExperiment:
|
|
"""Represents a local experiment definition.
|
|
|
|
- name: human-friendly name of the experiment
|
|
- variables: mapping of controllable variables (e.g., price, onboarding steps)
|
|
- privacy_budget: optional privacy budget hint for local DP or aggregation limits
|
|
- metadata: optional extra information
|
|
"""
|
|
name: str
|
|
variables: Dict[str, Any] = field(default_factory=dict)
|
|
privacy_budget: Optional[Dict[str, Any]] = None
|
|
metadata: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {
|
|
"name": self.name,
|
|
"variables": self.variables,
|
|
"privacy_budget": self.privacy_budget,
|
|
"metadata": self.metadata,
|
|
}
|
|
|
|
|
|
@dataclass
|
|
class SharedSignals:
|
|
"""Represents signals shared after local aggregation (aggregated metrics).
|
|
|
|
- signals: a dictionary of metric_name -> aggregated_value
|
|
- provenance: optional metadata about sources
|
|
"""
|
|
signals: Dict[str, Any] = field(default_factory=dict)
|
|
provenance: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {"signals": self.signals, "provenance": self.provenance}
|
|
|
|
|
|
@dataclass
|
|
class PlanDelta:
|
|
"""Represents incremental updates from a local experiment.
|
|
|
|
- delta: map of changed variables or outcomes
|
|
- timestamp: ISO timestamp of the delta
|
|
- note: optional human note
|
|
"""
|
|
delta: Dict[str, Any] = field(default_factory=dict)
|
|
timestamp: str = field(default_factory=lambda: datetime.utcnow().isoformat() + "Z")
|
|
note: Optional[str] = None
|
|
|
|
def to_dict(self) -> Dict[str, Any]:
|
|
return {"delta": self.delta, "timestamp": self.timestamp, "note": self.note}
|
|
|
|
|
|
class GraphOfContracts:
|
|
"""Lightweight in-process registry for contracts.
|
|
|
|
This is purposely simple: it stores versioned contracts by a key. It is
|
|
designed to be extended with cryptographic signing and remote transport later.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
self._contracts: Dict[str, Dict[str, Any]] = {}
|
|
|
|
def register_contract(self, name: str, contract: Dict[str, Any], version: str = "1.0") -> None:
|
|
self._contracts[name] = {"version": version, "contract": contract}
|
|
|
|
def get_contract(self, name: str) -> Optional[Dict[str, Any]]:
|
|
entry = self._contracts.get(name)
|
|
if not entry:
|
|
return None
|
|
# return a shallow copy to avoid external mutation
|
|
return {"version": entry["version"], "contract": dict(entry["contract"])}
|
|
|
|
# --- Registry helpers for production-grade usage ---
|
|
def list_contracts(self) -> Dict[str, Dict[str, Any]]:
|
|
"""Return a shallow copy of the entire registry for inspection.
|
|
|
|
This enables lightweight auditing and debugging tooling to enumerate
|
|
known contracts without mutating the registry.
|
|
"""
|
|
return {k: {"version": v["version"], "contract": dict(v["contract"])} for k, v in self._contracts.items()}
|
|
|
|
def unregister_contract(self, name: str) -> bool:
|
|
"""Remove a contract by name from the registry.
|
|
|
|
Returns True if the contract existed and was removed, False otherwise.
|
|
This operation is non-destructive to other contracts.
|
|
"""
|
|
if name in self._contracts:
|
|
del self._contracts[name]
|
|
return True
|
|
return False
|