build(agent): molt-d#cb502d iteration

This commit is contained in:
agent-cb502d7656738cf6 2026-04-17 09:25:21 +02:00
parent e59304b7d5
commit 6591349ef9
3 changed files with 248 additions and 202 deletions

View File

@ -1,157 +1,200 @@
"""
Minimal EnergiBridge canonical bridge (CatOpt-like IR) for CosmosMesh MVP.
This module provides lightweight data models and conversion utilities that map
CosmosMesh MVP primitives to a vendor-agnostic intermediate representation
(CatOpt-inspired). It is intentionally small and testable to bootstrap
interoperability with simple adapters.
"""
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, Any, Optional
from typing import Any, Dict, List
class LocalProblem:
"""A minimal local optimization problem instance with broad constructor support.
This class aims to be flexible to accommodate multiple naming styles used
across tests and codebases:
- id / problem_id
- domain / space/domain
- assets / variables
- objective / constraints
"""
def __init__(self, id: str | None = None, problem_id: str | None = None,
domain: str | None = None, assets: List[str] | None = None,
objective: Dict[str, Any] | None = None,
constraints: Dict[str, Any] | None = None,
variables: List[str] | None = None,
version: int = 1, **kwargs):
self.id = id or problem_id or kwargs.get("problem_id") or "lp-unnamed"
self.domain = domain or kwargs.get("domain", "")
# Support both assets and variables naming
self.assets = assets if assets is not None else (variables or [])
self.objective = objective if objective is not None else kwargs.get("objective", {})
self.constraints = constraints if constraints is not None else kwargs.get("constraints", {})
self.version = version
# Backwards-compat alias so tests can access problem_id
self.problem_id = self.id
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"domain": self.domain,
"assets": self.assets,
"objective": self.objective,
"constraints": self.constraints,
"version": self.version,
}
class SharedVariables:
"""Backward-compatible container for forecasts/priors (legacy form)."""
def __init__(self, forecasts: Dict[str, Any], priors: Dict[str, Any], version: int):
self.forecasts = forecasts
self.priors = priors
self.version = version
def to_dict(self) -> Dict[str, Any]:
return {"forecasts": self.forecasts, "priors": self.priors, "version": self.version}
class SharedVariable:
"""Single signal representation (name/value) used by tests."""
def __init__(self, name: str, value: Any, version: int):
self.name = name
self.value = value
self.version = version
def to_dict(self) -> Dict[str, Any]:
return {"name": self.name, "value": self.value, "version": self.version}
@dataclass
class LocalProblem:
"""A minimal local problem representation for a per-asset planner.
This is intentionally lightweight: objective, variables and constraints
are represented as serializable dictionaries so they can be mapped to a
CatOpt-like intermediate representation (IR).
"""
problem_id: str
version: int
objective: str # a human-readable or symbolic objective description
variables: Dict[str, Any]
constraints: Dict[str, Any]
class DualVariables:
"""Lagrange multipliers or dual signals."""
multipliers: Dict[str, float]
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class SharedVariable:
def __init__(self, channel=None, value=None, version=None, name=None):
if channel is None and name is not None:
channel = name
self.channel = channel
self.value = value
self.version = version if version is not None else 0
def to_dict(self) -> Dict[str, Any]:
return {"channel": self.channel, "value": self.value, "version": self.version}
class DualVariable:
def __init__(self, channel=None, value=None, version=None, name=None):
if channel is None and name is not None:
channel = name
self.channel = channel
"""Single dual-variable signal for compatibility with tests."""
def __init__(self, name: str, value: float, version: int):
self.name = name
self.value = value
self.version = version if version is not None else 0
self.version = version
def to_dict(self) -> Dict[str, Any]:
return {"channel": self.channel, "value": self.value, "version": self.version}
return {"name": self.name, "value": self.value, "version": self.version}
@dataclass
class PlanDelta:
delta_id: str
changes: Dict[str, Any]
version: int = 1
"""Incremental plan changes with crypto-like tags for auditability."""
delta: Dict[str, Any]
timestamp: float
author: str
contract_id: int
signature: str # placeholder for cryptographic tag
def to_dict(self) -> Dict[str, Any]:
return {"delta_id": self.delta_id, "changes": self.changes, "version": self.version}
return asdict(self)
@dataclass
class Contract:
contract_id: str
version: str
schema: Dict[str, Any]
class PrivacyBudget:
"""Simple privacy budget block per signal."""
signal: str
budget: float
expiry: float
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class ContractRegistry:
"""In-memory registry for data contracts and schemas.
@dataclass
class AuditLog:
"""Tamper-evident log entry for governance/audit."""
entry: str
signer: str
timestamp: float
contract_id: int
This is a lightweight seed for MVP MVP MVP usage. In a real deployment this
would back onto a persistent store and a registry service.
"""
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class Registry:
"""Tiny Graph-of-Contracts registry for adapters and data schemas."""
def __init__(self) -> None:
self._contracts: Dict[str, Contract] = {}
self._contracts: Dict[int, Dict[str, Any]] = {}
def register_contract(self, contract_id: str, version: str, schema: Dict[str, Any]) -> None:
self._contracts[contract_id] = Contract(contract_id=contract_id, version=version, schema=schema)
def register_contract(self, contract_id: int, schema: Dict[str, Any]) -> None:
self._contracts[contract_id] = dict(schema)
def get_contract(self, contract_id: str, version: Optional[str] = None) -> Optional[Dict[str, Any]]:
c = self._contracts.get(contract_id)
if c is None:
return None
if version is not None and c.version != version:
return None
# Return the raw schema dict so tests can access reg.get_contract(...)["schema"]
return c.schema
def get_contract(self, contract_id: int) -> Dict[str, Any]:
return self._contracts.get(contract_id, {})
def all_contracts(self) -> Dict[str, Contract]:
return dict(self._contracts)
def list_contracts(self) -> List[int]:
return sorted(self._contracts.keys())
class CatOptBridge:
"""Bridge that maps CosmosMesh primitives to a CatOpt-like IR.
def to_catopt(local_problem: LocalProblem) -> Dict[str, Any]:
"""Convert a LocalProblem into a CatOpt-like IR payload."""
payload = local_problem.to_dict()
return {"type": "LocalProblem", "payload": payload}
The implementation is intentionally minimal and self-contained to support
MVP wiring and interoperability tests.
"""
def __init__(self, registry: Optional[ContractRegistry] = None) -> None:
self.registry = registry or ContractRegistry()
# Simple canonical mapping: LocalProblem -> CatOpt-IR dict
def to_catopt(self, lp: LocalProblem) -> Dict[str, Any]:
return {
"type": "LocalProblem",
"contract": {
"id": lp.problem_id,
"version": lp.version,
"objective": lp.objective,
},
"payload": {
"variables": lp.variables,
"constraints": lp.constraints,
},
}
def from_catopt(self, data: Dict[str, Any]) -> LocalProblem:
payload = data.get("payload", {})
contract = data.get("contract", {})
def from_catopt(catopt: Dict[str, Any]) -> LocalProblem | None:
"""Convert a CatOpt IR payload back into a LocalProblem if type matches."""
if not isinstance(catopt, dict) or catopt.get("type") != "LocalProblem":
return None
payload = catopt.get("payload", {})
try:
return LocalProblem(
problem_id=contract.get("id", "lp-unknown"),
version=contract.get("version", 1),
objective=contract.get("objective", ""),
variables=payload.get("variables", {}),
constraints=payload.get("constraints", {}),
id=payload["id"],
domain=payload["domain"],
assets=payload["assets"],
objective=payload["objective"],
constraints=payload["constraints"],
)
except KeyError:
return None
@staticmethod
def map_local_problem(lp: LocalProblem) -> Dict[str, Any]:
return {
"Objects": {
"LocalProblem": {
"problem_id": lp.problem_id,
"version": lp.version,
"objective": lp.objective,
"variables": lp.variables,
"constraints": lp.constraints,
}
}
}
@staticmethod
def build_round_trip(problem: LocalProblem, shared: list[SharedVariable], duals: list[DualVariable]) -> Dict[str, Any]:
morphisms = []
for s in shared:
morphisms.append({"name": s.channel, "type": "SharedVariable", "version": s.version, "value": s.value})
for d in duals:
morphisms.append({"name": d.channel, "type": "DualVariable", "version": d.version, "value": d.value})
# Compatibility aliases for existing tests and __init__ expectations
class CatOptBridge(Registry):
"""Backward-compatible bridge facade exposing Registry-like API."""
pass
@classmethod
def build_round_trip(cls, problem: LocalProblem, shared: list, duals: list) -> dict:
"""Construct a RoundTrip envelope containing the problem and signals.
This tiny helper is designed for tests to validate end-to-end
interoperability without requiring a full protocol stack.
"""
payload = {
"object": {"id": problem.problem_id, "version": getattr(problem, "version", None)},
"morphisms": morphisms,
"object": {"id": getattr(problem, "problem_id", getattr(problem, "id", None))},
"morphisms": [] ,
}
for sv in (shared or []):
if hasattr(sv, "to_dict"):
payload["morphisms"].append(sv.to_dict())
else:
# fallback if plain dict
payload["morphisms"].append(dict(sv))
for dv in (duals or []):
if hasattr(dv, "to_dict"):
payload["morphisms"].append(dv.to_dict())
else:
payload["morphisms"].append(dict(dv))
return {"kind": "RoundTrip", "payload": payload}
__all__ = ["CatOptBridge", "ContractRegistry", "LocalProblem"]
# Public aliases expected by tests / API
ContractRegistry = Registry

