energiamesh-federated-contr.../src/energiamesh/catopt_bridge.py

75 lines
2.5 KiB
Python

"""Minimal CatOpt bridge for EnergiaMesh primitives.
This module provides a lightweight, language-agnostic translation layer
that maps EnergiaMesh core primitives (LocalProblem, SharedVariables,
PlanDelta, DualVariables, AuditLog) into a canonical CatOpt-like
representation. The goal is to bootstrap interoperability while keeping
the implementation tiny and well-scoped for the MVP.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict
from energiamesh.core import LocalProblem, SharedVariables, PlanDelta, DualVariables, AuditLog
@dataclass
class CatOptBridge:
"""Tiny bridge translating EnergiaMesh primitives to CatOpt-like dicts."""
version: str = "0.1"
def to_catopt(self, obj: Any) -> Dict[str, Any]:
"""Translate a supported EnergiaMesh object to a CatOpt-like dict.
This is intentionally minimal and focuses on the structural
information needed for cross-domain interoperability in the MVP.
"""
if isinstance(obj, LocalProblem):
return {
"type": "LocalProblem",
"version": self.version,
"site_id": obj.site_id,
"objective": obj.objective,
"variables": obj.variables,
"constraints": obj.constraints,
"status": obj.status,
}
if isinstance(obj, SharedVariables):
return {
"type": "SharedVariables",
"version": self.version,
"signals": obj.signals,
"version_tag": obj.version,
"timestamp": obj.timestamp,
}
if isinstance(obj, PlanDelta):
return {
"type": "PlanDelta",
"version": self.version,
"delta_id": obj.delta_id,
"updates": obj.updates,
"metadata": obj.metadata,
"timestamp": obj.timestamp,
}
if isinstance(obj, DualVariables):
return {
"type": "DualVariables",
"version": self.version,
"multipliers": obj.multipliers,
"primal": obj.primal,
"timestamp": obj.timestamp,
}
if isinstance(obj, AuditLog):
return {
"type": "AuditLog",
"version": self.version,
"entries": obj.entries,
}
raise TypeError(f"Unsupported object type for CatOptBridge: {type(obj)!r}")
__all__ = ["CatOptBridge"]