32 lines
1.3 KiB
Python
32 lines
1.3 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import List, Dict
|
|
|
|
from .core import LocalEvent, PlanDelta, OrderEvent, FillEvent, AuditLog
|
|
|
|
|
|
class DeterministicReplayEngine:
    """Lightweight, deterministic replay skeleton for a recorded event stream.

    ``replay`` walks the supplied events and derives a toy end-to-end latency
    figure from the first event's timestamp and the latest ``FillEvent``
    timestamp found in the stream.
    """

    def __init__(self, latency_tolerance_ms: float = 1.0) -> None:
        """Record the latency tolerance on the instance.

        Args:
            latency_tolerance_ms: Tolerance value in milliseconds. It is only
                stored here; ``replay`` does not currently consult it.
        """
        self.latency_tolerance_ms = latency_tolerance_ms

    def replay(self, delta_stream: List[LocalEvent], event_log: List[AuditLog]) -> Dict[str, object]:
        """Replay ``delta_stream`` and return summary metrics.

        Args:
            delta_stream: Events to replay. ``None`` is treated like an empty
                stream, mirroring the leniency already applied to
                ``event_log``.
            event_log: Audit log entries; only their count is reported.
                ``None`` or an empty list yields a count of 0.

        Returns:
            A dict with the keys:

            * ``"e2e_latency_ms"`` - ``(latest FillEvent timestamp - first
              event timestamp) * 1000.0``, or ``0.0`` when the stream is empty
              or holds no ``FillEvent``.
            * ``"fidelity"`` - ``1.0`` when the latency is non-negative,
              otherwise ``0.0``.
            * ``"events_replayed"`` - number of events in the stream.
            * ``"logs_present"`` - number of entries in ``event_log``.
        """
        # Normalise a missing stream once so indexing, iteration and len()
        # below all agree instead of raising TypeError on None.
        events = delta_stream if delta_stream is not None else []

        first_ts = events[0].timestamp if events else None

        # Latest FillEvent timestamp; None when the stream has no FillEvent.
        last_ts = max(
            (ev.timestamp for ev in events if isinstance(ev, FillEvent)),
            default=None,
        )

        if first_ts is not None and last_ts is not None:
            # NOTE(review): the * 1000.0 presumes timestamps are in seconds -
            # confirm against the event definitions in .core.
            end_to_end_latency_ms = (last_ts - first_ts) * 1000.0
        else:
            end_to_end_latency_ms = 0.0

        # A negative latency means the latest fill precedes the first event.
        fidelity = 1.0 if end_to_end_latency_ms >= 0 else 0.0

        return {
            "e2e_latency_ms": end_to_end_latency_ms,
            "fidelity": fidelity,
            "events_replayed": len(events),
            "logs_present": len(event_log) if event_log else 0,
        }
|