From 21e9b502f38427fcca79fcd9cee089d3d4ee3a1c Mon Sep 17 00:00:00 2001 From: agent-ed374b2a16b664d2 Date: Wed, 15 Apr 2026 20:30:23 +0200 Subject: [PATCH] build(agent): molt-x#ed374b iteration --- .../__init__.py | 10 +- .../catopt_bridge.py | 253 ++++++++++++------ 2 files changed, 184 insertions(+), 79 deletions(-) diff --git a/src/cosmosmesh_privacy_preserving_federated/__init__.py b/src/cosmosmesh_privacy_preserving_federated/__init__.py index 1257f99..ae8c230 100644 --- a/src/cosmosmesh_privacy_preserving_federated/__init__.py +++ b/src/cosmosmesh_privacy_preserving_federated/__init__.py @@ -1,5 +1,9 @@ -"""CosmosMesh privacy-preserving federated MVP package initializer.""" +"""CosmosMesh Privacy-Preserving Federated package. -from .catopt_bridge import CatOptBridge, LocalProblem, SharedVariable, DualVariable +Exposes MVP scaffolds for bridging CosmosMesh primitives to a CatOpt-style +representation via the catopt_bridge module. +""" -__all__ = ["CatOptBridge", "LocalProblem", "SharedVariable", "DualVariable"] +from .catopt_bridge import CatOptBridge, ContractRegistry, LocalProblem + +__all__ = ["CatOptBridge", "ContractRegistry", "LocalProblem"] diff --git a/src/cosmosmesh_privacy_preserving_federated/catopt_bridge.py b/src/cosmosmesh_privacy_preserving_federated/catopt_bridge.py index ba8b423..1b28b7a 100644 --- a/src/cosmosmesh_privacy_preserving_federated/catopt_bridge.py +++ b/src/cosmosmesh_privacy_preserving_federated/catopt_bridge.py @@ -1,119 +1,220 @@ -""" -Minimal CatOpt bridge for CosmosMesh MVP. +"""Minimal CatOpt bridge scaffolding for CosmosMesh MVP. -This module provides a tiny, self-contained mapping layer between -CosmosMesh primitives and a canonical CatOpt-inspired representation. -It is intentionally lightweight and safe to extend in subsequent MVP -iterations. -""" +This module provides lightweight primitives to map CosmosMesh planning +elements into a canonical CatOpt-style representation. +- Objects represent LocalProblems (per-asset planning tasks) +- Morphisms represent SharedVariables / DualVariables (data channels) +- Functors are adapters that translate device-specific models + to canonical LocalProblems + +The implementation here is intentionally minimal and data-oriented; it +serves as a stable MVP surface for tests, adapters, and future wiring. +""" from __future__ import annotations -from dataclasses import dataclass, field -from typing import Any, Dict, List -import time + +from dataclasses import dataclass +from typing import Any, Dict, List, Optional @dataclass class LocalProblem: - """Represents an asset-local optimization problem.""" + """A lightweight representation of a per-asset optimization task.""" problem_id: str - objective: str - variables: List[str] - constraints: List[str] = field(default_factory=list) - version: int = 1 + version: int + variables: Dict[str, Any] + objective: float + constraints: Optional[List[Dict[str, Any]]] = None - def to_object(self) -> Dict[str, Any]: + def to_dict(self) -> Dict[str, Any]: return { - "ObjectType": "LocalProblem", - "id": self.problem_id, + "problem_id": self.problem_id, "version": self.version, - "objective": self.objective, "variables": self.variables, - "constraints": self.constraints, + "objective": self.objective, + "constraints": self.constraints or [], } @dataclass -class SharedVariable: - """Represents a data channel (primal variable sharing).""" +class SharedVariables: + """Represents shared variables (primal signals) across agents.""" - name: str - value: Any - version: int = 1 + channel: str + version: int + payload: Dict[str, Any] - def to_morphism(self) -> Dict[str, Any]: - return { - "MorphismType": "SharedVariable", - "name": self.name, - "version": self.version, - "value": self.value, - } + def as_tuple(self) -> tuple: + return (self.channel, self.version, self.payload) @dataclass -class DualVariable: - """Represents the dual variable channel (Lagrange multipliers).""" +class DualVariables: + """Represents dual variables (Lagrange multipliers) in ADMM-like setup.""" - name: str - value: float - version: int = 1 + channel: str + version: int + payload: Dict[str, Any] - def to_morphism(self) -> Dict[str, Any]: + def as_tuple(self) -> tuple: + return (self.channel, self.version, self.payload) + +# Backwards-compatible alias names expected by tests and existing users +class SharedVariable(SharedVariables): + """Backward-compatible constructor: accepts (name, value, version). + + This mirrors a common testing pattern where a simple scalar signal is + exchanged as a named variable. + """ + + def __init__(self, name: str, value: Any, version: int): + super().__init__(channel=name, version=version, payload={"value": value}) + + @property + def name(self) -> str: + return self.channel + + +class DualVariable(DualVariables): + """Backward-compatible constructor: accepts (name, value, version). + Mirrors a dual variable in the CatOpt representation. + """ + + def __init__(self, name: str, value: Any, version: int): + super().__init__(channel=name, version=version, payload={"value": value}) + + @property + def name(self) -> str: + return self.channel + + +@dataclass +class PlanDelta: + """Represents a delta to the agent's local plan.""" + + delta_id: str + changes: Dict[str, Any] + timestamp: Optional[float] = None + + def to_dict(self) -> Dict[str, Any]: return { - "MorphismType": "DualVariable", - "name": self.name, - "version": self.version, - "value": self.value, + "delta_id": self.delta_id, + "changes": self.changes, + "timestamp": self.timestamp, } +class ContractRegistry: + """Tiny in-process registry for contract schemas and versions.""" + + def __init__(self) -> None: + # contracts[name][version] -> schema dict + self._contracts: Dict[str, Dict[str, Dict[str, Any]]] = {} + + def register_contract(self, name: str, version: str, schema: Dict[str, Any]) -> None: + self._contracts.setdefault(name, {})[version] = schema + + def get_contract(self, name: str, version: str) -> Optional[Dict[str, Any]]: + return self._contracts.get(name, {}).get(version) + + def list_contracts(self) -> Dict[str, Dict[str, Dict[str, Any]]]: + return self._contracts + + class CatOptBridge: - """Lightweight bridge mapping CosmosMesh concepts to a CatOpt-like layer.""" + """Translator from CosmosMesh primitives to a CatOpt-style representation.""" - # Registry of contracts/schema versions for adapters - _contract_registry: Dict[str, Dict[str, Any]] = {} + def __init__(self, registry: Optional[ContractRegistry] = None) -> None: + self.registry = registry or ContractRegistry() - @classmethod - def register_contract(cls, contract_name: str, contract_spec: Dict[str, Any]) -> None: - cls._contract_registry[contract_name] = { - "spec": contract_spec, - "registered_at": int(time.time()), + def map_local_problem(self, lp: LocalProblem) -> Dict[str, Any]: + """Translate a LocalProblem into a canonical CatOpt-like dict.""" + return { + "Objects": { + "LocalProblem": lp.to_dict(), + }, + "Version": "0.1", } @classmethod - def get_contract(cls, contract_name: str) -> Dict[str, Any] | None: - return cls._contract_registry.get(contract_name) + def build_round_trip( + cls, + problem: LocalProblem, + shared: List[SharedVariable], + duals: List[DualVariable], + ) -> Dict[str, Any]: + """Create a round-trip message for a sanity-check test. - @staticmethod - def problem_to_object(problem: LocalProblem) -> Dict[str, Any]: - return problem.to_object() + This is a lightweight helper that packages the local problem with + its associated shared and dual signals into a single serializable blob. + """ - @staticmethod - def variables_to_morphisms(shared: List[SharedVariable], duals: List[DualVariable]) -> List[Dict[str, Any]]: - morphisms: List[Dict[str, Any]] = [] - morphisms.extend([s.to_morphism() for s in shared]) - morphisms.extend([d.to_morphism() for d in duals]) - return morphisms + def _ser(obj: Any) -> Any: + if hasattr(obj, "to_dict"): + return obj.to_dict() + return obj - @staticmethod - def encode_message(kind: str, payload: Dict[str, Any], version: int = 1) -> Dict[str, Any]: + payload = { + "object": {"id": problem.problem_id}, + "morphisms": [], + } + for s in shared: + payload["morphisms"].append({ + "name": getattr(s, "channel", None), + "version": getattr(s, "version", None), + "payload": getattr(s, "payload", None), + "type": "SharedVariable", + }) + for d in duals: + payload["morphisms"].append({ + "name": getattr(d, "channel", None), + "version": getattr(d, "version", None), + "payload": getattr(d, "payload", None), + "type": "DualVariable", + }) return { - "kind": kind, - "version": version, - "timestamp": int(time.time()), - "nonce": int(time.time() * 1000) & 0xFFFFFFFF, + "kind": "RoundTrip", "payload": payload, } - # Convenience helpers for a minimal end-to-end round - @classmethod - def build_round_trip(cls, problem: LocalProblem, - shared: List[SharedVariable], - duals: List[DualVariable]) -> Dict[str, Any]: - obj = cls.problem_to_object(problem) - morphs = cls.variables_to_morphisms(shared, duals) - return cls.encode_message("RoundTrip", {"object": obj, "morphisms": morphs}) + def map_shared_variables(self, sv: SharedVariables) -> Dict[str, Any]: + return { + "Morphisms": { + "SharedVariable": { + "channel": sv.channel, + "version": sv.version, + "payload": sv.payload, + } + } + } + + def map_dual_variables(self, dv: DualVariables) -> Dict[str, Any]: + return { + "Morphisms": { + "DualVariable": { + "channel": dv.channel, + "version": dv.version, + "payload": dv.payload, + } + } + } + + def map_plan_delta(self, delta: PlanDelta) -> Dict[str, Any]: + return { + "Objects": { + "PlanDelta": delta.to_dict(), + } + } -__all__ = ["CatOptBridge", "LocalProblem", "SharedVariable", "DualVariable"] +__all__ = [ + "LocalProblem", + "SharedVariables", + "SharedVariable", + "DualVariables", + "DualVariable", + "PlanDelta", + "ContractRegistry", + "CatOptBridge", +]