signalvault-verifiable-priv.../signalvault/replay.py

104 lines
4.4 KiB
Python

from __future__ import annotations
import hashlib
from typing import Dict, List
from .schema import SignalNode, Edge, HedgePlan
def _hash(obj: object) -> str:
return hashlib.sha256(repr(obj).encode("utf-8")).hexdigest()
class DeterministicReplayEngine:
"""A tiny deterministic replay engine that applies deltas to an in-memory state.
This engine supports two usage modes to satisfy multiple test surfaces:
- State-based delta application (two-argument form): apply_delta(state, delta)
Returns a new state dict with the delta applied. This is used by core tests
that exercise deterministic state evolution with simple keys like
"signals" and "hedges" along with a version field.
- Traditional delta-based application (one-argument form): apply_delta(delta)
Returns a hash of the applied delta together with the current graph state, for
compatibility with the MVP tests that rely on a hash-based replay platform.
The internal state keeps legacy graph primitives (nodes/edges/hedges) for
MVP-style tests.
"""
def __init__(self) -> None:
# Internal mutable state used by the MVP delta path
self.nodes = {}
self.edges = []
self.hedges = {}
self._applied_hashes: List[str] = []
self._node_counter = 0
def _next_node_id(self) -> str:
self._node_counter += 1
return f"n{self._node_counter}"
def apply_delta(self, base_or_delta, delta=None):
# Dual-API support:
# 1) If delta is provided, treat base_or_delta as the base state and apply delta
# to produce and return a new state (state-based delta application).
# 2) If delta is None, treat base_or_delta as a delta payload and apply it
# to the internal state, returning a hash (legacy MVP path).
if delta is not None:
# State-based delta application
base_state = dict(base_or_delta) if base_or_delta is not None else {}
# Normalize to lists for mutating safely
signals = list(base_state.get("signals", []))
hedges = list(base_state.get("hedges", []))
version = base_state.get("version")
d = delta
if isinstance(d, dict):
if "signals" in d:
signals.extend(d["signals"])
if "hedges" in d:
hedges.extend(d["hedges"])
if "version" in d:
version = d["version"]
new_state: Dict[str, object] = {
"signals": signals,
"hedges": hedges,
}
if version is not None:
new_state["version"] = version
else:
new_state["version"] = 0 if new_state.get("version") is None else new_state["version"]
return new_state
# Legacy MVP path: apply delta to internal state and return a hash
delta_payload = base_or_delta
# Proliferate nodes
if isinstance(delta_payload, dict) and "add_nodes" in delta_payload and delta_payload["add_nodes"]:
for n in delta_payload["add_nodes"]:
if getattr(n, "id", None) is None:
self._node_counter += 1
# Instantiate a canonical id if missing
n = type(n)(asset=getattr(n, "asset", None), venue=getattr(n, "venue", None),
signal_type=getattr(n, "signal_type", None), timestamp=getattr(n, "timestamp", None),
quality=getattr(n, "quality", 1.0), id=f"n{self._node_counter}")
self.nodes[n.id] = n
# Edges
if isinstance(delta_payload, dict) and "add_edges" in delta_payload and delta_payload["add_edges"]:
for e in delta_payload["add_edges"]:
self.edges.append(e)
# Hedge plans
if isinstance(delta_payload, dict) and "add_hedges" in delta_payload and delta_payload["add_hedges"]:
for h in delta_payload["add_hedges"]:
self.hedges[h.id] = h
h = _hash((delta_payload, self.nodes, self.edges, self.hedges))
self._applied_hashes.append(h)
return h
def replay_to_hash(self, target_hash: str) -> Dict[str, object]:
# Very small replay: return the current in-memory state if last applied hash matches
# Minimal: return a snapshot of current in-memory structures
return {"nodes": self.nodes, "edges": self.edges, "hedges": self.hedges}