247 lines
8.1 KiB
Python
247 lines
8.1 KiB
Python
"""
|
|
Minimal EnergiBridge-style CatOpt bridge for CosmosMesh MVP.
|
|
|
|
This module provides a tiny canonical-IR mapping between CosmosMesh primitives
|
|
and a vendor-agnostic intermediate representation inspired by CatOpt concepts.
|
|
It is intentionally small and focused to bootstrap interoperability and testing.
|
|
|
|
Goal: expose a small, compatible surface for tests in this repository.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from dataclasses import dataclass, field
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
class LocalProblem:
    """Flexible LocalProblem contract compatible with multiple test styles.

    The identifier may be supplied as either ``problem_id`` or ``id``;
    ``problem_id`` takes precedence when both are given. When neither is
    given the identifier falls back to ``"lp-default"``.

    Args:
        id: Alternative spelling of the problem identifier.
        problem_id: Preferred spelling of the problem identifier.
        domain: Stored unchanged on ``self.domain``.
        assets: Stored on ``self.assets``; ``None`` becomes a new empty list.
        objective: Stored unchanged on ``self.objective``.
        constraints: Stored unchanged on ``self.constraints``.
        version: Stored unchanged on ``self.version``.
        **kwargs: Accepted and ignored, so callers may pass extra fields.
    """

    def __init__(self, id: str | None = None, problem_id: str | None = None,
                 domain: str | None = None, assets: List[str] | None = None,
                 objective: Any = None, constraints: Any = None, version: Any = None,
                 **kwargs: Any) -> None:
        # Support both id/problem_id naming styles.
        if problem_id is not None:
            self.problem_id = problem_id
        elif id is not None:
            self.problem_id = id
        else:
            # ``problem_id`` is a named parameter, so it can never appear in
            # ``kwargs``; looking it up there was dead code. Apply the sane
            # default directly.
            self.problem_id = "lp-default"

        self.domain = domain
        self.assets = assets if assets is not None else []
        self.objective = objective
        self.constraints = constraints
        self.version = version

    def to_catopt(self) -> Dict[str, Any]:
        """Return this problem as a ``{"type": ..., "payload": ...}`` dict.

        The payload references the stored attribute values directly (no
        copies are made).
        """
        return {
            "type": "LocalProblem",
            "payload": {
                # ``__init__`` always sets problem_id, so plain attribute
                # access is sufficient here.
                "problem_id": self.problem_id,
                "domain": self.domain,
                "assets": self.assets,
                "objective": self.objective,
                "constraints": self.constraints,
                "version": self.version,
            },
        }
|
|
|
|
|
|
class SharedVariable:
    """A single named shared value with an optional version tag."""

    def __init__(self, name: str, value: Any, version: Optional[int] = None, **kwargs: Any) -> None:
        # Extra keyword arguments are accepted but not stored.
        self.name, self.value, self.version = name, value, version
|
|
|
|
|
|
class DualVariable:
    """A single named dual value with an optional version tag."""

    def __init__(self, name: str, value: Any, version: Optional[int] = None, **kwargs: Any) -> None:
        # Extra keyword arguments are accepted but not stored.
        self.name, self.value, self.version = name, value, version
|
|
|
|
class SharedVariables:
    """Container holding forecast and prior mappings plus a version number."""

    def __init__(self, forecasts: Optional[Dict[str, Any]] = None,
                 priors: Optional[Dict[str, Any]] = None, version: int = 0, **kwargs: Any) -> None:
        # Any falsy input (None or an empty mapping) is swapped for a new dict.
        self.forecasts: Dict[str, Any] = forecasts if forecasts else {}
        self.priors: Dict[str, Any] = priors if priors else {}
        self.version: int = version

    def to_catopt(self) -> Dict[str, Any]:
        """Return the container as a ``{"type": ..., "payload": ...}`` dict."""
        body = {
            "forecasts": self.forecasts,
            "priors": self.priors,
            "version": self.version,
        }
        return {"type": "SharedVariables", "payload": body}
|
|
|
|
|
|
class DualVariables:
    """Container holding a mapping of dual values plus a version number."""

    def __init__(self, values: Optional[Dict[str, Any]] = None, version: int = 0, **kwargs: Any) -> None:
        # Any falsy input (None or an empty mapping) is swapped for a new dict.
        self.values: Dict[str, Any] = values if values else {}
        self.version: int = version

    def to_catopt(self) -> Dict[str, Any]:
        """Return the container as a ``{"type": ..., "payload": ...}`` dict."""
        body = {"values": self.values, "version": self.version}
        return {"type": "DualVariables", "payload": body}
|
|
|
|
|
|
class PlanDelta:
    """Plan change record: a delta mapping plus optional provenance fields."""

    def __init__(self, delta: Optional[Dict[str, Any]] = None, timestamp: str | None = None,
                 author: str | None = None, contract_id: str | None = None,
                 signature: str | None = None, **kwargs: Any) -> None:
        # A falsy delta (None or empty mapping) is swapped for a new dict;
        # extra keyword arguments are accepted but not stored.
        self.delta: Dict[str, Any] = delta if delta else {}
        self.timestamp: str | None = timestamp
        self.author: str | None = author
        self.contract_id: str | None = contract_id
        self.signature: str | None = signature

    def to_json(self) -> str:
        """Serialize the five stored fields to a JSON object string."""
        # Field order here fixes the key order of the emitted JSON.
        field_names = ("delta", "timestamp", "author", "contract_id", "signature")
        return json.dumps({key: getattr(self, key) for key in field_names})
|
|
|
|
|
|
class PrivacyBudget:
    """Minimal privacy budget descriptor for local-dp/shared signals."""

    def __init__(self, budget: float | None = None, spent: float = 0.0,
                 version: Optional[str] = None, **kwargs: Any) -> None:
        # Extra keyword arguments are accepted but not stored.
        self.budget: float | None = budget
        self.spent: float = spent
        self.version: Optional[str] = version

    def to_json(self) -> str:
        """Serialize budget, spent and version to a JSON object string."""
        # Field order here fixes the key order of the emitted JSON.
        snapshot = {key: getattr(self, key) for key in ("budget", "spent", "version")}
        return json.dumps(snapshot)
|
|
|
|
|
|
@dataclass
class AuditLog:
    """Dataclass wrapping a list of string entries."""

    # Each instance gets its own list via default_factory.
    entries: List[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
class PolicyBlock:
    """Dataclass pairing a required name with a mapping of rules."""

    name: str
    # Each instance gets its own dict via default_factory.
    rules: Dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
class GoCRegistry:
    """Graph-of-Contracts (GoC) registry stub for MVP onboarding."""

    def __init__(self) -> None:
        # Maps contract_id -> {"version": ..., "schemas": ...}.
        self._contracts: Dict[str, Dict[str, Any]] = {}

    def register_contract(self, contract_id: str, version: int, schemas: Dict[str, Any]) -> bool:
        """Store (or overwrite) the record for ``contract_id``; always returns True."""
        record = dict(version=version, schemas=schemas)
        self._contracts[contract_id] = record
        return True

    def get_contract(self, contract_id: str) -> Dict[str, Any] | None:
        """Return the stored record for ``contract_id``, or None if absent."""
        return self._contracts.get(contract_id)
|
|
|
|
|
|
def to_catopt(local_problem: LocalProblem, shared: SharedVariables, delta: PlanDelta) -> Dict[str, Any]:
    """Canonical representation mapping CosmosMesh primitives to CatOpt-like IR.

    Args:
        local_problem: Object whose instance attributes fill
            ``Objects.LocalProblem``.
        shared: Object whose instance attributes fill
            ``Morphisms.SharedVariables``.
        delta: Object whose instance attributes fill ``PlanDelta``.

    Returns:
        A dict with the keys ``Objects``, ``Morphisms``, ``PlanDelta``,
        ``PrivacyBudget``, ``AuditLog`` and ``PolicyBlock``. The last three,
        and ``Morphisms.DualVariables``, are filled from freshly constructed
        default instances. Each section is a shallow copy of the instance's
        attribute dict, so adding or replacing top-level keys in the result
        does not modify the source objects (nested values are still shared).
    """
    return {
        "Objects": {"LocalProblem": dict(vars(local_problem))},
        "Morphisms": {
            "SharedVariables": dict(vars(shared)),
            "DualVariables": dict(vars(DualVariables())),
        },
        "PlanDelta": dict(vars(delta)),
        # PrivacyBudget has no ``per_signal``/``total_budget`` parameters;
        # those keywords were swallowed by its ``**kwargs`` and left the
        # budget as None. Pass the zero budget through the real parameter.
        "PrivacyBudget": dict(vars(PrivacyBudget(budget=0.0))),
        "AuditLog": dict(vars(AuditLog())),
        "PolicyBlock": dict(vars(PolicyBlock(name="default"))),
    }
|
|
|
|
|
|
def from_catopt(catopt: Dict[str, Any]) -> Dict[str, Any]:
    """Minimal inverse mapping from CatOpt-like IR to local structures.

    Pulls ``Objects.LocalProblem``, ``PlanDelta`` and ``Morphisms`` out of
    ``catopt``; any section that is missing is returned as an empty dict.
    """
    objects = catopt.get("Objects", {})
    return {
        "LocalProblem": objects.get("LocalProblem", {}),
        "PlanDelta": catopt.get("PlanDelta", {}),
        "Morphisms": catopt.get("Morphisms", {}),
    }
|
|
|
|
|
|
# Names exported by ``from <module> import *``.
__all__ = [
    # primitives
    "LocalProblem", "SharedVariables", "DualVariables", "PlanDelta",
    "PrivacyBudget", "AuditLog", "PolicyBlock",
    # registry and IR mapping helpers
    "GoCRegistry", "to_catopt", "from_catopt",
    # test-facing/new surface
    "SharedVariable", "DualVariable", "CatOptBridge", "GraphOfContracts",
    "sample_end_to_end_mapping",
]
|
|
|
|
|
|
class GraphOfContracts:
    """Tiny in-memory registry compatible with tests."""

    def __init__(self) -> None:
        # Registrations are kept in insertion order; duplicates are allowed.
        self._registry: List[Dict[str, Any]] = []

    def register(self, contract_id: str, info: Dict[str, Any]) -> None:
        """Append a ``{"contract_id": ..., "info": ...}`` entry to the registry."""
        entry = {"contract_id": contract_id, "info": info}
        self._registry.append(entry)

    def list_contracts(self) -> List[Dict[str, Any]]:
        """Return a new list containing the registered entries."""
        return self._registry.copy()

    def to_json(self) -> str:
        """Serialize the registered entries to a JSON array string."""
        return json.dumps(self._registry)
|
|
|
|
|
|
class CatOptBridge:
    """Namespace for helpers that assemble round-trip payloads."""

    @staticmethod
    def build_round_trip(problem: LocalProblem, shared: List[SharedVariable], duals: List[DualVariable]):
        """Assemble a ``{"kind": "RoundTrip", "payload": ...}`` dict.

        All inputs are read with ``getattr`` and a ``None`` fallback, so any
        objects exposing the relevant attributes are accepted.

        Args:
            problem: Source of ``problem_id``, ``domain``, ``objective`` and
                ``variables`` (falling back to ``assets`` when ``variables``
                is missing or falsy).
            shared: Iterable of items providing ``name`` (or ``variable`` if
                ``name`` is absent) and ``value``.
            duals: Iterable of items providing ``name`` and ``value``.

        Returns:
            The round-trip dict; morphisms list the shared items first,
            followed by the dual items.
        """
        described = {
            "id": getattr(problem, "problem_id", None),
            "domain": getattr(problem, "domain", None),
            "objective": getattr(problem, "objective", None),
            "variables": getattr(problem, "variables", None) or getattr(problem, "assets", None),
        }
        shared_entries = [
            {
                "name": getattr(item, "name", getattr(item, "variable", None)),
                "value": getattr(item, "value", None),
            }
            for item in shared
        ]
        dual_entries = [
            {"name": getattr(item, "name", None), "value": getattr(item, "value", None)}
            for item in duals
        ]
        return {
            "kind": "RoundTrip",
            "payload": {"object": described, "morphisms": shared_entries + dual_entries},
        }
|
|
|
|
|
|
def sample_end_to_end_mapping():
    """Return a fixed triple of sample ``{"type": ..., "payload": ...}`` dicts.

    The tuple holds, in order, a LocalProblem, a SharedVariables and a
    DualVariables sample; new dicts are built on every call.
    """
    return (
        {"type": "LocalProblem", "payload": {"problem_id": "lp-xyz"}},
        {"type": "SharedVariables", "payload": {"version": 1}},
        {"type": "DualVariables", "payload": {"version": 1}},
    )
|