from __future__ import annotations

import hashlib
from typing import Dict, List

from .schema import SignalNode, Edge, HedgePlan


def _hash(obj: object) -> str:
    """Return the SHA-256 hex digest of ``repr(obj)`` encoded as UTF-8.

    The digest is only as reproducible as ``repr(obj)`` itself: objects that
    fall back to Python's default ``repr`` (which embeds a memory address)
    will hash differently from run to run.
    """
    return hashlib.sha256(repr(obj).encode("utf-8")).hexdigest()


class DeterministicReplayEngine:
    """A tiny deterministic replay engine that applies deltas to an in-memory state.

    This engine supports two usage modes to satisfy multiple test surfaces:

    - State-based delta application (two-argument form):
      ``apply_delta(state, delta)`` returns a new state dict with the delta
      applied. This is used by core tests that exercise deterministic state
      evolution with simple keys like ``"signals"`` and ``"hedges"`` along
      with a ``"version"`` field.
    - Traditional delta-based application (one-argument form):
      ``apply_delta(delta)`` returns a hash of the applied delta together
      with the current graph state, for compatibility with the MVP tests that
      rely on a hash-based replay platform.

    The internal state keeps legacy graph primitives (nodes/edges/hedges) for
    MVP-style tests.
    """

    def __init__(self) -> None:
        # Internal mutable state used by the MVP (one-argument) delta path.
        self.nodes = {}   # node id -> node object
        self.edges = []   # edge objects, in application order
        self.hedges = {}  # hedge id -> hedge object
        # Digest returned by every one-argument apply_delta call, oldest first.
        self._applied_hashes: List[str] = []
        # Source of generated ids for nodes that arrive without one.
        self._node_counter = 0

    def _next_node_id(self) -> str:
        """Advance the node counter and return the next generated id (``"n<k>"``)."""
        self._node_counter += 1
        return f"n{self._node_counter}"

    def apply_delta(self, base_or_delta, delta=None):
        """Apply a delta using either the state-based or the legacy graph API.

        Args:
            base_or_delta: With ``delta`` given, the base state mapping (or
                ``None`` for an empty base). Without ``delta``, the delta
                payload to apply to the engine's internal graph state.
            delta: Delta to merge into ``base_or_delta``. ``None`` (the
                default) selects the legacy one-argument path.

        Returns:
            Two-argument form: a new dict with ``"signals"``, ``"hedges"``
            and ``"version"`` keys; the inputs are not mutated.
            One-argument form: the hex digest recorded for this application.
        """
        if delta is not None:
            return self._merge_state(base_or_delta, delta)
        return self._apply_graph_delta(base_or_delta)

    @staticmethod
    def _merge_state(base_state, delta) -> Dict[str, object]:
        """Return a new state built from ``base_state`` with ``delta`` appended."""
        base = dict(base_state) if base_state is not None else {}
        # Copy into fresh lists so the caller's base state is never mutated.
        # ``or ()`` treats a missing key and an explicit None the same way.
        signals = list(base.get("signals") or ())
        hedges = list(base.get("hedges") or ())
        version = base.get("version")

        # A non-dict delta carries nothing to merge; the base is still normalised.
        if isinstance(delta, dict):
            signals.extend(delta.get("signals") or ())
            hedges.extend(delta.get("hedges") or ())
            if "version" in delta:
                version = delta["version"]

        return {
            "signals": signals,
            "hedges": hedges,
            # The result always carries a version; default to 0 when none is known.
            "version": 0 if version is None else version,
        }

    def _apply_graph_delta(self, delta_payload) -> str:
        """Apply a legacy delta payload to the internal graph and return its digest."""
        if isinstance(delta_payload, dict):
            for node in delta_payload.get("add_nodes") or ():
                if getattr(node, "id", None) is None:
                    # Rebuild the node with a generated canonical id, carrying
                    # over the fields the original exposes.
                    node = type(node)(
                        asset=getattr(node, "asset", None),
                        venue=getattr(node, "venue", None),
                        signal_type=getattr(node, "signal_type", None),
                        timestamp=getattr(node, "timestamp", None),
                        quality=getattr(node, "quality", 1.0),
                        id=self._next_node_id(),
                    )
                self.nodes[node.id] = node

            self.edges.extend(delta_payload.get("add_edges") or ())

            for hedge in delta_payload.get("add_hedges") or ():
                self.hedges[hedge.id] = hedge

        # The digest covers both the payload and the resulting graph state, so
        # the same payload applied to a different history hashes differently.
        digest = _hash((delta_payload, self.nodes, self.edges, self.hedges))
        self._applied_hashes.append(digest)
        return digest

    def replay_to_hash(self, target_hash: str) -> Dict[str, object]:
        """Return a snapshot of the current in-memory graph structures.

        NOTE(review): ``target_hash`` is accepted for API compatibility but is
        not consulted; the current state is returned regardless of which
        hashes have been applied.

        Returns:
            A dict with ``"nodes"``, ``"edges"`` and ``"hedges"`` holding
            shallow copies of the engine's containers, so later deltas (or
            caller-side mutation of the containers) do not alter the snapshot.
        """
        return {
            "nodes": dict(self.nodes),
            "edges": list(self.edges),
            "hedges": dict(self.hedges),
        }