# Source: deltatrace-deterministic-re.../deltatrace/trace.py
# (repository viewer listing: 124 lines, 2.6 KiB, Python)

from __future__ import annotations

import copy
import json
from dataclasses import dataclass, field, asdict
from typing import Any, Dict, Optional
@dataclass
class Metadata:
    """Envelope fields that a ``LocalEvent`` may carry in its ``metadata`` slot.

    Plain dataclass: all four fields are required and nothing is validated
    on construction.
    """

    version: str
    # NOTE(review): unit and epoch of the timestamp are not defined in this
    # module -- confirm with the producers.
    timestamp: float
    nonce: str
    source_adapter: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the fields as a new dict keyed by field name.

        Keys appear in field-declaration order.
        """
        as_mapping: Dict[str, Any] = asdict(self)
        return as_mapping
@dataclass
class LocalEvent:
    """A single local event with an optional ``Metadata`` envelope.

    ``to_dict`` and ``from_dict`` are inverses for events whose ``metadata``
    is either ``None`` or a ``Metadata`` instance.
    """

    version: str
    timestamp: float
    nonce: str
    source: str
    asset: str
    event_type: str  # e.g., 'MDTick', 'Signal', 'RiskCheckInput'
    payload: Dict[str, Any]
    # ``from_dict`` only ever stores a ``Metadata`` or ``None`` here.
    metadata: Optional["Metadata"] = None

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict snapshot of the event.

        The payload is deep-copied so that mutating the returned dict cannot
        alter this event. This matches the other record types in this
        module, whose ``to_dict`` goes through ``dataclasses.asdict`` and
        therefore also returns detached containers.

        Returns:
            A dict with the keys ``version``, ``timestamp``, ``nonce``,
            ``source``, ``asset``, ``event_type``, ``payload`` and
            ``metadata``. ``metadata`` is ``None`` when the event has none,
            otherwise the result of ``metadata.to_dict()``.
        """
        # Compare against None explicitly so the decision does not depend on
        # the truthiness of the metadata object.
        meta = self.metadata.to_dict() if self.metadata is not None else None
        return {
            "version": self.version,
            "timestamp": self.timestamp,
            "nonce": self.nonce,
            "source": self.source,
            "asset": self.asset,
            "event_type": self.event_type,
            "payload": copy.deepcopy(self.payload),
            "metadata": meta,
        }

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "LocalEvent":
        """Build a ``LocalEvent`` from a dict shaped like ``to_dict`` output.

        Args:
            d: Mapping holding ``version``, ``timestamp``, ``nonce``,
                ``source``, ``asset``, ``event_type`` and ``payload``. The
                ``metadata`` key is optional.

        Returns:
            The reconstructed event. The ``payload`` object from ``d`` is
            stored as-is, not copied.

        Raises:
            KeyError: If a required key is missing from ``d``.
            TypeError: If ``metadata`` is a non-empty mapping whose keys do
                not match the ``Metadata`` fields.
        """
        raw_meta = d.get("metadata")
        # A missing key, None and an empty mapping all mean "no metadata".
        metadata = Metadata(**raw_meta) if raw_meta else None
        return LocalEvent(
            version=d["version"],
            timestamp=d["timestamp"],
            nonce=d["nonce"],
            source=d["source"],
            asset=d["asset"],
            event_type=d["event_type"],
            payload=d["payload"],
            metadata=metadata,
        )
@dataclass
class PlanDelta:
    """Record for one plan delta, identified by ``delta_id``.

    ``OrderEvent.delta_ref`` is annotated in this module as holding a
    PlanDelta ``delta_id``.
    """

    version: str
    timestamp: float
    nonce: str
    author: str
    contract_id: str
    delta_id: str
    payload: Dict[str, Any]
    # Stored as an opaque string; nothing visible in this module creates or
    # verifies it.
    signature: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict in field-declaration order.

        ``asdict`` recurses into ``payload``, so the result shares no dicts
        or lists with this instance.
        """
        snapshot: Dict[str, Any] = asdict(self)
        return snapshot
@dataclass
class OrderEvent:
    """Record for one order, identified by ``order_id``."""

    version: str
    timestamp: float
    order_id: str
    delta_ref: str  # the delta_id of the PlanDelta this order refers to
    payload: Dict[str, Any]
    # Stored as an opaque string; nothing visible in this module creates or
    # verifies it.
    signature: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict in field-declaration order.

        ``asdict`` recurses into ``payload``, so the result shares no dicts
        or lists with this instance.
        """
        serialized: Dict[str, Any] = asdict(self)
        return serialized
@dataclass
class FillEvent:
    """Record for one fill, identified by ``fill_id``."""

    version: str
    timestamp: float
    fill_id: str
    # NOTE(review): presumably an OrderEvent.order_id -- confirm with callers.
    order_ref: str
    payload: Dict[str, Any]
    # Stored as an opaque string; nothing visible in this module creates or
    # verifies it.
    signature: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict in field-declaration order.

        ``asdict`` recurses into ``payload``, so the result shares no dicts
        or lists with this instance.
        """
        flattened: Dict[str, Any] = asdict(self)
        return flattened
@dataclass
class RiskCheck:
    """Record for one risk check, identified by ``check_id``."""

    version: str
    timestamp: float
    check_id: str
    # NOTE(review): presumably a PlanDelta.delta_id, as on OrderEvent --
    # confirm with callers.
    delta_ref: str
    results: Dict[str, Any]
    # Stored as an opaque string; nothing visible in this module creates or
    # verifies it.
    signature: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict in field-declaration order.

        ``asdict`` recurses into ``results``, so the output shares no dicts
        or lists with this instance.
        """
        exported: Dict[str, Any] = asdict(self)
        return exported
@dataclass
class AuditLog:
    """Record for one audit entry, identified by ``entry_id``."""

    version: str
    timestamp: float
    entry_id: str
    action: str
    details: Dict[str, Any]
    # Stored as an opaque string; nothing visible in this module creates or
    # verifies it.
    signature: str

    def to_dict(self) -> Dict[str, Any]:
        """Return the record as a plain dict in field-declaration order.

        ``asdict`` recurses into ``details``, so the output shares no dicts
        or lists with this instance.
        """
        entry: Dict[str, Any] = asdict(self)
        return entry