build(agent): new-agents-2#7e3bbc iteration

This commit is contained in:
agent-7e3bbc424e07835b 2026-04-19 19:05:34 +02:00
parent b5ae0a9b01
commit 9db8ae7085
4 changed files with 273 additions and 81 deletions

View File

@ -22,6 +22,23 @@ This README intentionally keeps surface area small while documenting how to exte
- To publish a production-ready artifact, the repository should expose a stable package (name: cosmosmesh-privacy-preserving-federated, version in pyproject.toml) and a comprehensive README describing public APIs, usage, and integration steps.
- Next step for publishing: create an empty READY_TO_PUBLISH flag file at the repo root to signal readiness once all requirements are satisfied.
## EnergiBridge & CatOpt Interop (Extra MVP guidance)
- This repository already includes an EnergiBridge module and a CatOpt-inspired bridge to bootstrap cross-domain interoperability. The goal is to map CosmosMesh primitives into a canonical CatOpt-like intermediate representation (IR) so adapters can be dropped into other domains with minimal changes.
- Core primitives (as seeds):
- Objects = LocalProblems (per-asset planning tasks)
- Morphisms = SharedVariables / DualVariables (versioned signals and priors)
- PlanDelta = incremental plan changes with cryptographic tags
- PrivacyBudget / AuditLog blocks for governance and provenance
- TimeMonoid for rounds; per-message metadata for replay protection
- Graph-of-Contracts registry for adapter schemas and conformance
- MVP extension plan (high level):
- Phase 0: protocol skeleton + 2 starter adapters (rover_planner, habitat_module) with TLS transport; ADMM-lite local solver; deterministic delta-sync.
- Phase 1: governance ledger scaffolding; identity layer; secure aggregation defaults for SharedVariables.
- Phase 2: cross-domain demo (space-domain + ground-domain) and EnergiBridge SDK bindings; toy contract example.
- Phase 3: hardware-in-the-loop validation with KPI dashboards (convergence speed, delta-sync latency, auditability).
- Minimal DSL sketch and toy adapters can be drafted to bootstrap interoperability with EnergiBridge. See examples/contract_sketch.md for a starter description.
## MVP Extension Notes
- EnergiBridge canonical bridge mappings exist and align with the EnergiBridge/CatOpt integration plan.

View File

@ -0,0 +1,26 @@
CosmosMesh EnergiBridge - Toy Contract Sketch
Overview
- A minimal DSL sketch to describe LocalProblem, SharedVariables, PlanDelta, DualVariables, PrivacyBudget, AuditLog, and a small Policy block.
DSL (pseudocode style):
LocalProblem { id: String, domain: String, assets: List<String> }
SharedVariables { forecasts: Dict<String, Float>, priors: Dict<String, Float>, version: Int }
PlanDelta { delta: Any, timestamp: String, author: String, contract_id: String, signature: String }
DualVariables { multipliers: Dict<String, Float> }
PrivacyBudget { signal: String, budget: Float, expiry: String }
AuditLog { entry: String, signer: String, timestamp: String, contract_id: String }
Policy {
- access: [roles],
- encryption: true,
- retention: 90 // days
}
Usage notes
- These blocks are versioned and exchanged over a delta-sync protocol.
- The LocalProblem defines the per-asset planning task; SharedVariables and DualVariables carry the federated optimization signals; PlanDelta encodes the incremental changes with a cryptographic tag.
- PrivacyBudget and AuditLog enable governance and provenance tracking.
See the EnergiBridge demo for concrete Python representations that convert these DSL blocks into CatOpt-like IR structures.

View File

