41 lines
1.4 KiB
Python
41 lines
1.4 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Dict, Any
|
|
|
|
from citygrid import LocalProblem, SharedVariables
|
|
|
|
|
|
class EnergiBridge:
|
|
def __init__(self, registry=None):
|
|
# Optional registry for adapters; kept for compatibility with tests
|
|
self.registry = registry
|
|
|
|
"""Lightweight bridge translating between adapters and the canonical IR.
|
|
|
|
This is intentionally small: it provides two helpers to map between a
|
|
simplistic adapter payload (local problem) and the canonical LocalProblem/SharedVariables
|
|
structures used by the solver.
|
|
"""
|
|
|
|
@staticmethod
|
|
def to_canonical(local_problem: Dict[str, Any]) -> LocalProblem:
|
|
return LocalProblem(
|
|
id=local_problem.get("id", "lp-unknown"),
|
|
domain=local_problem.get("domain", "unknown"),
|
|
assets=local_problem.get("assets", []),
|
|
objective=local_problem.get("objective", {}),
|
|
constraints=local_problem.get("constraints", {}),
|
|
solver_hint=local_problem.get("solver_hint"),
|
|
)
|
|
|
|
@staticmethod
|
|
def from_canonical(shared: SharedVariables) -> Dict[str, Any]:
|
|
return {"version": shared.version, "signals": shared.signals}
|
|
|
|
# Compatibility helper: translate a canonical LocalProblem into an external message
|
|
def to_external_message(self, local_problem: LocalProblem) -> Dict[str, Any]:
|
|
return {
|
|
"type": "LocalProblem",
|
|
"id": local_problem.id,
|
|
}
|