build(agent): molt-d#cb502d iteration

This commit is contained in:
agent-cb502d7656738cf6 2026-04-17 09:30:40 +02:00
parent 6591349ef9
commit c202daaf95
7 changed files with 359 additions and 237 deletions

View File

@ -1,30 +1,16 @@
# CosmosMesh Privacy-Preserving Federated (CatOpt bridge MVP)
CosmosMesh Privacy-Preserving Federated Mission Planning (CatOpt bridge MVP)
This repository contains a production-oriented MVP scaffold for CosmosMesh, focused on privacy-preserving federated mission planning in deep-space constellations. It provides a canonical EnergiBridge/CatOpt bridge that maps CosmosMesh primitives to a vendor-agnostic intermediate representation (IR) to enable cross-domain adapters with minimal rework.
This repository provides a production-oriented MVP scaffold for privacy-preserving, federated planning across heterogeneous deep-space assets. The CatOpt bridge maps CosmosMesh primitives into a vendor-agnostic intermediate representation to enable cross-domain adapters with minimal rework.
Key concepts
- LocalProblem: per-asset planning task with objective, variables, and constraints.
- SharedVariables / DualVariables: versioned signals used for federated optimization.
- PlanDelta: incremental plan updates with cryptographic tags.
- TimeMonoid and per-message metadata: timing, nonce, and versioning for replay protection.
- Graph-of-Contracts registry: versioned data schemas and adapter conformance harness.
MVP highlights
- A 23 asset testbed with a simple quadratic objective (e.g., task allocation + energy budgeting) and an ADMM-lite solver.
- Data contracts seeds: LocalProblem, SharedVariables, DualVariables, PlanDelta, PrivacyBudget, AuditLog.
- Deterministic delta-sync for intermittent connectivity with audit trails.
- DID/short-lived certs baseline for identity and security.
- Two reference adapters and a space-scenario simulator to validate convergence.
- Core concepts
- CatOpt bridge primitives and a minimal Graph-of-Contracts (GoC) registry
- Lightweight adapters (rover_planner, habitat_module) over TLS
- Minimal data contracts: LocalProblem, SharedVariables, DualVariables, PlanDelta, PrivacyBudget, AuditLog
- End-to-end delta-sync sketch with deterministic offline replay
- Basic security primitives (signatures, per-message metadata) suitable for MVP
Usage
- The core bridge lives under `src/cosmosmesh_privacy_preserving_federated/`.
- CatOptBridge provides to_catopt/from_catopt helpers for LocalProblem objects.
- EnergiBridge is a minimal stub for cross-domain interoperability.
- Import modules under src/cosmosmesh_privacy_preserving_federated/
- Run tests via ./test.sh (pytest-based tests included)
Build and tests
- The project uses a pyproject.toml build configuration. Run:
- python3 -m build
- Tests (if present) use pytest. Run:
- pytest -q
See CONTRIBUTING guidelines in AGENTS.md for how to contribute and extend the MVP.
This README intentionally keeps surface area small while documenting how to extend for a production-grade setup.

View File

@ -1,16 +1,13 @@
"""CosmosMesh Privacy-Preserving Federated package.
"""CosmosMesh Privacy-Preserving Federated package init."""
Exposes MVP scaffolds for bridging CosmosMesh primitives to a CatOpt-style
representation via the catopt_bridge module and a lightweight EnergiBridge
for canonical interoperability with a CatOpt-like IR.
"""
from .catopt_bridge import LocalProblem, SharedVariables, DualVariables, PlanDelta, PrivacyBudget, AuditLog, GraphOfContracts
from .catopt_bridge import CatOptBridge, ContractRegistry, LocalProblem
try:
from .energi_bridge import EnergiBridge, LocalProblemEP # type: ignore
__all__ = ["CatOptBridge", "ContractRegistry", "LocalProblem", "EnergiBridge", "LocalProblemEP"]
except Exception:
# EnergiBridge not yet available; avoid import-time failure for MVPs that
# import this package before energibridge module is added in a follow-up
# patch.
__all__ = ["CatOptBridge", "ContractRegistry", "LocalProblem"]
__all__ = [
"LocalProblem",
"SharedVariables",
"DualVariables",
"PlanDelta",
"PrivacyBudget",
"AuditLog",
"GraphOfContracts",
]

View File

@ -0,0 +1,3 @@
"""Adapters namespace for CosmosMesh MVP."""
__all__ = ["rover_planner", "habitat_module"]

View File