@ -0,0 +1,79 @@
"""Toy EnergiBridge demo: CatOpt-like IR conversion utilities"""
from __future__ import annotations
from dataclasses import dataclass, asdict
from typing import Dict, Any
@dataclass
class LocalProblem:
id: str
domain: str
assets: list[str]
@dataclass
class SharedVariables:
forecasts: Dict[str, float]
priors: Dict[str, float]
version: int
@dataclass
class PlanDelta:
delta: Any
timestamp: str
author: str
contract_id: str
signature: str
@dataclass
class DualVariables:
multipliers: Dict[str, float]
@dataclass
class PrivacyBudget:
signal: str
budget: float
expiry: str
@dataclass
class AuditLog:
entry: str
signer: str
timestamp: str
contract_id: str
def to_catopt_ir(*, lp: LocalProblem | None = None, sv: SharedVariables | None = None,
pd: PlanDelta | None = None, dv: DualVariables | None = None,
pb: PrivacyBudget | None = None, al: AuditLog | None = None) -> dict:
"""Serialize given components into a simplified CatOpt-like IR dict."""
ir: dict[str, Any] = {"Objects": {}, "Morphisms": {}}
if lp:
ir["Objects"][lp.id] = asdict(lp)
if sv:
ir["Morphisms"]["SharedVariables"] = asdict(sv)
if pd:
ir["Morphisms"]["PlanDelta"] = asdict(pd)
if dv:
ir["Morphisms"]["DualVariables"] = asdict(dv)
if pb:
ir["PrivacyBudget"] = asdict(pb)
if al:
ir["AuditLog"] = asdict(al)
return ir
def demo_usage():
lp = LocalProblem(id="lp-rover-1", domain="space", assets=["rover-1"])
sv = SharedVariables(forecasts={"taskA": 0.5}, priors={"energy": 0.2}, version=1)
pd = PlanDelta(delta={"allocate": {"rover-1": 1}}, timestamp="2026-01-01T00:00:00Z", author="energi-bridge", contract_id="contract-1", signature="sig")
ir = to_catopt_ir(lp=lp, sv=sv, pd=pd)
print(ir)
if __name__ == "__main__":
demo_usage()

View File

