"""Minimal EnergiBridge-style CatOpt bridge for CosmosMesh MVP.

This module provides a tiny canonical-IR mapping between CosmosMesh primitives
and a vendor-agnostic intermediate representation inspired by CatOpt concepts.
It is intentionally small and focused to bootstrap interoperability and
testing.

Goal: expose a small, compatible surface for tests in this repository.
"""

from __future__ import annotations

import json
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


class LocalProblem:
    """Flexible LocalProblem contract compatible with multiple test styles."""

    def __init__(
        self,
        id: str | None = None,
        problem_id: str | None = None,
        domain: str | None = None,
        assets: List[str] | None = None,
        objective: Any = None,
        constraints: Any = None,
        version: Any = None,
        **kwargs: Any,
    ) -> None:
        """Store the problem description.

        Both ``id`` and ``problem_id`` naming styles are supported;
        ``problem_id`` wins when both are given, and ``"lp-default"`` is used
        when neither is. Extra keyword arguments are accepted and ignored.
        """
        # ``problem_id`` is a named parameter, so it can never show up in
        # ``kwargs``; the default below is the only possible fallback.
        if problem_id is not None:
            self.problem_id = problem_id
        elif id is not None:
            self.problem_id = id
        else:
            self.problem_id = "lp-default"
        self.domain = domain
        self.assets = assets if assets is not None else []
        self.objective = objective
        self.constraints = constraints
        self.version = version

    def to_catopt(self) -> Dict[str, Any]:
        """Return a ``{"type": ..., "payload": ...}`` envelope for this problem."""
        return {
            "type": "LocalProblem",
            "payload": {
                "problem_id": self.problem_id,
                "domain": self.domain,
                "assets": self.assets,
                "objective": self.objective,
                "constraints": self.constraints,
                "version": self.version,
            },
        }


