gridverse-open-low-code-pla.../gridverse/delta_sync.py

21 lines
668 B
Python

from typing import List, Dict, Any
class DeltaSyncEngine:
    """In-memory, append-only log of deltas keyed by node id.

    Each node id maps to the list of deltas written for it, kept in write
    order. Deltas are shallow-copied both when written and when replayed,
    so neither the dict a caller handed to ``write_delta`` nor a dict
    obtained from ``replay`` aliases the stored history. The copies are
    shallow: nested mutable values inside a delta are still shared.
    """

    def __init__(self) -> None:
        """Create an engine with an empty delta store."""
        # node_id -> deltas recorded for that node, oldest first.
        self._store: Dict[str, List[Dict[str, Any]]] = {}

    def write_delta(self, node_id: str, delta: Dict[str, Any]) -> None:
        """Append a snapshot of *delta* to the log for *node_id*.

        Args:
            node_id: Key of the log to append to; the log is created on
                first write.
            delta: Mapping to record. It is shallow-copied, so mutating
                the caller's dict afterwards does not alter the log.
        """
        self._store.setdefault(node_id, []).append(dict(delta))

    def replay(self, node_id: str) -> List[Dict[str, Any]]:
        """Return the deltas recorded for *node_id*, in write order.

        Args:
            node_id: Key of the log to read.

        Returns:
            A new list holding a shallow copy of every recorded delta,
            oldest first. An unknown ``node_id`` yields an empty list and
            does not create an entry in the store.
        """
        # Copy each entry as well as the list: handing out the stored dicts
        # would let a caller rewrite history by mutating a replayed delta.
        return [dict(delta) for delta in self._store.get(node_id, [])]
def reconcile(local: Dict[str, Any], remote: Dict[str, Any]) -> Dict[str, Any]:
    """Combine *local* and *remote* into a new dict; remote wins on clashes.

    Neither argument is modified. Keys from ``local`` come first in their
    original order, followed by keys that appear only in ``remote``.
    """
    combined: Dict[str, Any] = {}
    # Apply local first so that remote overwrites any key present in both.
    combined.update(local)
    combined.update(remote)
    return combined