build(agent): molt-x#ed374b iteration

This commit is contained in:
agent-ed374b2a16b664d2 2026-04-15 20:30:23 +02:00
parent fee6eb6aee
commit 21e9b502f3
2 changed files with 184 additions and 79 deletions

View File

@ -1,5 +1,9 @@
"""CosmosMesh privacy-preserving federated MVP package initializer."""
"""CosmosMesh Privacy-Preserving Federated package.
from .catopt_bridge import CatOptBridge, LocalProblem, SharedVariable, DualVariable
Exposes MVP scaffolds for bridging CosmosMesh primitives to a CatOpt-style
representation via the catopt_bridge module.
"""
__all__ = ["CatOptBridge", "LocalProblem", "SharedVariable", "DualVariable"]
from .catopt_bridge import CatOptBridge, ContractRegistry, LocalProblem
__all__ = ["CatOptBridge", "ContractRegistry", "LocalProblem"]

View File

@ -1,119 +1,220 @@
"""
Minimal CatOpt bridge for CosmosMesh MVP.
"""Minimal CatOpt bridge scaffolding for CosmosMesh MVP.
This module provides a tiny, self-contained mapping layer between
CosmosMesh primitives and a canonical CatOpt-inspired representation.
It is intentionally lightweight and safe to extend in subsequent MVP
iterations.
"""
This module provides lightweight primitives to map CosmosMesh planning
elements into a canonical CatOpt-style representation.
- Objects represent LocalProblems (per-asset planning tasks)
- Morphisms represent SharedVariables / DualVariables (data channels)
- Functors are adapters that translate device-specific models
to canonical LocalProblems
The implementation here is intentionally minimal and data-oriented; it
serves as a stable MVP surface for tests, adapters, and future wiring.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List
import time
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class LocalProblem:
"""Represents an asset-local optimization problem."""
"""A lightweight representation of a per-asset optimization task."""
problem_id: str
objective: str
variables: List[str]
constraints: List[str] = field(default_factory=list)
version: int = 1
version: int
variables: Dict[str, Any]
objective: float
constraints: Optional[List[Dict[str, Any]]] = None
def to_object(self) -> Dict[str, Any]:
def to_dict(self) -> Dict[str, Any]:
return {
"ObjectType": "LocalProblem",
"id": self.problem_id,
"problem_id": self.problem_id,
"version": self.version,
"objective": self.objective,
"variables": self.variables,
"constraints": self.constraints,
"objective": self.objective,
"constraints": self.constraints or [],
}
@dataclass
class SharedVariable:
"""Represents a data channel (primal variable sharing)."""
class SharedVariables:
"""Represents shared variables (primal signals) across agents."""
name: str
value: Any
version: int = 1
channel: str
version: int
payload: Dict[str, Any]
def to_morphism(self) -> Dict[str, Any]:
return {
"MorphismType": "SharedVariable",
"name": self.name,
"version": self.version,
"value": self.value,
}
def as_tuple(self) -> tuple:
return (self.channel, self.version, self.payload)
@dataclass
class DualVariable:
"""Represents the dual variable channel (Lagrange multipliers)."""
class DualVariables:
"""Represents dual variables (Lagrange multipliers) in ADMM-like setup."""
name: str
value: float
version: int = 1
channel: str
version: int
payload: Dict[str, Any]
def to_morphism(self) -> Dict[str, Any]:
def as_tuple(self) -> tuple:
return (self.channel, self.version, self.payload)
# Backwards-compatible alias names expected by tests and existing users
class SharedVariable(SharedVariables):
"""Backward-compatible constructor: accepts (name, value, version).
This mirrors a common testing pattern where a simple scalar signal is
exchanged as a named variable.
"""
def __init__(self, name: str, value: Any, version: int):
super().__init__(channel=name, version=version, payload={"value": value})
@property
def name(self) -> str:
return self.channel
class DualVariable(DualVariables):
"""Backward-compatible constructor: accepts (name, value, version).
Mirrors a dual variable in the CatOpt representation.
"""
def __init__(self, name: str, value: Any, version: int):
super().__init__(channel=name, version=version, payload={"value": value})
@property
def name(self) -> str:
return self.channel
@dataclass
class PlanDelta:
"""Represents a delta to the agent's local plan."""
delta_id: str
changes: Dict[str, Any]
timestamp: Optional[float] = None
def to_dict(self) -> Dict[str, Any]:
return {
"MorphismType": "DualVariable",
"name": self.name,
"version": self.version,
"value": self.value,
"delta_id": self.delta_id,
"changes": self.changes,
"timestamp": self.timestamp,
}
class ContractRegistry:
"""Tiny in-process registry for contract schemas and versions."""
def __init__(self) -> None:
# contracts[name][version] -> schema dict
self._contracts: Dict[str, Dict[str, Dict[str, Any]]] = {}
def register_contract(self, name: str, version: str, schema: Dict[str, Any]) -> None:
self._contracts.setdefault(name, {})[version] = schema
def get_contract(self, name: str, version: str) -> Optional[Dict[str, Any]]:
return self._contracts.get(name, {}).get(version)
def list_contracts(self) -> Dict[str, Dict[str, Dict[str, Any]]]:
return self._contracts
class CatOptBridge:
"""Lightweight bridge mapping CosmosMesh concepts to a CatOpt-like layer."""
"""Translator from CosmosMesh primitives to a CatOpt-style representation."""
# Registry of contracts/schema versions for adapters
_contract_registry: Dict[str, Dict[str, Any]] = {}
def __init__(self, registry: Optional[ContractRegistry] = None) -> None:
self.registry = registry or ContractRegistry()
@classmethod
def register_contract(cls, contract_name: str, contract_spec: Dict[str, Any]) -> None:
cls._contract_registry[contract_name] = {
"spec": contract_spec,
"registered_at": int(time.time()),
def map_local_problem(self, lp: LocalProblem) -> Dict[str, Any]:
"""Translate a LocalProblem into a canonical CatOpt-like dict."""
return {
"Objects": {
"LocalProblem": lp.to_dict(),
},
"Version": "0.1",
}
@classmethod
def get_contract(cls, contract_name: str) -> Dict[str, Any] | None:
return cls._contract_registry.get(contract_name)
def build_round_trip(
cls,
problem: LocalProblem,
shared: List[SharedVariable],
duals: List[DualVariable],
) -> Dict[str, Any]:
"""Create a round-trip message for a sanity-check test.
@staticmethod
def problem_to_object(problem: LocalProblem) -> Dict[str, Any]:
return problem.to_object()
This is a lightweight helper that packages the local problem with
its associated shared and dual signals into a single serializable blob.
"""
@staticmethod
def variables_to_morphisms(shared: List[SharedVariable], duals: List[DualVariable]) -> List[Dict[str, Any]]:
morphisms: List[Dict[str, Any]] = []
morphisms.extend([s.to_morphism() for s in shared])
morphisms.extend([d.to_morphism() for d in duals])
return morphisms
def _ser(obj: Any) -> Any:
if hasattr(obj, "to_dict"):
return obj.to_dict()
return obj
@staticmethod
def encode_message(kind: str, payload: Dict[str, Any], version: int = 1) -> Dict[str, Any]:
payload = {
"object": {"id": problem.problem_id},
"morphisms": [],
}
for s in shared:
payload["morphisms"].append({
"name": getattr(s, "channel", None),
"version": getattr(s, "version", None),
"payload": getattr(s, "payload", None),
"type": "SharedVariable",
})
for d in duals:
payload["morphisms"].append({
"name": getattr(d, "channel", None),
"version": getattr(d, "version", None),
"payload": getattr(d, "payload", None),
"type": "DualVariable",
})
return {
"kind": kind,
"version": version,
"timestamp": int(time.time()),
"nonce": int(time.time() * 1000) & 0xFFFFFFFF,
"kind": "RoundTrip",
"payload": payload,
}
# Convenience helpers for a minimal end-to-end round
@classmethod
def build_round_trip(cls, problem: LocalProblem,
shared: List[SharedVariable],
duals: List[DualVariable]) -> Dict[str, Any]:
obj = cls.problem_to_object(problem)
morphs = cls.variables_to_morphisms(shared, duals)
return cls.encode_message("RoundTrip", {"object": obj, "morphisms": morphs})
def map_shared_variables(self, sv: SharedVariables) -> Dict[str, Any]:
return {
"Morphisms": {
"SharedVariable": {
"channel": sv.channel,
"version": sv.version,
"payload": sv.payload,
}
}
}
def map_dual_variables(self, dv: DualVariables) -> Dict[str, Any]:
return {
"Morphisms": {
"DualVariable": {
"channel": dv.channel,
"version": dv.version,
"payload": dv.payload,
}
}
}
def map_plan_delta(self, delta: PlanDelta) -> Dict[str, Any]:
return {
"Objects": {
"PlanDelta": delta.to_dict(),
}
}
__all__ = ["CatOptBridge", "LocalProblem", "SharedVariable", "DualVariable"]
__all__ = [
"LocalProblem",
"SharedVariables",
"SharedVariable",
"DualVariables",
"DualVariable",
"PlanDelta",
"ContractRegistry",
"CatOptBridge",
]