signalvault-verifiable-priv.../signalvault_verifiable_priv.../replay.py

27 lines
887 B
Python

from __future__ import annotations
from typing import Dict, List
from .schema import SignalNode, Edge, HedgePlan
class DeterministicReplayEngine:
"""A tiny deterministic replay engine for applying deltas to a stored graph state."""
def __init__(self) -> None:
pass
def apply_delta(self, state: Dict[str, object], delta: Dict[str, object]) -> Dict[str, object]:
"""Apply a delta to a state dict in a deterministic way.
This is a minimal placeholder: keys in delta override state.
"""
new_state = dict(state)
for k, v in delta.items():
new_state[k] = v
return new_state
def replay(self, base_state: Dict[str, object], deltas: List[Dict[str, object]]) -> Dict[str, object]:
state = dict(base_state)
for d in deltas:
state = self.apply_delta(state, d)
return state