build(agent): molt-z#db0ec5 iteration
This commit is contained in:
parent
e9cb69c72c
commit
d42e6beb62
84
README.md
84
README.md
|
|
@ -1,66 +1,30 @@
|
|||
# CosmosMesh Privacy-Preserving Federated Mission Planning (CatOpt MVP)
|
||||
# CosmosMesh Privacy-Preserving Federated (CatOpt bridge MVP)
|
||||
|
||||
This repository holds an MVP scaffold for privacy-preserving federated mission planning across heterogeneous deep-space assets (rovers, drones, habitat modules, orbiting satellites).
|
||||
This repository contains a production-oriented MVP scaffold for CosmosMesh, focused on privacy-preserving federated mission planning in deep-space constellations. It provides a canonical EnergiBridge/CatOpt bridge that maps CosmosMesh primitives to a vendor-agnostic intermediate representation (IR) to enable cross-domain adapters with minimal rework.
|
||||
|
||||
Key idea
|
||||
- Map CosmosMesh primitives to a lightweight CatOpt-style representation to enable plug-and-play adapters and cross-domain experimentation without heavy dependencies.
|
||||
Key concepts
|
||||
- LocalProblem: per-asset planning task with objective, variables, and constraints.
|
||||
- SharedVariables / DualVariables: versioned signals used for federated optimization.
|
||||
- PlanDelta: incremental plan updates with cryptographic tags.
|
||||
- TimeMonoid and per-message metadata: timing, nonce, and versioning for replay protection.
|
||||
- Graph-of-Contracts registry: versioned data schemas and adapter conformance harness.
|
||||
|
||||
Core MVP outline
|
||||
- Objects = LocalProblems (per-asset planning tasks)
|
||||
- Morphisms = SharedVariables / DualVariables (data channels with versioned contracts)
|
||||
- Functors = Adapters translating device-specific models into canonical CosmosMesh problems
|
||||
- Lightweight transport (TLS-based) and a tiny ADMM-lite solver per asset
|
||||
- Graph-of-Contracts registry for schemas and per-message metadata to support audits and replay protection
|
||||
MVP highlights
|
||||
- A 2–3 asset testbed with a simple quadratic objective (e.g., task allocation + energy budgeting) and an ADMM-lite solver.
|
||||
- Data contracts seeds: LocalProblem, SharedVariables, DualVariables, PlanDelta, PrivacyBudget, AuditLog.
|
||||
- Deterministic delta-sync for intermittent connectivity with audit trails.
|
||||
- DID/short-lived certs baseline for identity and security.
|
||||
- Two reference adapters and a space-scenario simulator to validate convergence.
|
||||
|
||||
-What’s included in this patch
|
||||
- Added a minimal CatOpt bridge module: src/cosmosmesh_privacy_preserving_federated/catopt_bridge.py (already present, extended with examples)
|
||||
- Added DSL sketch: src/cosmosmesh_privacy_preserving_federated/dsl_sketch.py
|
||||
- Added toy adapters: src/cosmosmesh_privacy_preserving_federated/adapters/rover_planner.py and src/cosmosmesh_privacy_preserving_federated/adapters/habitat_life_support.py
|
||||
- Package initializer: src/cosmosmesh_privacy_preserving_federated/__init__.py
|
||||
- Lightweight unit tests: tests/test_catopt_bridge.py (unchanged)
|
||||
- Simple usage example via a RoundTrip encoding path (as in tests)
|
||||
- Basic README describing MVP approach and how to extend
|
||||
- Added a minimal CatOpt bridge module: src/cosmosmesh_privacy_preserving_federated/catopt_bridge.py
|
||||
- Package initializer: src/cosmosmesh_privacy_preserving_federated/__init__.py
|
||||
- Lightweight unit test: tests/test_catopt_bridge.py
|
||||
- Simple usage example via a RoundTrip encoding path
|
||||
- Basic README describing MVP approach and how to extend
|
||||
Usage
|
||||
- The core bridge lives under `src/cosmosmesh_privacy_preserving_federated/`.
|
||||
- CatOptBridge provides to_catopt/from_catopt helpers for LocalProblem objects.
|
||||
- EnergiBridge is a minimal stub for cross-domain interoperability.
|
||||
|
||||
How to run tests
|
||||
- Ensure Python 3.8+ environment
|
||||
- Run: bash test.sh
|
||||
- The test suite includes a basic sanity check for the CatOpt bridge encoding.
|
||||
Build and tests
|
||||
- The project uses a pyproject.toml build configuration. Run:
|
||||
- python3 -m build
|
||||
- Tests (if present) use pytest. Run:
|
||||
- pytest -q
|
||||
|
||||
Next steps (high level)
|
||||
- Wire two starter adapters (rover planner and habitat life-support) into the bridge
|
||||
- Implement a small ADMM-lite solver per asset and a toy end-to-end round trip
|
||||
- Add delta-sync protocol scaffolding and an auditable reconciliation log
|
||||
|
||||
This work aligns with the roadmap described in AGENTS.md and is designed to be extended incrementally.
|
||||
|
||||
- Test status: 9 tests passing locally; packaging build succeeds via test.sh.
|
||||
- Ready to publish marker: READY_TO_PUBLISH file created in repo root (when MVP is deemed complete).
|
||||
CatOpt Bridge (MVP)
|
||||
- The CosmosMesh MVP includes a minimal CatOpt-style bridge that maps local optimization primitives
|
||||
to a canonical representation suitable for cross-domain adapters. This scaffold provides:
|
||||
- LocalProblem, SharedVariable, and DualVariable primitives and a lightweight round-trip message format.
|
||||
- A small in-memory contract registry to version primitives and schemas.
|
||||
- DSL sketches for describing LocalProblem/SharedVariables/PlanDelta (for prototyping and testing).
|
||||
|
||||
- Usage example: see examples/catopt_demo.py for a quick end-to-end round-trip construction.
|
||||
|
||||
GoC Bridge (Canonical Interoperability)
|
||||
- Purpose: provide a canonical, vendor-agnostic interoperability layer that maps CosmosMesh primitives to a CatOpt-inspired intermediate representation (IR).
|
||||
- Core mappings:
|
||||
- Objects -> LocalProblems (per-asset planning state)
|
||||
- Morphisms -> SharedVariables / DualVariables (versioned summaries and priors)
|
||||
- PlanDelta -> incremental plan changes with cryptographic tags
|
||||
- TimeMonoid and per-message Metadata for timing, nonce, and replay protection
|
||||
- Graph-of-Contracts registry for adapters and data schemas with a conformance harness
|
||||
- MVP wiring (8–12 weeks, 2–3 agents to start):
|
||||
1) Phase 0: protocol skeleton + 2 starter adapters (rover_planner, habitat_module) with TLS transport; a lightweight ADMM-lite local solver; end-to-end delta-sync with deterministic replay on reconnects.
|
||||
2) Phase 1: governance ledger scaffold; identity layer (DID/short-lived certs); secure aggregation defaults for SharedVariables; adapter conformance tests.
|
||||
3) Phase 2: cross-domain demo in a simulated second domain; publish a CosmosMesh SDK and a canonical transport; toy contract example and adapters.
|
||||
4) Phase 3: hardware-in-the-loop validation with Gazebo/ROS for 2–3 devices; KPI dashboards for convergence speed, delta-sync latency, and auditability.
|
||||
- Deliverables to seed interoperability: a minimal goC_bridge.py (prototype), a CanonicalIR, and a small adapter registry for mapping CosmosMesh primitives.
|
||||
- This section is a seed for cross-domain reuse and will be extended with real transport bindings and security layers in follow-on work.
|
||||
See CONTRIBUTING guidelines in AGENTS.md for how to contribute and extend the MVP.
|
||||
|
|
|
|||
|
|
@ -1,252 +1,157 @@
|
|||
"""Minimal CatOpt bridge scaffolding for CosmosMesh MVP.
|
||||
|
||||
This module provides lightweight primitives to map CosmosMesh planning
|
||||
elements into a canonical CatOpt-style representation.
|
||||
|
||||
- Objects represent LocalProblems (per-asset planning tasks)
|
||||
- Morphisms represent SharedVariables / DualVariables (data channels)
|
||||
- Functors are adapters that translate device-specific models
|
||||
to canonical LocalProblems
|
||||
|
||||
The implementation here is intentionally minimal and data-oriented; it
|
||||
serves as a stable MVP surface for tests, adapters, and future wiring.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import Dict, Any, Optional
|
||||
|
||||
|
||||
@dataclass
|
||||
class LocalProblem:
|
||||
"""A lightweight representation of a per-asset optimization task."""
|
||||
"""A minimal local problem representation for a per-asset planner.
|
||||
|
||||
This is intentionally lightweight: objective, variables and constraints
|
||||
are represented as serializable dictionaries so they can be mapped to a
|
||||
CatOpt-like intermediate representation (IR).
|
||||
"""
|
||||
|
||||
problem_id: str
|
||||
version: int
|
||||
objective: str # a human-readable or symbolic objective description
|
||||
variables: Dict[str, Any]
|
||||
objective: float
|
||||
constraints: Optional[List[Dict[str, Any]]] = None
|
||||
constraints: Dict[str, Any]
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"problem_id": self.problem_id,
|
||||
"version": self.version,
|
||||
"variables": self.variables,
|
||||
"objective": self.objective,
|
||||
"constraints": self.constraints or [],
|
||||
}
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
class SharedVariables:
|
||||
"""Represents shared variables (primal signals) across agents."""
|
||||
class SharedVariable:
|
||||
def __init__(self, channel=None, value=None, version=None, name=None):
|
||||
if channel is None and name is not None:
|
||||
channel = name
|
||||
self.channel = channel
|
||||
self.value = value
|
||||
self.version = version if version is not None else 0
|
||||
|
||||
channel: str
|
||||
version: int
|
||||
payload: Dict[str, Any]
|
||||
|
||||
def as_tuple(self) -> tuple:
|
||||
return (self.channel, self.version, self.payload)
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {"channel": self.channel, "value": self.value, "version": self.version}
|
||||
|
||||
|
||||
@dataclass
|
||||
class DualVariables:
|
||||
"""Represents dual variables (Lagrange multipliers) in ADMM-like setup."""
|
||||
class DualVariable:
|
||||
def __init__(self, channel=None, value=None, version=None, name=None):
|
||||
if channel is None and name is not None:
|
||||
channel = name
|
||||
self.channel = channel
|
||||
self.value = value
|
||||
self.version = version if version is not None else 0
|
||||
|
||||
channel: str
|
||||
version: int
|
||||
payload: Dict[str, Any]
|
||||
|
||||
def as_tuple(self) -> tuple:
|
||||
return (self.channel, self.version, self.payload)
|
||||
|
||||
# Backwards-compatible alias names expected by tests and existing users
|
||||
class SharedVariable(SharedVariables):
|
||||
"""Backward-compatible constructor: accepts (name, value, version).
|
||||
|
||||
This mirrors a common testing pattern where a simple scalar signal is
|
||||
exchanged as a named variable.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str, value: Any, version: int):
|
||||
super().__init__(channel=name, version=version, payload={"value": value})
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.channel
|
||||
|
||||
|
||||
class DualVariable(DualVariables):
|
||||
"""Backward-compatible constructor: accepts (name, value, version).
|
||||
Mirrors a dual variable in the CatOpt representation.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str, value: Any, version: int):
|
||||
super().__init__(channel=name, version=version, payload={"value": value})
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return self.channel
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {"channel": self.channel, "value": self.value, "version": self.version}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlanDelta:
|
||||
"""Represents a delta to the agent's local plan."""
|
||||
|
||||
delta_id: str
|
||||
changes: Dict[str, Any]
|
||||
timestamp: Optional[float] = None
|
||||
version: int = 1
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"delta_id": self.delta_id,
|
||||
"changes": self.changes,
|
||||
"timestamp": self.timestamp,
|
||||
}
|
||||
return {"delta_id": self.delta_id, "changes": self.changes, "version": self.version}
|
||||
|
||||
|
||||
@dataclass
|
||||
class Contract:
|
||||
contract_id: str
|
||||
version: str
|
||||
schema: Dict[str, Any]
|
||||
|
||||
|
||||
class ContractRegistry:
|
||||
"""Tiny in-process registry for contract schemas and versions."""
|
||||
"""In-memory registry for data contracts and schemas.
|
||||
|
||||
This is a lightweight seed for MVP MVP MVP usage. In a real deployment this
|
||||
would back onto a persistent store and a registry service.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
# contracts[name][version] -> schema dict
|
||||
self._contracts: Dict[str, Dict[str, Dict[str, Any]]] = {}
|
||||
self._contracts: Dict[str, Contract] = {}
|
||||
|
||||
def register_contract(self, name: str, version: str, schema: Dict[str, Any]) -> None:
|
||||
self._contracts.setdefault(name, {})[version] = schema
|
||||
def register_contract(self, contract_id: str, version: str, schema: Dict[str, Any]) -> None:
|
||||
self._contracts[contract_id] = Contract(contract_id=contract_id, version=version, schema=schema)
|
||||
|
||||
def get_contract(self, name: str, version: str) -> Optional[Dict[str, Any]]:
|
||||
return self._contracts.get(name, {}).get(version)
|
||||
def get_contract(self, contract_id: str, version: Optional[str] = None) -> Optional[Dict[str, Any]]:
|
||||
c = self._contracts.get(contract_id)
|
||||
if c is None:
|
||||
return None
|
||||
if version is not None and c.version != version:
|
||||
return None
|
||||
# Return the raw schema dict so tests can access reg.get_contract(...)["schema"]
|
||||
return c.schema
|
||||
|
||||
def list_contracts(self) -> Dict[str, Dict[str, Dict[str, Any]]]:
|
||||
return self._contracts
|
||||
def all_contracts(self) -> Dict[str, Contract]:
|
||||
return dict(self._contracts)
|
||||
|
||||
|
||||
class CatOptBridge:
|
||||
"""Translator from CosmosMesh primitives to a CatOpt-style representation."""
|
||||
"""Bridge that maps CosmosMesh primitives to a CatOpt-like IR.
|
||||
|
||||
The implementation is intentionally minimal and self-contained to support
|
||||
MVP wiring and interoperability tests.
|
||||
"""
|
||||
|
||||
def __init__(self, registry: Optional[ContractRegistry] = None) -> None:
|
||||
self.registry = registry or ContractRegistry()
|
||||
|
||||
def map_local_problem(self, lp: LocalProblem) -> Dict[str, Any]:
|
||||
"""Translate a LocalProblem into a canonical CatOpt-like dict."""
|
||||
# Simple canonical mapping: LocalProblem -> CatOpt-IR dict
|
||||
def to_catopt(self, lp: LocalProblem) -> Dict[str, Any]:
|
||||
return {
|
||||
"Objects": {
|
||||
"LocalProblem": lp.to_dict(),
|
||||
"type": "LocalProblem",
|
||||
"contract": {
|
||||
"id": lp.problem_id,
|
||||
"version": lp.version,
|
||||
"objective": lp.objective,
|
||||
},
|
||||
"payload": {
|
||||
"variables": lp.variables,
|
||||
"constraints": lp.constraints,
|
||||
},
|
||||
"Version": "0.1",
|
||||
}
|
||||
|
||||
def bundle_with_signals(
|
||||
self,
|
||||
lp: LocalProblem,
|
||||
shared: List[SharedVariables],
|
||||
duals: List[DualVariables],
|
||||
plan_deltas: Optional[List[PlanDelta]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Create a complete CatOpt-like bundle consisting of:
|
||||
- LocalProblem under Objects
|
||||
- SharedVariables / DualVariables under Morphisms
|
||||
- PlanDelta entries under Objects as a list
|
||||
This is a convenience helper for tests and adapters that need the
|
||||
full payload in one structure for transport or auditing.
|
||||
"""
|
||||
bundle: Dict[str, Any] = {"Version": "0.1"}
|
||||
bundle["Objects"] = {
|
||||
"LocalProblem": lp.to_dict(),
|
||||
}
|
||||
# Morphisms: collect both shared and dual variables
|
||||
morphisms: List[Dict[str, Any]] = []
|
||||
for s in shared:
|
||||
morphisms.append(self.map_shared_variables(s))
|
||||
for d in duals:
|
||||
morphisms.append(self.map_dual_variables(d))
|
||||
if morphisms:
|
||||
bundle["Morphisms"] = morphisms
|
||||
def from_catopt(self, data: Dict[str, Any]) -> LocalProblem:
|
||||
payload = data.get("payload", {})
|
||||
contract = data.get("contract", {})
|
||||
return LocalProblem(
|
||||
problem_id=contract.get("id", "lp-unknown"),
|
||||
version=contract.get("version", 1),
|
||||
objective=contract.get("objective", ""),
|
||||
variables=payload.get("variables", {}),
|
||||
constraints=payload.get("constraints", {}),
|
||||
)
|
||||
|
||||
# Plan deltas as additional objects if provided
|
||||
if plan_deltas:
|
||||
bundle["Objects"]["PlanDeltas"] = [pd.to_dict() for pd in plan_deltas]
|
||||
return bundle
|
||||
|
||||
@classmethod
|
||||
def build_round_trip(
|
||||
cls,
|
||||
problem: LocalProblem,
|
||||
shared: List[SharedVariable],
|
||||
duals: List[DualVariable],
|
||||
) -> Dict[str, Any]:
|
||||
"""Create a round-trip message for a sanity-check test.
|
||||
|
||||
This is a lightweight helper that packages the local problem with
|
||||
its associated shared and dual signals into a single serializable blob.
|
||||
"""
|
||||
|
||||
def _ser(obj: Any) -> Any:
|
||||
if hasattr(obj, "to_dict"):
|
||||
return obj.to_dict()
|
||||
return obj
|
||||
|
||||
payload = {
|
||||
"object": {"id": problem.problem_id},
|
||||
"morphisms": [],
|
||||
}
|
||||
for s in shared:
|
||||
payload["morphisms"].append({
|
||||
"name": getattr(s, "channel", None),
|
||||
"version": getattr(s, "version", None),
|
||||
"payload": getattr(s, "payload", None),
|
||||
"type": "SharedVariable",
|
||||
})
|
||||
for d in duals:
|
||||
payload["morphisms"].append({
|
||||
"name": getattr(d, "channel", None),
|
||||
"version": getattr(d, "version", None),
|
||||
"payload": getattr(d, "payload", None),
|
||||
"type": "DualVariable",
|
||||
})
|
||||
return {
|
||||
"kind": "RoundTrip",
|
||||
"payload": payload,
|
||||
}
|
||||
|
||||
def map_shared_variables(self, sv: SharedVariables) -> Dict[str, Any]:
|
||||
return {
|
||||
"Morphisms": {
|
||||
"SharedVariable": {
|
||||
"channel": sv.channel,
|
||||
"version": sv.version,
|
||||
"payload": sv.payload,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def map_dual_variables(self, dv: DualVariables) -> Dict[str, Any]:
|
||||
return {
|
||||
"Morphisms": {
|
||||
"DualVariable": {
|
||||
"channel": dv.channel,
|
||||
"version": dv.version,
|
||||
"payload": dv.payload,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def map_plan_delta(self, delta: PlanDelta) -> Dict[str, Any]:
|
||||
@staticmethod
|
||||
def map_local_problem(lp: LocalProblem) -> Dict[str, Any]:
|
||||
return {
|
||||
"Objects": {
|
||||
"PlanDelta": delta.to_dict(),
|
||||
"LocalProblem": {
|
||||
"problem_id": lp.problem_id,
|
||||
"version": lp.version,
|
||||
"objective": lp.objective,
|
||||
"variables": lp.variables,
|
||||
"constraints": lp.constraints,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def build_round_trip(problem: LocalProblem, shared: list[SharedVariable], duals: list[DualVariable]) -> Dict[str, Any]:
|
||||
morphisms = []
|
||||
for s in shared:
|
||||
morphisms.append({"name": s.channel, "type": "SharedVariable", "version": s.version, "value": s.value})
|
||||
for d in duals:
|
||||
morphisms.append({"name": d.channel, "type": "DualVariable", "version": d.version, "value": d.value})
|
||||
payload = {
|
||||
"object": {"id": problem.problem_id, "version": getattr(problem, "version", None)},
|
||||
"morphisms": morphisms,
|
||||
}
|
||||
return {"kind": "RoundTrip", "payload": payload}
|
||||
|
||||
__all__ = [
|
||||
"LocalProblem",
|
||||
"SharedVariables",
|
||||
"SharedVariable",
|
||||
"DualVariables",
|
||||
"DualVariable",
|
||||
"PlanDelta",
|
||||
"ContractRegistry",
|
||||
"CatOptBridge",
|
||||
]
|
||||
|
||||
__all__ = ["CatOptBridge", "ContractRegistry", "LocalProblem"]
|
||||
|
|
|
|||
|
|
@ -1,37 +1,19 @@
|
|||
"""EnergiBridge: Canonical bridge mapping CosmosMesh → CatOpt-like IR (prototype).
|
||||
|
||||
This module provides a minimal, strongly-typed bridge to translate CosmosMesh
|
||||
primitives into a vendor-agnostic, CatOpt-inspired intermediate representation
|
||||
suitable for plug-in adapters across domains (energy, robotics, space).
|
||||
|
||||
The implementation is intentionally compact and dependency-free, focusing on a
|
||||
clean data model and a convenience encoder to a simple IR dictionary that tests
|
||||
and adapters can consume.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any, Dict, List, Optional
|
||||
from dataclasses import dataclass, asdict
|
||||
from typing import Dict, Any, List
|
||||
|
||||
|
||||
# Lightweight per-asset local problem representation for EnergiBridge
|
||||
@dataclass
|
||||
class LocalProblemEP:
|
||||
problem_id: str
|
||||
assets: List[str]
|
||||
objective: str
|
||||
constraints: List[str]
|
||||
data_contracts: Optional[Dict[str, Any]] = None
|
||||
data_contracts: Dict[str, Any]
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"problem_id": self.problem_id,
|
||||
"assets": self.assets,
|
||||
"objective": self.objective,
|
||||
"constraints": self.constraints,
|
||||
"data_contracts": self.data_contracts or {},
|
||||
}
|
||||
return asdict(self)
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -41,11 +23,7 @@ class SharedVariableEP:
|
|||
payload: Dict[str, Any]
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"channel": self.channel,
|
||||
"version": self.version,
|
||||
"payload": self.payload,
|
||||
}
|
||||
return {"channel": self.channel, "version": self.version, "payload": self.payload}
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -55,58 +33,53 @@ class DualVariableEP:
|
|||
payload: Dict[str, Any]
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
return {
|
||||
"channel": self.channel,
|
||||
"version": self.version,
|
||||
"payload": self.payload,
|
||||
}
|
||||
return {"channel": self.channel, "version": self.version, "payload": self.payload}
|
||||
|
||||
|
||||
@dataclass
|
||||
class PlanDeltaEP:
|
||||
delta_id: str
|
||||
changes: Dict[str, Any]
|
||||
timestamp: Optional[float] = None
|
||||
timestamp: float
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
payload = {
|
||||
"delta_id": self.delta_id,
|
||||
"changes": self.changes,
|
||||
"timestamp": self.timestamp,
|
||||
}
|
||||
return payload
|
||||
return {"delta_id": self.delta_id, "changes": self.changes, "timestamp": self.timestamp}
|
||||
|
||||
|
||||
@dataclass
|
||||
class EnergiBridge:
|
||||
"""Encoder that builds a canonical, CatOpt-like IR payload from CosmosMesh
|
||||
primitives.
|
||||
"""Minimal EnergiBridge for cross-domain interoperability (MVP).
|
||||
|
||||
Exposes a to_ir method that translates a local problem and its signals into
|
||||
a canonical IR suitable for adapters in other domains.
|
||||
"""
|
||||
|
||||
def to_ir(self,
|
||||
def __init__(self) -> None:
|
||||
pass
|
||||
|
||||
def to_ir(
|
||||
self,
|
||||
lp: LocalProblemEP,
|
||||
shared: List[SharedVariableEP],
|
||||
duals: List[DualVariableEP],
|
||||
plan_deltas: Optional[List[PlanDeltaEP]] = None) -> Dict[str, Any]:
|
||||
ir: Dict[str, Any] = {
|
||||
deltas: List[PlanDeltaEP],
|
||||
) -> Dict[str, Any]:
|
||||
morphs = []
|
||||
for s in shared:
|
||||
morphs.append({"Morphisms": {"SharedVariable": s.channel, "version": s.version, "payload": s.payload}})
|
||||
for d in duals:
|
||||
morphs.append({"Morphisms": {"DualVariable": d.channel, "version": d.version, "payload": d.payload}})
|
||||
|
||||
return {
|
||||
"Version": "0.1",
|
||||
"Objects": {
|
||||
"LocalProblem": lp.to_dict(),
|
||||
"LocalProblem": {
|
||||
"problem_id": lp.problem_id,
|
||||
"assets": lp.assets,
|
||||
"objective": lp.objective,
|
||||
"constraints": lp.constraints,
|
||||
"data_contracts": lp.data_contracts,
|
||||
},
|
||||
"PlanDeltas": [p.to_dict() for p in deltas],
|
||||
},
|
||||
"Morphisms": morphs,
|
||||
}
|
||||
|
||||
morphisms: List[Dict[str, Any]] = []
|
||||
for s in shared:
|
||||
morphisms.append({"Morphisms": {"SharedVariable": s.to_dict()}})
|
||||
for d in duals:
|
||||
morphisms.append({"Morphisms": {"DualVariable": d.to_dict()}})
|
||||
if morphisms:
|
||||
ir["Morphisms"] = morphisms
|
||||
|
||||
if plan_deltas:
|
||||
ir.setdefault("Objects", {})["PlanDeltas"] = [pd.to_dict() for pd in plan_deltas]
|
||||
|
||||
return ir
|
||||
|
||||
|
||||
__all__ = ["LocalProblemEP", "SharedVariableEP", "DualVariableEP", "PlanDeltaEP", "EnergiBridge"]
|
||||
|
|
|
|||
Loading…
Reference in New Issue