@ -8,67 +8,113 @@ that can serialize to a canonical CatOpt-like representation.
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List
from typing import Any, Dict, Iterable, List, Optional
# Data containers expected by tests
# Data containers expected by tests (aligned with test expectations)
@dataclass
class LocalProblem:
problem_id: str
version: int
variables: Any
id: str
domain: str
assets: List[str]
objective: Any
constraints: Any
def to_dict(self) -> Dict[str, Any]:
def to_catopt(self) -> Dict[str, Any]:
return {
"problem_id": self.problem_id,
"version": self.version,
"variables": self.variables,
"type": "LocalProblem",
"payload": {
"id": self.id,
"domain": self.domain,
"assets": self.assets,
"objective": self.objective,
"constraints": self.constraints,
},
}
# Backwards-compat legacy to_dict (not used by tests but kept for completeness)
def to_dict(self) -> Dict[str, Any]: # pragma: no cover
return {
"id": self.id,
"domain": self.domain,
"assets": self.assets,
"objective": self.objective,
"constraints": self.constraints,
}
@dataclass
class SharedVariable:
name: str
value: Any
class SharedVariables:
version: int
def to_dict(self) -> Dict[str, Any]:
return {"name": self.name, "value": self.value, "version": self.version}
def to_catopt(self) -> Dict[str, Any]:
return {
"type": "SharedVariables",
"payload": {"version": self.version},
}
# Backwards-compat alias expected by tests
@property
def channel(self) -> str:
return self.name
def to_dict(self) -> Dict[str, Any]: # pragma: no cover
return {"version": self.version}
@dataclass
class DualVariable:
name: str
value: Any
class DualVariables:
version: int
def to_dict(self) -> Dict[str, Any]:
return {"name": self.name, "value": self.value, "version": self.version}
def to_catopt(self) -> Dict[str, Any]:
return {
"type": "DualVariables",
"payload": {"version": self.version},
}
# Backwards-compat alias expected by tests
@property
def channel(self) -> str:
return self.name
def to_dict(self) -> Dict[str, Any]: # pragma: no cover
return {"version": self.version}
@dataclass
class PlanDelta:
delta_id: str
changes: Dict[str, Any]
timestamp: str | None = None
timestamp: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {"delta_id": self.delta_id, "changes": self.changes, "timestamp": self.timestamp}
def to_catopt(self) -> Dict[str, Any]: # optional helper
return {"type": "PlanDelta", "payload": self.to_dict()}
@dataclass
class PrivacyBudget:
signal: str
budget: float
expiry: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {"signal": self.signal, "budget": self.budget, "expiry": self.expiry}
def to_catopt(self) -> Dict[str, Any]: # optional helper
return {"type": "PrivacyBudget", "payload": self.to_dict()}
@dataclass
class AuditLog:
entry: str
signer: str
timestamp: str
contract_id: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"entry": self.entry,
"signer": self.signer,
"timestamp": self.timestamp,
"contract_id": self.contract_id,
}
def to_catopt(self) -> Dict[str, Any]: # optional helper
return {"type": "AuditLog", "payload": self.to_dict()}
class ContractRegistry:
"""Tiny in-memory contract registry used by the CatOpt bridge tests."""
@ -84,69 +130,93 @@ class ContractRegistry:
return self._store.get((name, version))
class CatOptBridge:
"""Lightweight MVP bridge facade used by tests."""
class GraphOfContracts:
"""Lightweight graph of contracts for interoperability."""
def __init__(self) -> None:
self._registry = ContractRegistry()
self._contracts: List[Dict[str, Any]] = []
# Registry helpers (simple pass-through API)
def map_local_problem(self, lp: LocalProblem) -> Dict[str, Any]:
"""Map a LocalProblem into a CatOpt-like envelope under Objects.LocalProblem."""
return {
"Objects": {
"LocalProblem": {
"problem_id": lp.problem_id,
"version": lp.version,
"variables": lp.variables,
"objective": lp.objective,
"constraints": lp.constraints,
}
}
}
def register(self, contract_id: str, contract: Dict[str, Any]) -> None:
self._contracts.append({"contract_id": contract_id, "contract": contract})
@staticmethod
def build_round_trip(
problem: LocalProblem,
shared: Iterable[SharedVariable] | List[SharedVariable],
duals: Iterable[DualVariable] | List[DualVariable],
) -> Dict[str, Any]:
morphisms: List[Dict[str, Any]] = []
for s in shared:
morphisms.append({"name": s.name, "value": s.value, "version": s.version})
for d in duals:
morphisms.append({"name": d.name, "value": d.value, "version": d.version})
payload = {
"object": {"id": problem.problem_id},
"morphisms": morphisms,
}
return {"kind": "RoundTrip", "payload": payload}
def list_contracts(self) -> List[Dict[str, Any]]:
return list(self._contracts)
# Convenience API used by tests
@staticmethod
def register_contract(name: str, version: str, schema: Any) -> None:
# No-op shim for compatibility with tests that import this symbol from CatOptBridge
# Real registry lives inside ContractRegistry; keep API compatibility simple.
br = CatOptBridge()
br._registry.register_contract(name, version, schema)
@staticmethod
def get_contract(name: str, version: str) -> Any:
br = CatOptBridge()
return br._registry.get_contract(name, version)
class Registry:
"""Simple registry used by tests to register contract schemas by id."""
# Compatibility alias for tests that expect a map-like selector
def __getattr__(self, item: str) -> Any: # pragma: no cover - simple delegation
if item == "REGISTRY": # mimic old API surface if accessed
return self._registry
raise AttributeError(item)
def __init__(self) -> None:
self._store: Dict[int, Dict[str, Any]] = {}
def register_contract(self, contract_id: int, contract: Dict[str, Any]) -> None:
self._store[contract_id] = contract
def get_contract(self, contract_id: int) -> Dict[str, Any] | None:
return self._store.get(contract_id)
def list_contracts(self) -> List[int]:
return list(self._store.keys())
class CatOptBridge:
"""Lightweight compatibility shim wrapping registry and graph stores."""
def __init__(self) -> None:
self.REGISTRY = Registry()
self.GRAPH = GraphOfContracts()
# Simple delegation helpers for compatibility with older surface
def register_contract(self, contract_id: int, contract: Dict[str, Any]) -> None:
self.REGISTRY.register_contract(contract_id, contract)
def get_contract(self, contract_id: int) -> Dict[str, Any] | None:
return self.REGISTRY.get_contract(contract_id)
def list_contracts(self) -> List[int]: # pragma: no cover
return self.REGISTRY.list_contracts()
def to_catopt(lp: LocalProblem) -> Dict[str, Any]: # pragma: no cover
return lp.to_catopt()
def from_catopt(catopt: Dict[str, Any]) -> LocalProblem | None: # pragma: no cover
if catopt.get("type") != "LocalProblem":
return None
payload = catopt.get("payload", {})
return LocalProblem(
id=payload.get("id"),
domain=payload.get("domain"),
assets=payload.get("assets", []),
objective=payload.get("objective"),
constraints=payload.get("constraints"),
)
def sample_end_to_end_mapping() -> tuple[Dict[str, Any], Dict[str, Any], Dict[str, Any]]:
lp = LocalProblem(
id="lp-sample",
domain="sample-domain",
assets=["asset-1"],
objective={"maximize": 1.0},
constraints={"max_energy": 100.0},
)
sv = SharedVariables(version=1)
dv = DualVariables(version=1)
return to_catopt(lp), sv.to_catopt(), dv.to_catopt()
__all__ = [
"LocalProblem",
"SharedVariable",
"DualVariable",
"SharedVariables",
"DualVariables",
"PlanDelta",
"ContractRegistry",
"CatOptBridge",
"PrivacyBudget",
"AuditLog",
"GraphOfContracts",
"Registry",
"to_catopt",
"from_catopt",
"sample_end_to_end_mapping",
]