deltatrace-deterministic-re.../deltatrace/adapters/fix_feed_sim.py

34 lines
899 B
Python

from __future__ import annotations
import time
from typing import List
from ..dsl import LocalEvent, PlanDelta
class FixFeedSimulator:
"""
Lightweight FIX feed simulator that emits LocalEvent and PlanDelta samples.
This is intentionally simplistic and deterministic for MVP replay demos.
"""
def __init__(self, seed: int = 1):
self.seed = seed
def generate_stream(self) -> List[object]:
t = time.time()
# Create a LocalEvent tick followed by a PlanDelta
md1 = LocalEvent(
event_id="md1",
event_type="MDTick",
asset="ABC",
venue="FIX_GATEWAY",
timestamp=t,
payload={"price": 101.5, "size": 100},
)
plan = PlanDelta(
delta_id="d1",
timestamp=t + 0.01,
author="sim",
contract_id="ABC",
delta_payload={"action": "BUY", "qty": 10, "limit": 102.0},
signature="sig",
)
return [md1, plan]