deltatrace-deterministic-re.../deltatrace/replay.py

41 lines
1.5 KiB
Python

from __future__ import annotations
from typing import Dict, List, Any
from .trace import LocalEvent, PlanDelta, OrderEvent, FillEvent
def deterministic_replay(delta_stream: List[Dict[str, Any]], event_log: List[Dict[str, Any]]) -> Dict[str, Any]:
"""A simple deterministic replay engine.
This function replays a captured delta stream in a sandbox and compares
the resulting decision path against a baseline event_log. It returns a
summary containing fidelity metrics and the produced replay path.
"""
# Normalize input into structured objects if they aren't already
replay_path: List[Dict[str, Any]] = []
baseline = {e.get("delta_id") for e in event_log if e.get("delta_id")}
for item in delta_stream:
# Expect each delta to be a PlanDelta dict; in a real system this would be validated
if item.get("delta_id"):
entry = {
"delta_id": item["delta_id"],
"timestamp": item.get("timestamp"),
"action": "PlanDeltaApplied",
}
replay_path.append(entry)
# Simple fidelity: count deltas that have a corresponding entry in baseline
replay_delta_ids = {d.get("delta_id") for d in delta_stream if d.get("delta_id")}
hit = len(replay_delta_ids.intersection(baseline))
fidelity = hit / max(1, len(replay_delta_ids))
return {
"status": "ok",
"replay_path": replay_path,
"metrics": {
"delta_count": len(replay_delta_ids),
"fidelity": fidelity,
},
}