meshviz-studio-decentralize.../meshviz/crdt.py

39 lines
1.4 KiB
Python

import uuid
from typing import Dict, List, Tuple
class DeltaCRDT:
"""A compact CRDT-like store for delta-based time-series data.
We store per-device time series as a list of (timestamp, value, delta_id).
Deltas are deduplicated via delta_id and merged idempotently.
"""
def __init__(self) -> None:
# device_id -> list[(ts, value, delta_id)]
self.state: Dict[str, List[Tuple[float, float, str]]] = {}
self.seen: set = set()
def _new_id(self, device: str, ts: float, value: float) -> str:
# Simple unique-id generator; deterministic content is not required here
return str(uuid.uuid4())
def add_local_delta(self, device: str, ts: float, value: float) -> str:
delta_id = self._new_id(device, ts, value)
entry = (ts, value, delta_id)
self.state.setdefault(device, [])
self.state[device].append(entry)
self.seen.add(delta_id)
return delta_id
def merge(self, remote_state: Dict[str, List[Tuple[float, float, str]]]) -> None:
for device, entries in remote_state.items():
self.state.setdefault(device, [])
for ts, val, did in entries:
if did not in self.seen: # type: ignore
self.state[device].append((ts, val, did))
self.seen.add(did)
def export_state(self) -> Dict[str, List[Tuple[float, float, str]]]:
return self.state