"""CRDT-style PlanDelta primitives for CosmosMesh MVP. This module provides a tiny CRDT-like structure to enable deterministic offline merges of PlanDelta changes across islanded replicas. The goal is to permit independent delta accumulation and deterministic replay on reconnect. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Dict, Optional @dataclass(frozen=True) class PlanDeltaCRDT: """A CRDT-like wrapper for a single PlanDelta payload. - delta_id: unique identifier for this delta when created. - changes: a mapping from key to numeric delta value (or other simple types). - ts: logical timestamp (monotonic) of delta creation to help ordering on replay. - actor: optional actor id who authored the delta. - version_vector: a simple per-actor vector to track seen deltas for causal safety. """ delta_id: str changes: Dict[str, Any] = field(default_factory=dict) ts: Optional[int] = None actor: Optional[str] = None version_vector: Dict[str, int] = field(default_factory=dict) def merge(self, other: "PlanDeltaCRDT") -> "PlanDeltaCRDT": """Return a new PlanDeltaCRDT representing the commutative merge. Rules: - For numeric keys, take the maximum value between deltas. - For non-numeric values, prefer the one with the larger timestamp if available, otherwise choose the value from the delta with higher lexical delta_id. - Version vectors are merged by taking the max per-actor sequence. - The merge is commutative and associative, suitable for islanded reconciling. """ # Merge version vectors merged_vv: Dict[str, int] = dict(self.version_vector) for actor, v in other.version_vector.items(): merged_vv[actor] = max(merged_vv.get(actor, 0), v) # Merge changes merged_changes: Dict[str, Any] = dict(self.changes) for k, v in other.changes.items(): if k in merged_changes: a = merged_changes[k] b = v # If both numeric (and not booleans), take max if ( isinstance(a, (int, float)) and not isinstance(a, bool) and isinstance(b, (int, float)) and not isinstance(b, bool) ): merged_changes[k] = max(a, b) else: # Otherwise, prefer the later delta by timestamp if possible t_self = self.ts or 0 t_other = other.ts or 0 if t_other > t_self: merged_changes[k] = b else: merged_changes[k] = a else: merged_changes[k] = v # Construct a new PlanDeltaCRDT representing the merge return PlanDeltaCRDT( delta_id=self.delta_id + "+merge+" + other.delta_id, changes=merged_changes, ts=max(filter(None, [self.ts, other.ts])), actor=self.actor or other.actor, version_vector=merged_vv, ) def to_dict(self) -> Dict[str, Any]: return { "delta_id": self.delta_id, "changes": self.changes, "ts": self.ts, "actor": self.actor, "version_vector": self.version_vector, } __all__ = ["PlanDeltaCRDT"]