From e59304b7d5446967035ece8d57c2586fdb8e4732 Mon Sep 17 00:00:00 2001 From: agent-cb502d7656738cf6 Date: Fri, 17 Apr 2026 09:21:05 +0200 Subject: [PATCH] build(agent): molt-d#cb502d iteration --- .../energi_bridge.py | 91 +++++++++++++++++++ 1 file changed, 91 insertions(+) create mode 100644 src/cosmosmesh_privacy_preserving_federated_/energi_bridge.py diff --git a/src/cosmosmesh_privacy_preserving_federated_/energi_bridge.py b/src/cosmosmesh_privacy_preserving_federated_/energi_bridge.py new file mode 100644 index 0000000..53f6270 --- /dev/null +++ b/src/cosmosmesh_privacy_preserving_federated_/energi_bridge.py @@ -0,0 +1,91 @@ +"""EnergiBridge: minimal IR translator for CosmosMesh privacy-preserving federation + +This module provides a lightweight translator (to_ir) that converts a LocalProblemEP +and associated signals (SharedVariableEP, DualVariableEP, PlanDeltaEP) into a CatOpt- +style intermediate representation (IR) suitable for MVP testing and adapter wiring. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Dict, List + + +@dataclass +class LocalProblemEP: + problem_id: str + assets: List[str] + objective: Any + constraints: Any + data_contracts: Dict[str, Any] + + def to_dict(self) -> Dict[str, Any]: + return { + "problem_id": self.problem_id, + "assets": self.assets, + "objective": self.objective, + "constraints": self.constraints, + "data_contracts": self.data_contracts, + } + + +@dataclass +class SharedVariableEP: + channel: str + version: int + payload: Dict[str, Any] + + def to_dict(self) -> Dict[str, Any]: + return {"channel": self.channel, "version": self.version, "payload": self.payload} + + +@dataclass +class DualVariableEP: + channel: str + version: int + payload: Dict[str, Any] + + def to_dict(self) -> Dict[str, Any]: + return {"channel": self.channel, "version": self.version, "payload": self.payload} + + +@dataclass +class PlanDeltaEP: + delta_id: str + changes: Dict[str, Any] + timestamp: float | None = None + + def to_dict(self) -> Dict[str, Any]: + return {"delta_id": self.delta_id, "changes": self.changes, "timestamp": self.timestamp} + + +class EnergiBridge: + """Small translator that emits a CatOpt-like IR for a given problem and signals.""" + + def to_ir( + self, + lp: LocalProblemEP, + sv: List[SharedVariableEP], + dv: List[DualVariableEP], + deltas: List[PlanDeltaEP], + ) -> Dict[str, Any]: + # Build morphisms: one entry per signal (SharedVariable or DualVariable) + morphisms: List[Dict[str, Any]] = [] + for s in sv: + morphisms.append({"Morphisms": {"SharedVariable": s.to_dict()}}) + for d in dv: + morphisms.append({"Morphisms": {"DualVariable": d.to_dict()}}) + + ir: Dict[str, Any] = { + "Version": "0.1", + "Objects": { + "LocalProblem": lp.to_dict(), + # Plan deltas are kept as a list for audit/replay in the IR + "PlanDeltas": [delta.to_dict() for delta in deltas], + }, + "Morphisms": morphisms, + } + return ir + + +__all__ = ["LocalProblemEP", "SharedVariableEP", "DualVariableEP", "PlanDeltaEP", "EnergiBridge"]