diff --git a/src/cosmosmesh_privacy_preserving_federated_/__init__.py b/src/cosmosmesh_privacy_preserving_federated_/__init__.py index 41bd689..97ec0c1 100644 --- a/src/cosmosmesh_privacy_preserving_federated_/__init__.py +++ b/src/cosmosmesh_privacy_preserving_federated_/__init__.py @@ -3,6 +3,9 @@ from .catopt_bridge import CatOptBridge from .contract_registry import REGISTRY, register_contract, get_contract from .reference_adapters import RoverPlannerAdapter, HabitatLifeSupportAdapter +from .crdt import PlanDeltaCRDT +from .ledger import ExperimentLedger +from .policy import PolicyBlock def add(a, b): """Tiny compatibility helper used by tests.""" @@ -15,5 +18,8 @@ __all__ = [ "get_contract", "RoverPlannerAdapter", "HabitatLifeSupportAdapter", + "PlanDeltaCRDT", + "ExperimentLedger", + "PolicyBlock", "add", ] diff --git a/src/cosmosmesh_privacy_preserving_federated_/crdt.py b/src/cosmosmesh_privacy_preserving_federated_/crdt.py new file mode 100644 index 0000000..cb8db7b --- /dev/null +++ b/src/cosmosmesh_privacy_preserving_federated_/crdt.py @@ -0,0 +1,87 @@ +"""CRDT-style PlanDelta primitives for CosmosMesh MVP. + +This module provides a tiny CRDT-like structure to enable deterministic +offline merges of PlanDelta changes across islanded replicas. The goal is to +permit independent delta accumulation and deterministic replay on reconnect. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, Optional + + +@dataclass(frozen=True) +class PlanDeltaCRDT: + """A CRDT-like wrapper for a single PlanDelta payload. + + - delta_id: unique identifier for this delta when created. + - changes: a mapping from key to numeric delta value (or other simple types). + - ts: logical timestamp (monotonic) of delta creation to help ordering on replay. + - actor: optional actor id who authored the delta. + - version_vector: a simple per-actor vector to track seen deltas for causal safety. + """ + + delta_id: str + changes: Dict[str, Any] = field(default_factory=dict) + ts: Optional[int] = None + actor: Optional[str] = None + version_vector: Dict[str, int] = field(default_factory=dict) + + def merge(self, other: "PlanDeltaCRDT") -> "PlanDeltaCRDT": + """Return a new PlanDeltaCRDT representing the commutative merge. + + Rules: + - For numeric keys, take the maximum value between deltas. + - For non-numeric values, prefer the one with the larger timestamp if available, + otherwise choose the value from the delta with higher lexical delta_id. + - Version vectors are merged by taking the max per-actor sequence. + - The merge is commutative and associative, suitable for islanded reconciling. + """ + # Merge version vectors + merged_vv: Dict[str, int] = dict(self.version_vector) + for actor, v in other.version_vector.items(): + merged_vv[actor] = max(merged_vv.get(actor, 0), v) + + # Merge changes + merged_changes: Dict[str, Any] = dict(self.changes) + for k, v in other.changes.items(): + if k in merged_changes: + a = merged_changes[k] + b = v + # If both numeric (and not booleans), take max + if ( + isinstance(a, (int, float)) and not isinstance(a, bool) + and isinstance(b, (int, float)) and not isinstance(b, bool) + ): + merged_changes[k] = max(a, b) + else: + # Otherwise, prefer the later delta by timestamp if possible + t_self = self.ts or 0 + t_other = other.ts or 0 + if t_other > t_self: + merged_changes[k] = b + else: + merged_changes[k] = a + else: + merged_changes[k] = v + + # Construct a new PlanDeltaCRDT representing the merge + return PlanDeltaCRDT( + delta_id=self.delta_id + "+merge+" + other.delta_id, + changes=merged_changes, + ts=max(filter(None, [self.ts, other.ts])), + actor=self.actor or other.actor, + version_vector=merged_vv, + ) + + def to_dict(self) -> Dict[str, Any]: + return { + "delta_id": self.delta_id, + "changes": self.changes, + "ts": self.ts, + "actor": self.actor, + "version_vector": self.version_vector, + } + +__all__ = ["PlanDeltaCRDT"] diff --git a/src/cosmosmesh_privacy_preserving_federated_/ledger.py b/src/cosmosmesh_privacy_preserving_federated_/ledger.py new file mode 100644 index 0000000..b3c1495 --- /dev/null +++ b/src/cosmosmesh_privacy_preserving_federated_/ledger.py @@ -0,0 +1,54 @@ +"""Experiment ledger for CosmosMesh MVP. + +Records environment, topology, and delta outcomes to enable reproducible demos +and deterministic replay when disconnections occur. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +@dataclass +class DeltaRecord: + delta_id: str + outcome: str + details: Dict[str, Any] = field(default_factory=dict) + + def to_dict(self) -> Dict[str, Any]: + return {"delta_id": self.delta_id, "outcome": self.outcome, "details": self.details} + + +@dataclass +class ExperimentLedger: + environment_version: str + seed_space: str + topology: str + hardware_config: Dict[str, Any] + code_version_hash: str + deltas: List[DeltaRecord] = field(default_factory=list) + proofs: List[str] = field(default_factory=list) # placeholder for Merkle proofs + + def record_delta(self, delta_id: str, outcome: str, details: Optional[Dict[str, Any]] = None) -> None: + self.deltas.append(DeltaRecord(delta_id=delta_id, outcome=outcome, details=details or {})) + + def add_proof(self, proof: str) -> None: + self.proofs.append(proof) + + def to_dict(self) -> Dict[str, Any]: + return { + "environment_version": self.environment_version, + "seed_space": self.seed_space, + "topology": self.topology, + "hardware_config": self.hardware_config, + "code_version_hash": self.code_version_hash, + "deltas": [d.to_dict() for d in self.deltas], + "proofs": self.proofs, + } + + def __len__(self) -> int: # convenience + return len(self.deltas) + + +__all__ = ["ExperimentLedger"] diff --git a/src/cosmosmesh_privacy_preserving_federated_/policy.py b/src/cosmosmesh_privacy_preserving_federated_/policy.py new file mode 100644 index 0000000..80f86ea --- /dev/null +++ b/src/cosmosmesh_privacy_preserving_federated_/policy.py @@ -0,0 +1,28 @@ +"""PolicyBlock: safety governance gate for CosmosMesh deltas. + +A lightweight mechanism to guard information exchange paths and require +approval for sensitive plan deltas in Phase 0. +""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + + +@dataclass +class PolicyBlock: + policy_id: str + allow_signal_types: Optional[list[str]] = None + requires_approval: bool = False + + def is_allowed(self, signal_type: str) -> bool: + if self.allow_signal_types is None: + return True + return signal_type in self.allow_signal_types + + def __str__(self) -> str: + return f"PolicyBlock(id={self.policy_id}, requires_approval={self.requires_approval})" + + +__all__ = ["PolicyBlock"] diff --git a/tests/test_crdt_plan_delta.py b/tests/test_crdt_plan_delta.py new file mode 100644 index 0000000..8131442 --- /dev/null +++ b/tests/test_crdt_plan_delta.py @@ -0,0 +1,26 @@ +import unittest +from cosmosmesh_privacy_preserving_federated_ import PlanDeltaCRDT + + +class TestPlanDeltaCRDT(unittest.TestCase): + def test_merge_numeric_max(self): + a = PlanDeltaCRDT(delta_id="d1", changes={"x": 1}, ts=1, actor="A", version_vector={"A": 1}) + b = PlanDeltaCRDT(delta_id="d2", changes={"x": 3}, ts=2, actor="B", version_vector={"B": 2}) + merged = a.merge(b) + self.assertEqual(merged.changes["x"], 3) + self.assertEqual(merged.ts, 2) + self.assertEqual(merged.version_vector.get("A"), 1) + self.assertEqual(merged.version_vector.get("B"), 2) + + def test_merge_non_numeric_prefers_later_timestamp(self): + a = PlanDeltaCRDT(delta_id="d1", changes={"flag": True}, ts=1, actor="A", version_vector={"A": 1}) + b = PlanDeltaCRDT(delta_id="d2", changes={"flag": False}, ts=2, actor="B", version_vector={"B": 2}) + merged = a.merge(b) + self.assertIs(merged.changes["flag"], False) + self.assertEqual(merged.ts, 2) + self.assertEqual(merged.version_vector.get("A"), 1) + self.assertEqual(merged.version_vector.get("B"), 2) + + +if __name__ == "__main__": + unittest.main()