signalvault-verifiable-priv.../signalvault/replay.py

58 lines
2.0 KiB
Python

from __future__ import annotations
import hashlib
from typing import Dict, List
from .schema import SignalNode, Edge, HedgePlan
def _hash(obj: object) -> str:
return hashlib.sha256(repr(obj).encode("utf-8")).hexdigest()
class DeterministicReplayEngine:
"""A tiny deterministic replay engine that applies deltas to an in-memory state.
State is a dict with keys:
- nodes: dict[id, SignalNode]
- edges: list[Edge]
- hedges: dict[id, HedgePlan]
"""
def __init__(self) -> None:
self.nodes = {}
self.edges = []
self.hedges = {}
self._applied_hashes: List[str] = []
self._node_counter = 0
def _next_node_id(self) -> str:
self._node_counter += 1
return f"n{self._node_counter}"
def apply_delta(self, delta):
# delta can contain: add_nodes, add_edges, add_hedges
if isinstance(delta, dict) and "add_nodes" in delta and delta["add_nodes"]:
for n in delta["add_nodes"]:
if getattr(n, "id", None) is None:
self._node_counter += 1
n = SignalNode(asset=n.asset, venue=n.venue, signal_type=n.signal_type, timestamp=n.timestamp, quality=n.quality, id=f"n{self._node_counter}")
self.nodes[n.id] = n
if isinstance(delta, dict) and "add_edges" in delta and delta["add_edges"]:
for e in delta["add_edges"]:
self.edges.append(e)
if isinstance(delta, dict) and "add_hedges" in delta and delta["add_hedges"]:
for h in delta["add_hedges"]:
self.hedges[h.id] = h
h = _hash((delta, self.nodes, self.edges, self.hedges))
self._applied_hashes.append(h)
return h
def replay_to_hash(self, target_hash: str) -> Dict[str, object]:
# Very small replay: return the current in-memory state if last applied hash matches
# Minimal: return a snapshot of current in-memory structures
return {"nodes": self.nodes, "edges": self.edges, "hedges": self.hedges}