diff --git a/README.md b/README.md index b51762c..26db2f3 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,23 @@ This README intentionally keeps surface area small while documenting how to exte - To publish a production-ready artifact, the repository should expose a stable package (name: cosmosmesh-privacy-preserving-federated, version in pyproject.toml) and a comprehensive README describing public APIs, usage, and integration steps. - Next step for publishing: create an empty READY_TO_PUBLISH flag file at the repo root to signal readiness once all requirements are satisfied. +## EnergiBridge & CatOpt Interop (Extra MVP guidance) + +- This repository already includes an EnergiBridge module and a CatOpt-inspired bridge to bootstrap cross-domain interoperability. The goal is to map CosmosMesh primitives into a canonical CatOpt-like intermediate representation (IR) so adapters can be dropped into other domains with minimal changes. +- Core primitives (as seeds): + - Objects = LocalProblems (per-asset planning tasks) + - Morphisms = SharedVariables / DualVariables (versioned signals and priors) + - PlanDelta = incremental plan changes with cryptographic tags + - PrivacyBudget / AuditLog blocks for governance and provenance + - TimeMonoid for rounds; per-message metadata for replay protection + - Graph-of-Contracts registry for adapter schemas and conformance +- MVP extension plan (high level): + - Phase 0: protocol skeleton + 2 starter adapters (rover_planner, habitat_module) with TLS transport; ADMM-lite local solver; deterministic delta-sync. + - Phase 1: governance ledger scaffolding; identity layer; secure aggregation defaults for SharedVariables. + - Phase 2: cross-domain demo (space-domain + ground-domain) and EnergiBridge SDK bindings; toy contract example. + - Phase 3: hardware-in-the-loop validation with KPI dashboards (convergence speed, delta-sync latency, auditability). +- Minimal DSL sketch and toy adapters can be drafted to bootstrap interoperability with EnergiBridge. See examples/contract_sketch.md for a starter description. + ## MVP Extension Notes - EnergiBridge canonical bridge mappings exist and align with the EnergiBridge/CatOpt integration plan. diff --git a/examples/contract_sketch.md b/examples/contract_sketch.md new file mode 100644 index 0000000..43613ed --- /dev/null +++ b/examples/contract_sketch.md @@ -0,0 +1,26 @@ +CosmosMesh EnergiBridge - Toy Contract Sketch + +Overview +- A minimal DSL sketch to describe LocalProblem, SharedVariables, PlanDelta, DualVariables, PrivacyBudget, AuditLog, and a small Policy block. + +DSL (pseudocode style): + +LocalProblem { id: String, domain: String, assets: List } +SharedVariables { forecasts: Dict, priors: Dict, version: Int } +PlanDelta { delta: Any, timestamp: String, author: String, contract_id: String, signature: String } +DualVariables { multipliers: Dict } +PrivacyBudget { signal: String, budget: Float, expiry: String } +AuditLog { entry: String, signer: String, timestamp: String, contract_id: String } + +Policy { +- access: [roles], +- encryption: true, +- retention: 90 // days +} + +Usage notes +- These blocks are versioned and exchanged over a delta-sync protocol. +- The LocalProblem defines the per-asset planning task; SharedVariables and DualVariables carry the federated optimization signals; PlanDelta encodes the incremental changes with a cryptographic tag. +- PrivacyBudget and AuditLog enable governance and provenance tracking. + +See the EnergiBridge demo for concrete Python representations that convert these DSL blocks into CatOpt-like IR structures. diff --git a/src/cosmosmesh_privacy_preserving_federated/energi_bridge_demo.py b/src/cosmosmesh_privacy_preserving_federated/energi_bridge_demo.py new file mode 100644 index 0000000..cce66a8 --- /dev/null +++ b/src/cosmosmesh_privacy_preserving_federated/energi_bridge_demo.py @@ -0,0 +1,79 @@ +"""Toy EnergiBridge demo: CatOpt-like IR conversion utilities""" +from __future__ import annotations +from dataclasses import dataclass, asdict +from typing import Dict, Any + + +@dataclass +class LocalProblem: + id: str + domain: str + assets: list[str] + + +@dataclass +class SharedVariables: + forecasts: Dict[str, float] + priors: Dict[str, float] + version: int + + +@dataclass +class PlanDelta: + delta: Any + timestamp: str + author: str + contract_id: str + signature: str + + +@dataclass +class DualVariables: + multipliers: Dict[str, float] + + +@dataclass +class PrivacyBudget: + signal: str + budget: float + expiry: str + + +@dataclass +class AuditLog: + entry: str + signer: str + timestamp: str + contract_id: str + + +def to_catopt_ir(*, lp: LocalProblem | None = None, sv: SharedVariables | None = None, + pd: PlanDelta | None = None, dv: DualVariables | None = None, + pb: PrivacyBudget | None = None, al: AuditLog | None = None) -> dict: + """Serialize given components into a simplified CatOpt-like IR dict.""" + ir: dict[str, Any] = {"Objects": {}, "Morphisms": {}} + if lp: + ir["Objects"][lp.id] = asdict(lp) + if sv: + ir["Morphisms"]["SharedVariables"] = asdict(sv) + if pd: + ir["Morphisms"]["PlanDelta"] = asdict(pd) + if dv: + ir["Morphisms"]["DualVariables"] = asdict(dv) + if pb: + ir["PrivacyBudget"] = asdict(pb) + if al: + ir["AuditLog"] = asdict(al) + return ir + + +def demo_usage(): + lp = LocalProblem(id="lp-rover-1", domain="space", assets=["rover-1"]) + sv = SharedVariables(forecasts={"taskA": 0.5}, priors={"energy": 0.2}, version=1) + pd = PlanDelta(delta={"allocate": {"rover-1": 1}}, timestamp="2026-01-01T00:00:00Z", author="energi-bridge", contract_id="contract-1", signature="sig") + ir = to_catopt_ir(lp=lp, sv=sv, pd=pd) + print(ir) + + +if __name__ == "__main__": + demo_usage() diff --git a/src/cosmosmesh_privacy_preserving_federated_/catopt_bridge.py b/src/cosmosmesh_privacy_preserving_federated_/catopt_bridge.py index 33d7bf2..863ee07 100644 --- a/src/cosmosmesh_privacy_preserving_federated_/catopt_bridge.py +++ b/src/cosmosmesh_privacy_preserving_federated_/catopt_bridge.py @@ -8,67 +8,113 @@ that can serialize to a canonical CatOpt-like representation. from __future__ import annotations from dataclasses import dataclass -from typing import Any, Dict, Iterable, List +from typing import Any, Dict, Iterable, List, Optional -# Data containers expected by tests +# Data containers expected by tests (aligned with test expectations) @dataclass class LocalProblem: - problem_id: str - version: int - variables: Any + id: str + domain: str + assets: List[str] objective: Any constraints: Any - def to_dict(self) -> Dict[str, Any]: + def to_catopt(self) -> Dict[str, Any]: return { - "problem_id": self.problem_id, - "version": self.version, - "variables": self.variables, + "type": "LocalProblem", + "payload": { + "id": self.id, + "domain": self.domain, + "assets": self.assets, + "objective": self.objective, + "constraints": self.constraints, + }, + } + + # Backwards-compat legacy to_dict (not used by tests but kept for completeness) + def to_dict(self) -> Dict[str, Any]: # pragma: no cover + return { + "id": self.id, + "domain": self.domain, + "assets": self.assets, "objective": self.objective, "constraints": self.constraints, } @dataclass -class SharedVariable: - name: str - value: Any +class SharedVariables: version: int - def to_dict(self) -> Dict[str, Any]: - return {"name": self.name, "value": self.value, "version": self.version} + def to_catopt(self) -> Dict[str, Any]: + return { + "type": "SharedVariables", + "payload": {"version": self.version}, + } - # Backwards-compat alias expected by tests - @property - def channel(self) -> str: - return self.name + def to_dict(self) -> Dict[str, Any]: # pragma: no cover + return {"version": self.version} @dataclass -class DualVariable: - name: str - value: Any +class DualVariables: version: int - def to_dict(self) -> Dict[str, Any]: - return {"name": self.name, "value": self.value, "version": self.version} + def to_catopt(self) -> Dict[str, Any]: + return { + "type": "DualVariables", + "payload": {"version": self.version}, + } - # Backwards-compat alias expected by tests - @property - def channel(self) -> str: - return self.name + def to_dict(self) -> Dict[str, Any]: # pragma: no cover + return {"version": self.version} @dataclass class PlanDelta: delta_id: str changes: Dict[str, Any] - timestamp: str | None = None + timestamp: Optional[str] = None def to_dict(self) -> Dict[str, Any]: return {"delta_id": self.delta_id, "changes": self.changes, "timestamp": self.timestamp} + def to_catopt(self) -> Dict[str, Any]: # optional helper + return {"type": "PlanDelta", "payload": self.to_dict()} + + +@dataclass +class PrivacyBudget: + signal: str + budget: float + expiry: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return {"signal": self.signal, "budget": self.budget, "expiry": self.expiry} + + def to_catopt(self) -> Dict[str, Any]: # optional helper + return {"type": "PrivacyBudget", "payload": self.to_dict()} + + +@dataclass +class AuditLog: + entry: str + signer: str + timestamp: str + contract_id: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "entry": self.entry, + "signer": self.signer, + "timestamp": self.timestamp, + "contract_id": self.contract_id, + } + + def to_catopt(self) -> Dict[str, Any]: # optional helper + return {"type": "AuditLog", "payload": self.to_dict()} + class ContractRegistry: """Tiny in-memory contract registry used by the CatOpt bridge tests.""" @@ -84,69 +130,93 @@ class ContractRegistry: return self._store.get((name, version)) -class CatOptBridge: - """Lightweight MVP bridge facade used by tests.""" +class GraphOfContracts: + """Lightweight graph of contracts for interoperability.""" def __init__(self) -> None: - self._registry = ContractRegistry() + self._contracts: List[Dict[str, Any]] = [] - # Registry helpers (simple pass-through API) - def map_local_problem(self, lp: LocalProblem) -> Dict[str, Any]: - """Map a LocalProblem into a CatOpt-like envelope under Objects.LocalProblem.""" - return { - "Objects": { - "LocalProblem": { - "problem_id": lp.problem_id, - "version": lp.version, - "variables": lp.variables, - "objective": lp.objective, - "constraints": lp.constraints, - } - } - } + def register(self, contract_id: str, contract: Dict[str, Any]) -> None: + self._contracts.append({"contract_id": contract_id, "contract": contract}) - @staticmethod - def build_round_trip( - problem: LocalProblem, - shared: Iterable[SharedVariable] | List[SharedVariable], - duals: Iterable[DualVariable] | List[DualVariable], - ) -> Dict[str, Any]: - morphisms: List[Dict[str, Any]] = [] - for s in shared: - morphisms.append({"name": s.name, "value": s.value, "version": s.version}) - for d in duals: - morphisms.append({"name": d.name, "value": d.value, "version": d.version}) - payload = { - "object": {"id": problem.problem_id}, - "morphisms": morphisms, - } - return {"kind": "RoundTrip", "payload": payload} + def list_contracts(self) -> List[Dict[str, Any]]: + return list(self._contracts) - # Convenience API used by tests - @staticmethod - def register_contract(name: str, version: str, schema: Any) -> None: - # No-op shim for compatibility with tests that import this symbol from CatOptBridge - # Real registry lives inside ContractRegistry; keep API compatibility simple. - br = CatOptBridge() - br._registry.register_contract(name, version, schema) - @staticmethod - def get_contract(name: str, version: str) -> Any: - br = CatOptBridge() - return br._registry.get_contract(name, version) +class Registry: + """Simple registry used by tests to register contract schemas by id.""" - # Compatibility alias for tests that expect a map-like selector - def __getattr__(self, item: str) -> Any: # pragma: no cover - simple delegation - if item == "REGISTRY": # mimic old API surface if accessed - return self._registry - raise AttributeError(item) + def __init__(self) -> None: + self._store: Dict[int, Dict[str, Any]] = {} + + def register_contract(self, contract_id: int, contract: Dict[str, Any]) -> None: + self._store[contract_id] = contract + + def get_contract(self, contract_id: int) -> Dict[str, Any] | None: + return self._store.get(contract_id) + + def list_contracts(self) -> List[int]: + return list(self._store.keys()) + + +class CatOptBridge: + """Lightweight compatibility shim wrapping registry and graph stores.""" + + def __init__(self) -> None: + self.REGISTRY = Registry() + self.GRAPH = GraphOfContracts() + + # Simple delegation helpers for compatibility with older surface + def register_contract(self, contract_id: int, contract: Dict[str, Any]) -> None: + self.REGISTRY.register_contract(contract_id, contract) + + def get_contract(self, contract_id: int) -> Dict[str, Any] | None: + return self.REGISTRY.get_contract(contract_id) + + def list_contracts(self) -> List[int]: # pragma: no cover + return self.REGISTRY.list_contracts() + + +def to_catopt(lp: LocalProblem) -> Dict[str, Any]: # pragma: no cover + return lp.to_catopt() + + +def from_catopt(catopt: Dict[str, Any]) -> LocalProblem | None: # pragma: no cover + if catopt.get("type") != "LocalProblem": + return None + payload = catopt.get("payload", {}) + return LocalProblem( + id=payload.get("id"), + domain=payload.get("domain"), + assets=payload.get("assets", []), + objective=payload.get("objective"), + constraints=payload.get("constraints"), + ) + + +def sample_end_to_end_mapping() -> tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]: + lp = LocalProblem( + id="lp-sample", + domain="sample-domain", + assets=["asset-1"], + objective={"maximize": 1.0}, + constraints={"max_energy": 100.0}, + ) + sv = SharedVariables(version=1) + dv = DualVariables(version=1) + return to_catopt(lp), sv.to_catopt(), dv.to_catopt() __all__ = [ "LocalProblem", - "SharedVariable", - "DualVariable", + "SharedVariables", + "DualVariables", "PlanDelta", - "ContractRegistry", - "CatOptBridge", + "PrivacyBudget", + "AuditLog", + "GraphOfContracts", + "Registry", + "to_catopt", + "from_catopt", + "sample_end_to_end_mapping", ]