diff --git a/src/cosmosmesh_privacy_preserving_federated/contracts.py b/src/cosmosmesh_privacy_preserving_federated/contracts.py new file mode 100644 index 0000000..1f05339 --- /dev/null +++ b/src/cosmosmesh_privacy_preserving_federated/contracts.py @@ -0,0 +1,155 @@ +"""Contracts and registry scaffolding for CosmosMesh MVP. + +This module provides lightweight data-contract primitives used by the +privacy-preserving federated planning MVP. The goal is to offer a minimal, +well-typed surface to express LocalProblem data, shared/delta signals, and +auditability metadata that can be exchanged between agents. +""" +from __future__ import annotations + +from dataclasses import dataclass, asdict +from typing import Any, Dict, Optional +import json + + +@dataclass +class LocalProblem: + """Represents a per-agent optimization problem contract. + + - agent_id: unique identifier for the agent + - variables: mapping of variable names to current values (e.g., decision vars) + - objective: a simple representation of the objective (coefficients or metadata) + - constraints: opaque dict describing constraints (kept generic for MVP) + - version: optional version of the contract for auditing + """ + + agent_id: str + variables: Dict[str, float] + objective: Dict[str, float] + constraints: Dict[str, Any] + version: Optional[str] = None + + def to_json(self) -> str: + return json.dumps(asdict(self)) + + @staticmethod + def from_json(payload: str) -> "LocalProblem": + data = json.loads(payload) + return LocalProblem(**data) # type: ignore[arg-type] + + +@dataclass +class SharedVariables: + """Container for signals shared across agents (e.g., primal variables).""" + + variables: Dict[str, float] + version: Optional[str] = None + + def to_json(self) -> str: + return json.dumps(asdict(self)) + + @staticmethod + def from_json(payload: str) -> "SharedVariables": + data = json.loads(payload) + return SharedVariables(**data) # type: ignore[arg-type] + + +@dataclass +class DualVariables: + """Dual variables carried alongside shared variables.""" + + values: Dict[str, float] + version: Optional[str] = None + + def to_json(self) -> str: + return json.dumps(asdict(self)) + + @staticmethod + def from_json(payload: str) -> "DualVariables": + data = json.loads(payload) + return DualVariables(**data) # type: ignore[arg-type] + + +@dataclass +class PlanDelta: + """Represents a delta/patch to a previously computed plan.""" + + agent_id: str + delta: Dict[str, Any] + version: Optional[str] = None + + def to_json(self) -> str: + return json.dumps(asdict(self)) + + @staticmethod + def from_json(payload: str) -> "PlanDelta": + data = json.loads(payload) + return PlanDelta(**data) # type: ignore[arg-type] + + +@dataclass +class PrivacyBudget: + """Minimal privacy budget descriptor for local-dp/shared signals.""" + + budget: float + spent: float = 0.0 + version: Optional[str] = None + + def to_json(self) -> str: + return json.dumps(asdict(self)) + + @staticmethod + def from_json(payload: str) -> "PrivacyBudget": + data = json.loads(payload) + return PrivacyBudget(**data) # type: ignore[arg-type] + + +@dataclass +class AuditLog: + """Auditable event payload used for replay protection and tracing.""" + + event: str + metadata: Dict[str, Any] + timestamp: float + version: Optional[str] = None + + def to_json(self) -> str: + return json.dumps(asdict(self)) + + @staticmethod + def from_json(payload: str) -> "AuditLog": + data = json.loads(payload) + return AuditLog(**data) # type: ignore[arg-type] + + +class GraphOfContracts: + """A tiny registry that maps contract names to versioned schemas. + + This is intentionally lightweight and intended for MVP scaffolding. It + allows registering known contract names with a version string and + retrieving their canonical representation. The actual schema registry would + be a separate service in a full implementation. + """ + + def __init__(self) -> None: + self._registry: Dict[str, str] = {} + + def register(self, name: str, version: str) -> None: + self._registry[str(name)] = str(version) + + def get_version(self, name: str) -> Optional[str]: + return self._registry.get(name) + + def to_json(self) -> str: + return json.dumps(self._registry) + + +__all__ = [ + "LocalProblem", + "SharedVariables", + "DualVariables", + "PlanDelta", + "PrivacyBudget", + "AuditLog", + "GraphOfContracts", +] diff --git a/tests/contracts_test.py b/tests/contracts_test.py new file mode 100644 index 0000000..b7bc0b6 --- /dev/null +++ b/tests/contracts_test.py @@ -0,0 +1,32 @@ +import json +import time + +from cosmosmesh_privacy_preserving_federated.contracts import LocalProblem, GraphOfContracts + + +def test_local_problem_json_roundtrip(): + lp = LocalProblem( + agent_id="rover-1", + variables={"x": 1.0, "y": 2.0}, + objective={"coef": 0.5}, + constraints={"type": "ineq", "limit": 10.0}, + version="v0.1", + ) + payload = lp.to_json() + lp2 = LocalProblem.from_json(payload) + assert lp2.agent_id == lp.agent_id + assert lp2.variables == lp.variables + assert lp2.version == lp.version + + +def test_graph_of_contracts_registry(): + g = GraphOfContracts() + g.register("LocalProblem", "v0.1") + g.register("PlanDelta", "v0.2") + assert g.get_version("LocalProblem") == "v0.1" + assert g.get_version("PlanDelta") == "v0.2" + # ensure JSON export works and is parseable + data = g.to_json() + assert isinstance(data, str) + parsed = json.loads(data) + assert "LocalProblem" in parsed