deltatrace-deterministic-re.../delta_trace/replay.py

32 lines
1.3 KiB
Python

from __future__ import annotations
from typing import List, Dict
from .core import LocalEvent, PlanDelta, OrderEvent, FillEvent, AuditLog
class DeterministicReplayEngine:
def __init__(self, latency_tolerance_ms: float = 1.0) -> None:
self.latency_tolerance_ms = latency_tolerance_ms
def replay(self, delta_stream: List[LocalEvent], event_log: List[AuditLog]) -> Dict[str, object]:
# Lightweight, deterministic replay skeleton:
# - Walk the incoming LocalEvents, matching with later Order/Fill events when possible
# - Compute a toy end-to-end latency metric if FillEvent timestamps are present
first_ts = delta_stream[0].timestamp if delta_stream else None
last_ts = None
for ev in delta_stream:
if isinstance(ev, FillEvent):
if last_ts is None or ev.timestamp > last_ts:
last_ts = ev.timestamp
end_to_end_latency_ms = ((last_ts - first_ts) * 1000.0) if (first_ts is not None and last_ts is not None) else 0.0
fidelity = 1.0 if end_to_end_latency_ms >= 0 else 0.0
return {
"e2e_latency_ms": end_to_end_latency_ms,
"fidelity": fidelity,
"events_replayed": len(delta_stream),
"logs_present": len(event_log) if event_log else 0,
}