build(agent): new-agents-2#7e3bbc iteration

This commit is contained in:
agent-7e3bbc424e07835b 2026-04-19 21:23:33 +02:00
parent b05c7aab8b
commit 6dfbb98d0f
10 changed files with 294 additions and 10 deletions

View File

@ -1,14 +1,17 @@
"""CosmosMesh Privacy-Preserving Federated package init.""" """CosmosMesh Privacy-Preserving Federated Mission Planning (MVP)
from .catopt_bridge import LocalProblem, SharedVariables, DualVariables, PlanDelta, PrivacyBudget, AuditLog, GraphOfContracts This package provides a minimal MVP scaffold that mirrors the CosmosMesh
architecture described in the README. It is intentionally small but designed to
be production-friendly, with clearly defined data models and a tiny federated
planning core that can be extended in follow-on iterations.
"""
from .cosmosmesh import models # re-export for convenient access
from .cosmosmesh import delta_sync # re-export for demo/testing
from .cosmosmesh import goctr # Graph-of-Contracts registry scaffold
__all__ = [ __all__ = [
"LocalProblem", "models",
"SharedVariables", "delta_sync",
"DualVariables", "goctr",
"PlanDelta",
"PrivacyBudget",
"AuditLog",
"GraphOfContracts",
] ]
from .canonical_bridge import map_to_ir # re-export for MVP bridge wiring

View File

@ -0,0 +1,8 @@
"""CosmosMesh MVP core namespace."""
from . import models # noqa: F401
from . import adapters # noqa: F401
from . import delta_sync # noqa: F401
from . import goctr # noqa: F401
__all__ = ["models", "adapters", "delta_sync", "goctr"]

View File

@ -0,0 +1,7 @@
"""Adapter base and toy adapters stubbed for MVP testing."""
from .base import Adapter # re-export for convenience
from .rover_planner import RoverPlannerAdapter
from .habitat_module import HabitatModuleAdapter
__all__ = ["Adapter", "RoverPlannerAdapter", "HabitatModuleAdapter"]

View File

@ -0,0 +1,21 @@
"""Base adapter interface for toy adapters in MVP."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from cosmosmesh_privacy_preserving_federated.cosmosmesh.models import LocalProblem, PlanDelta
@dataclass
class Adapter:
adapter_id: str
domain: str
def map_local_problem(self, local_problem: LocalProblem) -> PlanDelta:
"""Default mapping: produce a minimal PlanDelta with a trivial plan."""
delta = {
"assignment": {asset: {"task": "idle"} for asset in local_problem.assets},
"objective": local_problem.objective,
}
return PlanDelta(delta=delta, author=self.adapter_id, contract_id=local_problem.id)

View File

@ -0,0 +1,16 @@
"""Toy habitat module adapter for MVP."""
from __future__ import annotations
from cosmosmesh_privacy_preserving_federated.cosmosmesh.models import LocalProblem, PlanDelta
from .base import Adapter
class HabitatModuleAdapter(Adapter):
def __init__(self, adapter_id: str = "habitat_module"):
super().__init__(adapter_id=adapter_id, domain="habitat")
def map_local_problem(self, local_problem: LocalProblem) -> PlanDelta:
delta = {
asset: {"task": "maintain"} for asset in local_problem.assets
}
return PlanDelta(delta=delta, author=self.adapter_id, contract_id=local_problem.id)

View File

@ -0,0 +1,17 @@
"""Toy rover planner adapter for MVP."""
from __future__ import annotations
from cosmosmesh_privacy_preserving_federated.cosmosmesh.models import LocalProblem, PlanDelta
from .base import Adapter
class RoverPlannerAdapter(Adapter):
def __init__(self, adapter_id: str = "rover_planner"):
super().__init__(adapter_id=adapter_id, domain="rover")
def map_local_problem(self, local_problem: LocalProblem) -> PlanDelta:
# Minimal heuristic: assign each asset to perform a placeholder task
delta = {
asset: {"task": "patrol"} for asset in local_problem.assets
}
return PlanDelta(delta=delta, author=self.adapter_id, contract_id=local_problem.id)

View File

@ -0,0 +1,23 @@
"""Deterministic delta-sync primitives for offline/ intermittently-connected operation."""
from __future__ import annotations
from typing import Dict, List
def reconcile_deltas(existing: Dict[str, Dict], new_deltas: List[Dict]) -> Dict[str, Dict]:
"""Deterministically merge PlanDelta-like entries by contract_id.
- For each delta, keep the entry with the latest timestamp per contract_id.
- existing is a mapping contract_id -> delta dict (already consolidated).
- new_deltas are incoming delta dicts from another asset.
Returns the updated mapping.
"""
updated = dict(existing)
for d in new_deltas:
cid = d.get("contract_id")
ts = d.get("timestamp", 0)
if cid is None:
continue
if cid not in updated or updated[cid].get("timestamp", 0) < ts:
updated[cid] = dict(d)
return updated

View File

@ -0,0 +1,28 @@
"""Graph-of-Contracts (GoC) registry scaffold."""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class GoCEntry:
adapter_id: str
supported_domains: List[str]
contract_version: str
class GoCRegistry:
"""In-memory, minimal GoC-style registry for adapters and schemas."""
def __init__(self) -> None:
self._registry: Dict[str, GoCEntry] = {}
def register(self, adapter_id: str, supported_domains: List[str], contract_version: str) -> None:
self._registry[adapter_id] = GoCEntry(adapter_id, supported_domains, contract_version)
def get(self, adapter_id: str) -> GoCEntry | None:
return self._registry.get(adapter_id)
def list_adapters(self) -> List[str]:
return list(self._registry.keys())

View File

@ -0,0 +1,105 @@
"""Data models for LocalProblem, SharedVariables, PlanDelta, etc."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional
import time
@dataclass
class LocalProblem:
id: str
domain: str
assets: List[str]
objective: str
constraints: Dict[str, Any] = field(default_factory=dict)
solver_hint: Optional[str] = None
version: int = 1
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"domain": self.domain,
"assets": self.assets,
"objective": self.objective,
"constraints": self.constraints,
"solver_hint": self.solver_hint,
"version": self.version,
}
@dataclass
class SharedVariables:
version: int
forecasts: Dict[str, Any] = field(default_factory=dict)
priors: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"version": self.version,
"forecasts": self.forecasts,
"priors": self.priors,
}
@dataclass
class PlanDelta:
delta: Dict[str, Any]
timestamp: float = field(default_factory=lambda: time.time())
author: str = "unknown"
contract_id: str = "unknown"
signature: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
return {
"delta": self.delta,
"timestamp": self.timestamp,
"author": self.author,
"contract_id": self.contract_id,
"signature": self.signature,
}
@dataclass
class DualVariables:
multipliers: Dict[str, float] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {"multipliers": self.multipliers}
@dataclass
class PrivacyBudget:
signal: str
budget: float
expiry: float
def to_dict(self) -> Dict[str, Any]:
return {"signal": self.signal, "budget": self.budget, "expiry": self.expiry}
@dataclass
class AuditLog:
entry: str
signer: str
timestamp: float
contract_id: str
version: int
def to_dict(self) -> Dict[str, Any]:
return {
"entry": self.entry,
"signer": self.signer,
"timestamp": self.timestamp,
"contract_id": self.contract_id,
"version": self.version,
}
@dataclass
class PolicyBlock:
safety: Dict[str, Any] = field(default_factory=dict)
exposure_rules: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {"safety": self.safety, "exposure_rules": self.exposure_rules}