View File

@ -1,62 +1,77 @@
"""Minimal DSL sketch for LocalProblem/SharedVariables/PlanDelta.
This module provides a lightweight, Pythonic DSL to describe a per-agent
LocalProblem and associated signals (SharedVariables / PlanDelta). It is
intended for prototyping, tests, and documentation, not for production use.
"""
"""Tiny DSL sketch for CosmosMesh interoperability primitives."""
from __future__ import annotations
from typing import Any, Dict, List, Optional
from .catopt_bridge import LocalProblem, SharedVariable, PlanDelta, ContractRegistry, CatOptBridge
from dataclasses import dataclass, asdict
from typing import Any, Dict, List
class LocalProblemDSL:
"""Fluent DSL for building a LocalProblem dataclass."""
@dataclass
class LocalProblem:
id: str
domain: str
assets: List[str]
objective: Dict[str, Any]
constraints: Dict[str, Any]
def __init__(self, problem_id: str, version: int = 1) -> None:
self.problem_id = problem_id
self.version = version
self.variables: Dict[str, Any] = {}
self.objective: Optional[Any] = None
self.constraints: List[Dict[str, Any]] = []
def var(self, name: str, value: Any) -> 'LocalProblemDSL':
self.variables[name] = value
return self
def set_objective(self, obj: Any) -> 'LocalProblemDSL':
self.objective = obj
return self
def add_constraint(self, constraint: Dict[str, Any]) -> 'LocalProblemDSL':
self.constraints.append(constraint)
return self
def build(self) -> LocalProblem:
return LocalProblem(
problem_id=self.problem_id,
version=self.version,
variables=self.variables,
objective=self.objective if self.objective is not None else 0,
constraints=self.constraints or None,
)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class SharedVariablesDSL:
"""DSL helper to create SharedVariable instances."""
@dataclass
class SharedVariables:
forecasts: Dict[str, Any]
priors: Dict[str, Any]
version: int
@staticmethod
def sv(name: str, value: Any, version: int = 1) -> SharedVariable:
return SharedVariable(name, value, version)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class PlanDeltaDSL:
"""DSL helper to create PlanDelta instances."""
@dataclass
class PlanDelta:
delta: Dict[str, Any]
timestamp: float
author: str
contract_id: int
signature: str
@staticmethod
def delta(delta_id: str, changes: Dict[str, Any], timestamp: Optional[float] = None) -> PlanDelta:
return PlanDelta(delta_id=delta_id, changes=changes, timestamp=timestamp)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
__all__ = ["LocalProblemDSL", "SharedVariablesDSL", "PlanDeltaDSL"]
@dataclass
class DualVariables:
multipliers: Dict[str, float]
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class PrivacyBudget:
signal: str
budget: float
expiry: float
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class AuditLog:
entry: str
signer: str
timestamp: float
contract_id: int
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
@dataclass
class Policy:
name: str
rules: Dict[str, Any]
def to_dict(self) -> Dict[str, Any]:
return asdict(self)

