43 lines
1.5 KiB
Python
43 lines
1.5 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, List
|
|
|
|
from .core import LocalEvent
|
|
|
|
|
|
class DeterministicReplayEngine:
|
|
"""Minimal deterministic replay engine.
|
|
|
|
It accepts a delta_stream, sorts events deterministically by (timestamp, id),
|
|
and returns an ordered sequence which can be used to compare fidelity against a baseline.
|
|
This is intentionally minimal and focused on determinism for MVP.
|
|
"""
|
|
|
|
def __init__(self, delta_stream: List[Dict[str, Any]], seed: int | None = None, baseline: List[str] | None = None) -> None:
|
|
self.delta_stream = delta_stream
|
|
self.seed = seed if seed is not None else 42
|
|
self.baseline = baseline
|
|
|
|
def _normalize(self, item: Dict[str, Any]) -> Dict[str, Any]:
|
|
# Normalize input into a simple canonical form for deterministic processing
|
|
return {
|
|
"id": str(item.get("id")),
|
|
"type": str(item.get("type")),
|
|
"timestamp": float(item.get("timestamp", 0.0)),
|
|
}
|
|
|
|
def run(self) -> Dict[str, Any]:
|
|
# Deterministic sort by (timestamp, id)
|
|
events = [self._normalize(e) for e in self.delta_stream]
|
|
ordered = sorted(events, key=lambda e: (e["timestamp"], e["id"]))
|
|
|
|
result = {
|
|
"ordered_events": [e["id"] for e in ordered],
|
|
"deterministic": True,
|
|
}
|
|
|
|
# Basic fidelity signal if baseline provided
|
|
if self.baseline is not None:
|
|
result["matches_baseline"] = result["ordered_events"] == self.baseline
|
|
return result
|