diff --git a/src/cosmosmesh_privacy_preserving_federated/energi_bridge.py b/src/cosmosmesh_privacy_preserving_federated/energi_bridge.py index 1e2c139..beeaffe 100644 --- a/src/cosmosmesh_privacy_preserving_federated/energi_bridge.py +++ b/src/cosmosmesh_privacy_preserving_federated/energi_bridge.py @@ -1,7 +1,10 @@ from __future__ import annotations -from dataclasses import dataclass, asdict -from typing import Dict, Any, List +from dataclasses import dataclass +from typing import Dict, Any, Optional, List + +# Reuse core CatOpt primitives for minimal compatibility in MVP +from .catopt_bridge import LocalProblem, SharedVariable, DualVariable, PlanDelta @dataclass @@ -13,7 +16,23 @@ class LocalProblemEP: data_contracts: Dict[str, Any] def to_dict(self) -> Dict[str, Any]: - return asdict(self) + return { + "problem_id": self.problem_id, + "assets": self.assets, + "objective": self.objective, + "constraints": self.constraints, + "data_contracts": self.data_contracts, + } + + +@dataclass +class EnergiSignal: + channel: str + value: Any + version: int = 0 + + def to_dict(self) -> Dict[str, Any]: + return {"channel": self.channel, "value": self.value, "version": self.version} @dataclass @@ -21,71 +40,65 @@ class SharedVariableEP: channel: str version: int payload: Dict[str, Any] - def to_dict(self) -> Dict[str, Any]: return {"channel": self.channel, "version": self.version, "payload": self.payload} - @dataclass class DualVariableEP: channel: str version: int payload: Dict[str, Any] - def to_dict(self) -> Dict[str, Any]: return {"channel": self.channel, "version": self.version, "payload": self.payload} - @dataclass class PlanDeltaEP: delta_id: str changes: Dict[str, Any] timestamp: float - def to_dict(self) -> Dict[str, Any]: return {"delta_id": self.delta_id, "changes": self.changes, "timestamp": self.timestamp} class EnergiBridge: - """Minimal EnergiBridge for cross-domain interoperability (MVP). + """Minimal EnergiBridge canonical bridge implementation. - Exposes a to_ir method that translates a local problem and its signals into - a canonical IR suitable for adapters in other domains. + Maps CosmosMesh primitives into a vendor-agnostic Intermediary Representation + suitable for cross-domain adapters. """ - def __init__(self) -> None: - pass + def to_energi( + self, + lp: LocalProblemEP, + shared: List[SharedVariableEP], + duals: List[DualVariableEP], + deltas: Optional[List[PlanDeltaEP]] = None, + ) -> Dict[str, Any]: + data = { + "Version": "0.1", + "Objects": { + "LocalProblem": lp.to_dict(), + }, + "Morphisms": [ + {"Morphisms": {"SharedVariable": sv.to_dict()}} for sv in shared + ] + [ + {"Morphisms": {"DualVariable": dv.to_dict()}} for dv in duals + ], + } + if deltas: + data["Objects"]["PlanDeltas"] = [p.to_dict() for p in deltas] + return data + # Backwards-compatible API expected by existing tests def to_ir( self, lp: LocalProblemEP, shared: List[SharedVariableEP], duals: List[DualVariableEP], - deltas: List[PlanDeltaEP], + deltas: Optional[List[PlanDeltaEP]] = None, ) -> Dict[str, Any]: - morphs = [] - for s in shared: - morphs.append({"Morphisms": {"SharedVariable": s.channel, "version": s.version, "payload": s.payload}}) - for d in duals: - morphs.append({"Morphisms": {"DualVariable": d.channel, "version": d.version, "payload": d.payload}}) + return self.to_energi(lp, shared, duals, deltas) - # Lightweight per-round timing/provenance metadata aligned with the MVP" - time_monoid = {"round": 1, "timestamp": None} - metadata = {"generated_by": "EnergiBridge", "timestamp": None} - return { - "Version": "0.1", - "Objects": { - "LocalProblem": { - "problem_id": lp.problem_id, - "assets": lp.assets, - "objective": lp.objective, - "constraints": lp.constraints, - "data_contracts": lp.data_contracts, - }, - "PlanDeltas": [p.to_dict() for p in deltas], - }, - "Morphisms": morphs, - "TimeMonoid": time_monoid, - "Metadata": metadata, - } + +__all__ = ["EnergiBridge", "EnergiLocalProblem", "EnergiSignal"]