38 lines
1.4 KiB
Python
38 lines
1.4 KiB
Python
import uuid
|
|
from typing import Dict, List, Tuple
|
|
|
|
class DeltaCRDT:
|
|
"""A very small, toy CRDT for delta-based time-series data.
|
|
|
|
We store per-device time series as a list of (timestamp, value, delta_id).
|
|
Deltas are deduplicated via delta_id and merged idempotently.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
# device_id -> list[(ts, value, delta_id)]
|
|
self.state: Dict[str, List[Tuple[float, float, str]]] = {}
|
|
self.seen: set = set()
|
|
|
|
def _new_id(self, device: str, ts: float, value: float) -> str:
|
|
# Deterministic-ish id generator for reproducibility; include a uuid for uniqueness
|
|
return str(uuid.uuid4())
|
|
|
|
def add_local_delta(self, device: str, ts: float, value: float) -> str:
|
|
delta_id = self._new_id(device, ts, value)
|
|
entry = (ts, value, delta_id)
|
|
self.state.setdefault(device, [])
|
|
self.state[device].append(entry)
|
|
self.seen.add(delta_id)
|
|
return delta_id
|
|
|
|
def merge(self, remote_state: Dict[str, List[Tuple[float, float, str]]]) -> None:
|
|
for device, entries in remote_state.items():
|
|
self.state.setdefault(device, [])
|
|
for ts, val, did in entries:
|
|
if did not in self.seen: # type: ignore
|
|
self.state[device].append((ts, val, did))
|
|
self.seen.add(did)
|
|
|
|
def export_state(self) -> Dict[str, List[Tuple[float, float, str]]]:
|
|
return self.state
|