citygrid-policy-driven-fede.../citygrid/bridge/energi_bridge.py

41 lines
1.4 KiB
Python

from __future__ import annotations
from typing import Dict, Any
from citygrid import LocalProblem, SharedVariables
class EnergiBridge:
    """Lightweight bridge translating between adapters and the canonical IR.

    This is intentionally small: it offers helpers that map a simplistic
    adapter payload (a plain dict describing a local problem) onto the
    canonical ``LocalProblem`` structure, flatten ``SharedVariables`` back
    into a plain dict, and wrap a ``LocalProblem`` into a minimal external
    message.
    """

    def __init__(self, registry=None):
        """Create a bridge.

        Args:
            registry: Optional registry for adapters. It is only stored on
                the instance; kept for compatibility with tests.
        """
        self.registry = registry

    @staticmethod
    def to_canonical(local_problem: Dict[str, Any]) -> LocalProblem:
        """Build a canonical ``LocalProblem`` from an adapter payload.

        Missing keys fall back to defaults: ``"lp-unknown"`` for ``id``,
        ``"unknown"`` for ``domain``, an empty list for ``assets``, empty
        dicts for ``objective`` and ``constraints``, and ``None`` for
        ``solver_hint``.

        Args:
            local_problem: Mapping carrying the adapter's problem fields.

        Returns:
            The ``LocalProblem`` constructed from the payload.
        """
        return LocalProblem(
            id=local_problem.get("id", "lp-unknown"),
            domain=local_problem.get("domain", "unknown"),
            assets=local_problem.get("assets", []),
            objective=local_problem.get("objective", {}),
            constraints=local_problem.get("constraints", {}),
            solver_hint=local_problem.get("solver_hint"),
        )

    @staticmethod
    def from_canonical(shared: SharedVariables) -> Dict[str, Any]:
        """Flatten shared variables into a plain dict.

        Args:
            shared: Object exposing ``version`` and ``signals`` attributes.

        Returns:
            A dict with the keys ``"version"`` and ``"signals"``.
        """
        return {"version": shared.version, "signals": shared.signals}

    # Compatibility helper: translate a canonical LocalProblem into an
    # external message.
    def to_external_message(self, local_problem: LocalProblem) -> Dict[str, Any]:
        """Wrap a canonical ``LocalProblem`` into an external message.

        Only the problem's ``id`` is carried over; the message ``type`` is
        always the literal ``"LocalProblem"``.

        Args:
            local_problem: Object exposing an ``id`` attribute.

        Returns:
            A dict with the keys ``"type"`` and ``"id"``.
        """
        return {
            "type": "LocalProblem",
            "id": local_problem.id,
        }