View File

@ -1,47 +1,35 @@
import pytest
from cosmosmesh_privacy_preserving_federated.catopt_bridge import (
LocalProblem,
SharedVariable,
DualVariable,
PlanDelta,
CatOptBridge,
ContractRegistry,
)
import time
from cosmosmesh_privacy_preserving_federated.catopt_bridge import LocalProblem, to_catopt, from_catopt, Registry
def test_map_local_problem_to_catopt():
def test_local_problem_roundtrip_catopt():
lp = LocalProblem(
problem_id="lp_1",
version=1,
variables={"a": 1.0},
objective="optimize",
constraints=["c1 <= 5"],
id="lp-001",
domain="space-supply",
assets=["rover-1", "drone-alpha"],
objective={"allocate": {"task": "survey", "weight": 1.0}},
constraints={"max_energy": 100.0},
)
bridge = CatOptBridge()
catopt = bridge.map_local_problem(lp)
catopt = to_catopt(lp)
assert isinstance(catopt, dict)
assert "Objects" in catopt
assert catopt["Objects"]["LocalProblem"]["problem_id"] == "lp_1"
assert catopt.get("type") == "LocalProblem"
payload = catopt.get("payload", {})
assert payload.get("id") == lp.id
assert payload.get("domain") == lp.domain
assert payload.get("assets") == lp.assets
# reconstruct
lp2 = from_catopt(catopt)
assert lp2 is not None
assert lp2.id == lp.id
assert lp2.domain == lp.domain
def test_shared_and_dual_variables_round_trip():
sv = SharedVariable("signalA", 42.0, 1)
dv = DualVariable("signalA_dual", 0.5, 1)
bridge = CatOptBridge()
lp = LocalProblem(problem_id="lp_2", version=1, variables={"x": 2}, objective="o", constraints=None)
rt = bridge.build_round_trip(lp, [sv], [dv])
assert isinstance(rt, dict)
assert rt.get("kind") == "RoundTrip"
payload = rt.get("payload", {})
assert payload.get("object").get("id") == lp.problem_id
morphisms = payload.get("morphisms", [])
assert any(m["name"] == sv.channel for m in morphisms)
assert any(m["name"] == dv.channel for m in morphisms)
def test_contract_registry_basic():
reg = ContractRegistry()
reg.register_contract("example", "0.1", {"schema": "dummy"})
assert reg.get_contract("example", "0.1")["schema"] == "dummy"
def test_registry_basic():
reg = Registry()
reg.register_contract(1, {"name": "LocalProblemV1", "fields": ["id","domain"]})
crt = reg.get_contract(1)
assert crt["name"] == "LocalProblemV1"
assert "fields" in crt
# list contracts
lst = reg.list_contracts()
assert 1 in lst