class SharedVariable:
    """Single named shared value with an optional version."""

    def __init__(
        self,
        name: str,
        value: Any,
        version: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """Store ``name``, ``value`` and ``version``; extra kwargs are ignored."""
        self.name = name
        self.value = value
        self.version = version


class DualVariable:
    """Single named dual value with an optional version."""

    def __init__(
        self,
        name: str,
        value: Any,
        version: Optional[int] = None,
        **kwargs: Any,
    ) -> None:
        """Store ``name``, ``value`` and ``version``; extra kwargs are ignored."""
        self.name = name
        self.value = value
        self.version = version


class SharedVariables:
    """Versioned bundle of ``forecasts`` and ``priors`` mappings."""

    def __init__(
        self,
        forecasts: Optional[Dict[str, Any]] = None,
        priors: Optional[Dict[str, Any]] = None,
        version: int = 0,
        **kwargs: Any,
    ) -> None:
        """Store the mappings; a falsy mapping is replaced by a new empty dict."""
        self.forecasts: Dict[str, Any] = forecasts or {}
        self.priors: Dict[str, Any] = priors or {}
        self.version: int = version

    def to_catopt(self) -> Dict[str, Any]:
        """Return a ``{"type": ..., "payload": ...}`` envelope for this bundle."""
        return {
            "type": "SharedVariables",
            "payload": {
                "forecasts": self.forecasts,
                "priors": self.priors,
                "version": self.version,
            },
        }


class DualVariables:
    """Versioned bundle of dual ``values``."""

    def __init__(
        self,
        values: Optional[Dict[str, Any]] = None,
        version: int = 0,
        **kwargs: Any,
    ) -> None:
        """Store the mapping; a falsy mapping is replaced by a new empty dict."""
        self.values: Dict[str, Any] = values or {}
        self.version: int = version

    def to_catopt(self) -> Dict[str, Any]:
        """Return a ``{"type": ..., "payload": ...}`` envelope for this bundle."""
        return {
            "type": "DualVariables",
            "payload": {
                "values": self.values,
                "version": self.version,
            },
        }


class PlanDelta:
    """Plan change record with optional authorship/contract/signature fields."""

    def __init__(
        self,
        delta: Optional[Dict[str, Any]] = None,
        timestamp: str | None = None,
        author: str | None = None,
        contract_id: str | None = None,
        signature: str | None = None,
        **kwargs: Any,
    ) -> None:
        """Store the fields; a falsy ``delta`` is replaced by a new empty dict."""
        self.delta: Dict[str, Any] = delta or {}
        self.timestamp: str | None = timestamp
        self.author: str | None = author
        self.contract_id: str | None = contract_id
        self.signature: str | None = signature

    def to_json(self) -> str:
        """Serialize all five fields to a JSON object string."""
        return json.dumps(
            {
                "delta": self.delta,
                "timestamp": self.timestamp,
                "author": self.author,
                "contract_id": self.contract_id,
                "signature": self.signature,
            }
        )


class PrivacyBudget:
    """Minimal privacy budget descriptor for local-dp/shared signals."""

    def __init__(
        self,
        budget: float | None = None,
        spent: float = 0.0,
        version: Optional[str] = None,
        **kwargs: Any,
    ) -> None:
        """Store ``budget``, ``spent`` and ``version``; extra kwargs are ignored."""
        self.budget: float | None = budget
        self.spent: float = spent
        self.version: Optional[str] = version

    def to_json(self) -> str:
        """Serialize ``budget``, ``spent`` and ``version`` to a JSON string."""
        return json.dumps(
            {"budget": self.budget, "spent": self.spent, "version": self.version}
        )


@dataclass
class AuditLog:
    """Container for a list of audit entry strings."""

    entries: List[str] = field(default_factory=list)


@dataclass
class PolicyBlock:
    """Named block of policy rules."""

    name: str
    rules: Dict[str, Any] = field(default_factory=dict)


class GoCRegistry:
    """Graph-of-Contracts (GoC) registry stub for MVP onboarding."""

    def __init__(self) -> None:
        """Start with no registered contracts."""
        self._contracts: Dict[str, Dict[str, Any]] = {}

    def register_contract(
        self, contract_id: str, version: int, schemas: Dict[str, Any]
    ) -> bool:
        """Store (or overwrite) the entry for ``contract_id``; always returns True."""
        self._contracts[contract_id] = {
            "version": version,
            "schemas": schemas,
        }
        return True

    def get_contract(self, contract_id: str) -> Dict[str, Any] | None:
        """Return the stored entry for ``contract_id``, or ``None`` if unknown."""
        return self._contracts.get(contract_id)


def _snapshot(obj: Any) -> Dict[str, Any]:
    """Return a shallow copy of *obj*'s instance attribute mapping."""
    return dict(vars(obj))


def to_catopt(
    local_problem: LocalProblem, shared: SharedVariables, delta: PlanDelta
) -> Dict[str, Any]:
    """Canonical representation mapping CosmosMesh primitives to CatOpt-like IR.

    Each section is a shallow copy of the corresponding object's attributes,
    so adding, removing or reassigning top-level keys in the returned IR does
    not write through to ``local_problem``, ``shared`` or ``delta``. Nested
    containers (for example ``assets`` or ``delta``) are still shared with the
    source objects.
    """
    return {
        "Objects": {"LocalProblem": _snapshot(local_problem)},
        "Morphisms": {
            "SharedVariables": _snapshot(shared),
            "DualVariables": _snapshot(DualVariables()),
        },
        "PlanDelta": _snapshot(delta),
        # Initialize a minimal privacy budget object with explicit fields
        "PrivacyBudget": _snapshot(PrivacyBudget(budget=0.0, spent=0.0)),
        "AuditLog": _snapshot(AuditLog()),
        "PolicyBlock": _snapshot(PolicyBlock(name="default")),
    }


def from_catopt(catopt: Dict[str, Any]) -> Dict[str, Any]:
    """Minimal inverse mapping from CatOpt-like IR to local structures.

    Missing sections come back as empty dicts.
    """
    lp = catopt.get("Objects", {}).get("LocalProblem", {})
    delta = catopt.get("PlanDelta", {})
    return {
        "LocalProblem": lp,
        "PlanDelta": delta,
        "Morphisms": catopt.get("Morphisms", {}),
    }


__all__ = [
    "LocalProblem",
    "SharedVariables",
    "DualVariables",
    "PlanDelta",
    "PrivacyBudget",
    "AuditLog",
    "PolicyBlock",
    "GoCRegistry",
    "to_catopt",
    "from_catopt",
    # test-facing/new surface
    "SharedVariable",
    "DualVariable",
    "CatOptBridge",
    "GraphOfContracts",
    "sample_end_to_end_mapping",
]


class GraphOfContracts:
    """Tiny in-memory registry compatible with tests."""

    def __init__(self) -> None:
        """Start with an empty registry."""
        self._registry: List[Dict[str, Any]] = []

    def register(self, contract_id: str, info: Dict[str, Any]) -> None:
        """Append a ``{"contract_id", "info"}`` record; duplicates are kept."""
        self._registry.append({"contract_id": contract_id, "info": info})

    def list_contracts(self) -> List[Dict[str, Any]]:
        """Return a new list holding the registered records, in insertion order."""
        return list(self._registry)

    def to_json(self) -> str:
        """Serialize the registered records to a JSON array string."""
        return json.dumps(self._registry)


class CatOptBridge:
    """Namespace for helpers that assemble CatOpt-style payloads."""

    @staticmethod
    def build_round_trip(
        problem: LocalProblem,
        shared: List[SharedVariable],
        duals: List[DualVariable],
    ) -> Dict[str, Any]:
        """Build a ``{"kind": "RoundTrip", "payload": ...}`` message.

        Attributes are read with ``getattr`` so any object exposing the
        expected names works; missing attributes become ``None``. The
        ``variables`` entry falls back to ``problem.assets`` when
        ``problem.variables`` is absent or falsy. A shared item without a
        ``name`` falls back to its ``variable`` attribute. ``None`` for
        ``shared`` or ``duals`` is treated as an empty sequence.
        """
        variables = getattr(problem, "variables", None) or getattr(
            problem, "assets", None
        )
        # Shared variables come first, then duals, preserving input order.
        morphisms = [
            {
                "name": getattr(sv, "name", getattr(sv, "variable", None)),
                "value": getattr(sv, "value", None),
            }
            for sv in shared or ()
        ]
        morphisms.extend(
            {"name": getattr(dv, "name", None), "value": getattr(dv, "value", None)}
            for dv in duals or ()
        )
        return {
            "kind": "RoundTrip",
            "payload": {
                "object": {
                    "id": getattr(problem, "problem_id", None),
                    "domain": getattr(problem, "domain", None),
                    "objective": getattr(problem, "objective", None),
                    "variables": variables,
                },
                "morphisms": morphisms,
            },
        }


def sample_end_to_end_mapping() -> (
    tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]
):
    """Return fixed sample envelopes: (LocalProblem, SharedVariables, DualVariables)."""
    lp = {"type": "LocalProblem", "payload": {"problem_id": "lp-xyz"}}
    sv = {"type": "SharedVariables", "payload": {"version": 1}}
    dv = {"type": "DualVariables", "payload": {"version": 1}}
    return lp, sv, dv