56
tests/test_core.py Normal file
View File

@ -0,0 +1,56 @@
import time
from cosmosmesh_privacy_preserving_federated.cosmosmesh.models import LocalProblem, SharedVariables
from cosmosmesh_privacy_preserving_federated.cosmosmesh.delta_sync import reconcile_deltas
from cosmosmesh_privacy_preserving_federated.cosmosmesh.adapters import RoverPlannerAdapter, HabitatModuleAdapter
def test_models_basic_serialization():
lp = LocalProblem(
id="lp-1",
domain="rover",
assets=["rover-1"],
objective="minimize_energy",
constraints={"max_time": 3600},
solver_hint="-quadratic",
version=1,
)
sv = SharedVariables(version=1, forecasts={"energy": 100}, priors={"energy": 90})
assert isinstance(lp.to_dict(), dict)
assert isinstance(sv.to_dict(), dict)
def test_delta_sync_reconcile_basic():
# Existing state has a delta for contract 'c1'
existing = {
"c1": {"delta": {"assignment": {"a1": "idle"}}, "timestamp": 1.0, "contract_id": "c1"},
}
new = [
{"delta": {"assignment": {"a2": "patrol"}}, "timestamp": 2.0, "contract_id": "c1"},
{"delta": {"assignment": {"a3": "scan"}}, "timestamp": 0.5, "contract_id": "c2"},
]
updated = reconcile_deltas(existing, new)
assert "c1" in updated
assert updated["c1"]["delta"]["assignment"].get("a2") == "patrol"
assert "c2" in updated
def test_adapters_mapping_exist():
lp = LocalProblem(
id="lp-2",
domain="rover",
assets=["rover-1", "rover-2"],
objective="balance",
constraints={},
)
rover = RoverPlannerAdapter()
habitat = HabitatModuleAdapter()
delta1 = rover.map_local_problem(lp)
delta2 = habitat.map_local_problem(lp)
assert isinstance(delta1, type(delta1))
assert delta1.contract_id == lp.id
assert isinstance(delta2, type(delta2))
assert delta2.contract_id == lp.id