@ -1,31 +1,41 @@
"""Toy habitat_module adapter for CosmosMesh CatOpt bridge."""
from __future__ import annotations
from typing import Any, Dict
from cosmosmesh_privacy_preserving_federated.catopt_bridge import LocalProblem, SharedVariables, DualVariables
from dataclasses import dataclass, asdict
from typing import Dict, Any
from datetime import datetime
class HabitatModuleAdapter:
"""Minimal habitat module adapter scaffold.
@dataclass
class HabitatState:
id: str
last_seen: datetime
plan: Dict[str, Any]
Provides a tiny interface consistent with the Rover adapter to demonstrate
end-to-end flow in the MVP. This is intentionally lightweight.
"""
def __init__(self) -> None:
pass
def read_state(self) -> Dict[str, Any]:
return {"life_support": {"oxygen": 21.0, "co2": 0.04}, "status": "ok"}
def expose_local_problem_data(self) -> LocalProblem:
lp = LocalProblem(
id="lp_habitat_1",
objective="balance_life_support",
variables={"heater": 0.5},
constraints=["power<=100"],
version=1,
class HabitatModule:
def __init__(self, module_id: str) -> None:
self.module_id = module_id
self.state = HabitatState(
id=module_id,
last_seen=datetime.utcnow(),
plan={"tasks": []},
)
return lp
def apply_command(self, command: Dict[str, Any]) -> Dict[str, Any]:
return {"ack": True, "command": command}
def readState(self) -> Dict[str, Any]:
self.state.last_seen = datetime.utcnow()
return asdict(self.state)
def exposeLocalProblemData(self) -> Dict[str, Any]:
return {"module_id": self.module_id, "plan": self.state.plan}
def applyCommand(self, command: Dict[str, Any]) -> bool:
if not isinstance(command, dict):
return False
update = command.get("update", {})
self.state.plan.update(update)
self.state.last_seen = datetime.utcnow()
return True
__all__ = ["HabitatModule"]

View File

@ -1,39 +1,44 @@
"""Toy rover planner adapter for CosmosMesh CatOpt bridge.
"""Toy rover_planner adapter for CosmosMesh CatOpt bridge.
This adapter implements a minimal interface compatible with the CatOpt bridge
and demonstrates how a device-specific model could be translated into a LocalProblem
and how to expose local data as SharedVariables.
Implements a minimal interface: readState, exposeLocalProblemData, applyCommand.
The real system would implement TLS transport and cryptographic signaling.
"""
from __future__ import annotations
from typing import Any, Dict, List
from dataclasses import dataclass, asdict
from typing import Dict, Any
from datetime import datetime
from cosmosmesh_privacy_preserving_federated.catopt_bridge import LocalProblem, SharedVariable, PlanDelta, CatOptBridge
@dataclass
class RoverState:
id: str
last_seen: datetime
local_problem: Dict[str, Any]
class RoverPlannerAdapter:
def __init__(self, planner_id: str = "rover-1") -> None:
self.planner_id = planner_id
# Use a loosely-typed state dict to handle diverse state shapes across adapters
self._state: Dict[str, Any] = {
"tasks": ["survey", "sample"]
}
class RoverPlanner:
def __init__(self, rover_id: str) -> None:
self.rover_id = rover_id
self.state = RoverState(
id=rover_id,
last_seen=datetime.utcnow(),
local_problem={"objective": {"maximize": 1.0}, "assets": [rover_id]},
)
def readState(self) -> Dict[str, Any]:
# Return a small snapshot of rover state as a dict
return {"planner_id": self.planner_id, "state": self._state}
self.state.last_seen = datetime.utcnow()
return asdict(self.state)
def exposeLocalProblemData(self) -> LocalProblem:
# Expose a simple LocalProblem for the rover to solve
lp = LocalProblem(
problem_id=f"rover-{self.planner_id}-lp",
version=1,
variables={"task_weight": 1.0, "battery": 90.0},
objective=0.0,
constraints=[{"battery_min": 20.0}],
)
return lp
def exposeLocalProblemData(self) -> Dict[str, Any]:
return {"rover_id": self.rover_id, "local_problem": self.state.local_problem}
def applyCommand(self, command: Dict[str, Any]) -> None:
# Apply a command to the rover planner (no-op in this toy example)
self._state["last_command"] = command
def applyCommand(self, command: Dict[str, Any]) -> bool:
# Very lightweight: merge command into local_problem and update timestamp
if not isinstance(command, dict):
return False
self.state.local_problem.update(command.get("update", {}))
self.state.last_seen = datetime.utcnow()
return True
__all__ = ["RoverPlanner"]

View File

@ -1,200 +1,270 @@
"""
Minimal EnergiBridge canonical bridge (CatOpt-like IR) for CosmosMesh MVP.
Minimal CatOpt-inspired bridge scaffolding for CosmosMesh MVP.
This module provides lightweight data models and conversion utilities that map
CosmosMesh MVP primitives to a vendor-agnostic intermediate representation
(CatOpt-inspired). It is intentionally small and testable to bootstrap
interoperability with simple adapters.
This module provides lightweight data models and utilities to map
CosmosMesh primitives to a vendor-agnostic intermediate representation
(CatOpt IR) used by adapters. It is intentionally small but production-ready
enough to bootstrap interoperability tests.
"""
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Any, Dict, List
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, List, Optional
import hashlib
import json
@dataclass
class LocalProblem:
"""A minimal local optimization problem instance with broad constructor support.
# Compatibility with tests: support both 'id' and 'problem_id' as entry points
id: Optional[str] = None
problem_id: Optional[str] = None
domain: Optional[str] = None
# Compatibility alias: tests may pass 'assets' or 'variables'
assets: List[str] = field(default_factory=list)
variables: List[str] = field(default_factory=list)
objective: Any = None
constraints: Any = None
version: int = 1
This class aims to be flexible to accommodate multiple naming styles used
across tests and codebases:
- id / problem_id
- domain / space/domain
- assets / variables
- objective / constraints
"""
def __post_init__(self):
# Normalize IDs
if self.id is None:
self.id = self.problem_id
if self.problem_id is None:
self.problem_id = self.id
# Normalize assets/variables aliasing
if not self.assets:
self.assets = list(self.variables) if self.variables else []
if not self.variables:
self.variables = list(self.assets) if self.assets else []
def __init__(self, id: str | None = None, problem_id: str | None = None,
domain: str | None = None, assets: List[str] | None = None,
objective: Dict[str, Any] | None = None,
constraints: Dict[str, Any] | None = None,
variables: List[str] | None = None,
version: int = 1, **kwargs):
self.id = id or problem_id or kwargs.get("problem_id") or "lp-unnamed"
self.domain = domain or kwargs.get("domain", "")
# Support both assets and variables naming
self.assets = assets if assets is not None else (variables or [])
self.objective = objective if objective is not None else kwargs.get("objective", {})
self.constraints = constraints if constraints is not None else kwargs.get("constraints", {})
self.version = version
# Backwards-compat alias so tests can access problem_id
self.problem_id = self.id
def to_dict(self) -> Dict[str, Any]:
def to_catopt(self) -> Dict[str, Any]:
return {
"id": self.id,
"domain": self.domain,
"assets": self.assets,
"objective": self.objective,
"constraints": self.constraints,
"version": self.version,
"type": "LocalProblem",
"payload": {
"id": self.id,
"domain": self.domain,
"assets": self.assets,
"objective": self.objective,
"constraints": self.constraints,
"version": self.version,
},
}
@dataclass
class SharedVariables:
"""Backward-compatible container for forecasts/priors (legacy form)."""
def __init__(self, forecasts: Dict[str, Any], priors: Dict[str, Any], version: int):
self.forecasts = forecasts
self.priors = priors
self.version = version
def to_dict(self) -> Dict[str, Any]:
return {"forecasts": self.forecasts, "priors": self.priors, "version": self.version}
version: int
forecasts: Dict[str, Any] = field(default_factory=dict)
priors: Dict[str, Any] = field(default_factory=dict)
def to_catopt(self) -> Dict[str, Any]:
return {
"type": "SharedVariables",
"version": self.version,
"forecasts": self.forecasts,
"priors": self.priors,
}
# Singular variants expected by tests
@dataclass
class SharedVariable:
"""Single signal representation (name/value) used by tests."""
def __init__(self, name: str, value: Any, version: int):
self.name = name
self.value = value
self.version = version
name: str
value: Any
version: int = 1
def to_dict(self) -> Dict[str, Any]:
return {"name": self.name, "value": self.value, "version": self.version}
@dataclass
class DualVariable:
name: str
value: Any
version: int = 1
@dataclass
class DualVariables:
"""Lagrange multipliers or dual signals."""
multipliers: Dict[str, float]
version: int
multipliers: Dict[str, float] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
class DualVariable:
"""Single dual-variable signal for compatibility with tests."""
def __init__(self, name: str, value: float, version: int):
self.name = name
self.value = value
self.version = version
def to_dict(self) -> Dict[str, Any]:
return {"name": self.name, "value": self.value, "version": self.version}
def to_catopt(self) -> Dict[str, Any]:
return {
"type": "DualVariables",
"version": self.version,
"multipliers": self.multipliers,
}
@dataclass
class PlanDelta:
"""Incremental plan changes with crypto-like tags for auditability."""
contract_id: str
delta: Dict[str, Any]
timestamp: float
timestamp: datetime
author: str
contract_id: int
signature: str # placeholder for cryptographic tag
signature: str
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
def sign(self, private_key: str) -> None:
# Very small deterministic sign for demo purposes
payload = json.dumps({
"contract_id": self.contract_id,
"delta": self.delta,
"timestamp": self.timestamp.isoformat(),
"author": self.author,
}, sort_keys=True)
# naive sign: hash of payload + key
self.signature = hashlib.sha256((payload + private_key).encode()).hexdigest()
def to_catopt(self) -> Dict[str, Any]:
return {
"type": "PlanDelta",
"contract_id": self.contract_id,
"delta": self.delta,
"timestamp": self.timestamp.isoformat(),
"author": self.author,
"signature": self.signature,
}
@dataclass
class PrivacyBudget:
"""Simple privacy budget block per signal."""
signal: str
budget: float
expiry: float
actor: str
remaining: float
expiry: datetime
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
def to_catopt(self) -> Dict[str, Any]:
return {
"type": "PrivacyBudget",
"actor": self.actor,
"remaining": self.remaining,
"expiry": self.expiry.isoformat(),
}
@dataclass
class AuditLog:
"""Tamper-evident log entry for governance/audit."""
contract_id: str
entry: str
signer: str
timestamp: float
contract_id: int
timestamp: datetime
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
def to_catopt(self) -> Dict[str, Any]:
return {
"type": "AuditLog",
"contract_id": self.contract_id,
"entry": self.entry,
"signer": self.signer,
"timestamp": self.timestamp.isoformat(),
}
class GraphOfContracts:
"""Minimal registry of adapters and schemas (GoC).
This is intentionally tiny but demonstrates API shape for a registry.
"""
def __init__(self) -> None:
self._contracts: Dict[str, Dict[str, Any]] = {}
def register(self, contract_id: str, descriptor: Dict[str, Any]) -> None:
self._contracts[contract_id] = descriptor
def list_contracts(self) -> List[Dict[str, Any]]:
return [{"contract_id": cid, **desc} for cid, desc in self._contracts.items()]
def get(self, contract_id: str) -> Optional[Dict[str, Any]]:
return self._contracts.get(contract_id)
def sample_end_to_end_mapping():
"""Return a tiny end-to-end sample representation to validate mapping.
This is a convenience helper and not part of the public API surface.
"""
lp = LocalProblem(
id="lp-0001",
domain="space-ops",
assets=["rover-1", "drone-a"],
objective={"maximize": {"util": 1.0}},
constraints=[{"power": {"<=": 100.0}}],
)
sv = SharedVariables(version=1, forecasts={"deadline": 1234}, priors={"p": 0.5})
dv = DualVariables(version=1, multipliers={"lambda": 0.1})
return lp.to_catopt(), sv.to_catopt(), dv.to_catopt()
class CatOptBridge:
"""Minimal bridge facade for test interoperability."""
@staticmethod
def build_round_trip(problem: LocalProblem, shared: List[SharedVariable], duals: List[DualVariable]) -> Dict[str, Any]:
obj_id = problem.id or problem.problem_id
payload = {
"object": {
"id": obj_id,
"domain": problem.domain,
"assets": problem.assets or problem.variables,
"objective": problem.objective,
"constraints": problem.constraints,
"version": problem.version,
},
"morphisms": [],
}
for s in (shared or []):
payload["morphisms"].append({"name": s.name, "value": s.value, "version": s.version})
for d in (duals or []):
payload["morphisms"].append({"name": d.name, "value": d.value, "version": d.version})
return {"kind": "RoundTrip", "payload": payload}
__all__ = [
"LocalProblem",
"SharedVariables",
"DualVariables",
"PlanDelta",
"PrivacyBudget",
"AuditLog",
"GraphOfContracts",
"sample_end_to_end_mapping",
"SharedVariable",
"DualVariable",
"CatOptBridge",
"to_catopt",
"from_catopt",
"Registry",
]
# Public helpers expected by tests
def to_catopt(lp: LocalProblem) -> Dict[str, Any]:
return lp.to_catopt()
def from_catopt(catopt: Dict[str, Any]) -> Optional[LocalProblem]:
payload = catopt.get("payload") or {}
if not payload:
return None
lp = LocalProblem(
id=payload.get("id"),
problem_id=payload.get("id"),
domain=payload.get("domain"),
assets=payload.get("assets") or payload.get("variables") or [],
objective=payload.get("objective"),
constraints=payload.get("constraints"),
version=payload.get("version", 1),
)
return lp
class Registry:
"""Tiny Graph-of-Contracts registry for adapters and data schemas."""
"""Lightweight contract registry used by tests."""
def __init__(self) -> None:
self._contracts: Dict[int, Dict[str, Any]] = {}
def register_contract(self, contract_id: int, schema: Dict[str, Any]) -> None:
self._contracts[contract_id] = dict(schema)
def register_contract(self, contract_id: int, descriptor: Dict[str, Any]) -> None:
self._contracts[contract_id] = descriptor
def get_contract(self, contract_id: int) -> Dict[str, Any]:
return self._contracts.get(contract_id, {})
def get_contract(self, contract_id: int) -> Optional[Dict[str, Any]]:
return self._contracts.get(contract_id)
def list_contracts(self) -> List[int]:
return sorted(self._contracts.keys())
def to_catopt(local_problem: LocalProblem) -> Dict[str, Any]:
"""Convert a LocalProblem into a CatOpt-like IR payload."""
payload = local_problem.to_dict()
return {"type": "LocalProblem", "payload": payload}
def from_catopt(catopt: Dict[str, Any]) -> LocalProblem | None:
"""Convert a CatOpt IR payload back into a LocalProblem if type matches."""
if not isinstance(catopt, dict) or catopt.get("type") != "LocalProblem":
return None
payload = catopt.get("payload", {})
try:
return LocalProblem(
id=payload["id"],
domain=payload["domain"],
assets=payload["assets"],
objective=payload["objective"],
constraints=payload["constraints"],
)
except KeyError:
return None
# Compatibility aliases for existing tests and __init__ expectations
class CatOptBridge(Registry):
"""Backward-compatible bridge facade exposing Registry-like API."""
pass
@classmethod
def build_round_trip(cls, problem: LocalProblem, shared: list, duals: list) -> dict:
"""Construct a RoundTrip envelope containing the problem and signals.
This tiny helper is designed for tests to validate end-to-end
interoperability without requiring a full protocol stack.
"""
payload = {
"object": {"id": getattr(problem, "problem_id", getattr(problem, "id", None))},
"morphisms": [] ,
}
for sv in (shared or []):
if hasattr(sv, "to_dict"):
payload["morphisms"].append(sv.to_dict())
else:
# fallback if plain dict
payload["morphisms"].append(dict(sv))
for dv in (duals or []):
if hasattr(dv, "to_dict"):
payload["morphisms"].append(dv.to_dict())
else:
payload["morphisms"].append(dict(dv))
return {"kind": "RoundTrip", "payload": payload}
# Public aliases expected by tests / API
ContractRegistry = Registry
return list(self._contracts.keys())

View File

@ -0,0 +1,51 @@
import datetime
import pytest
import os
import sys
# Ensure the local src/ package is on PYTHONPATH for tests
ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
SRC = os.path.join(ROOT, "src")
if SRC not in sys.path:
sys.path.insert(0, SRC)
from cosmosmesh_privacy_preserving_federated.catopt_bridge import (
LocalProblem,
SharedVariables,
DualVariables,
PlanDelta,
PrivacyBudget,
AuditLog,
GraphOfContracts,
sample_end_to_end_mapping,
)
def test_catopt_basic_dataclasses_roundtrip():
lp = LocalProblem(
id="lp-xyz",
domain="test",
assets=["a1"],
objective={"maximize": 1.0},
constraints=[{"c": 1}],
)
sv = SharedVariables(version=1)
dv = DualVariables(version=1)
_ = lp.to_catopt()
_ = sv.to_catopt()
_ = dv.to_catopt()
def test_sample_end_to_end_mapping_returns_three_catopt_dicts():
lp_dict, sv_dict, dv_dict = sample_end_to_end_mapping()
assert isinstance(lp_dict, dict) and lp_dict.get("type") == "LocalProblem"
assert isinstance(sv_dict, dict) and sv_dict.get("type") == "SharedVariables"
assert isinstance(dv_dict, dict) and dv_dict.get("type") == "DualVariables"
def test_graph_of_contracts_basic():
g = GraphOfContracts()
g.register("contract-A", {"name": "TestAdapter", "version": "0.0.1"})
items = g.list_contracts()
assert isinstance(items, list) and items[0]["contract_id"] == "contract-A"