"""DeltaTrace MVP core: event graph schema and deterministic replay scaffold. This module provides lightweight data structures and a tiny, deterministic replay engine suitable for an MVP. It is not a production-ready tracing system, but a minimal, well-typed foundation you can build upon. """ from __future__ import annotations from dataclasses import dataclass, field from typing import Any, Dict, List, Optional import hashlib import json @dataclass class LocalEvent: instrument: str timestamp: float data_hash: str source_id: str version: str payload: Dict[str, Any] = field(default_factory=dict) @dataclass class SharedSignal: signal_id: str value: float uncertainty: float timestamp: float privacy_tag: Optional[str] = None @dataclass class PlanDelta: delta_id: str timestamp: float author: str contract_id: str signature: str safety_tags: List[str] = field(default_factory=list) @dataclass class OrderEvent: order_id: str side: str price: float size: int timestamp: float venue: str related_delta: Optional[str] = None @dataclass class FillEvent: fill_id: str order_id: str price: float size: int timestamp: float venue: str related_delta: Optional[str] = None @dataclass class RiskCheck: check_id: str result: bool rationale: str timestamp: float @dataclass class AuditLog: entry_id: str signer: str timestamp: float contract_id: str action: str details: Optional[str] = None @dataclass class PrivacyBudget: signal_id: str budget: float expiry: float @dataclass class MessageMeta: version: str nonce: str signature: str encoding: str = "json" class TraceGraph: """Minimal in-memory causal graph for an MVP. Nodes are stored as a simple list; edges are tuples (src, dst, label). This is a lightweight scaffold for deterministic replay and auditing. """ def __init__(self) -> None: self.nodes: List[Any] = [] self.edges: List[tuple] = [] # (src, dst, label) def add_node(self, node: Any) -> None: self.nodes.append(node) def add_edge(self, src: Any, dst: Any, label: str) -> None: self.edges.append((src, dst, label)) def to_dict(self) -> Dict[str, Any]: return { "nodes": [repr(n) for n in self.nodes], "edges": [ {"src": repr(e[0]), "dst": repr(e[1]), "label": e[2]} for e in self.edges ], } class ReplayEngine: """Deterministic replay scaffold. Given a delta-stream (PlanDelta list) and a generic event-log (list), compute a deterministic fidelity root (hash) representing the replay outcome. This is intentionally small but deterministic and testable. """ def __init__(self, delta_stream: List[PlanDelta], event_log: List[Dict[str, Any]]): self.delta_stream = delta_stream self.event_log = event_log def _freeze(self, obj: Any) -> str: return json.dumps(obj, sort_keys=True, default=str) def compute_root(self) -> str: parts: List[str] = [] for d in self.delta_stream: parts.append(self._freeze({"delta_id": d.delta_id, "ts": d.timestamp, "author": d.author})) for e in self.event_log: parts.append(self._freeze(e)) blob = "|".join(parts) return hashlib.sha256(blob.encode("utf-8")).hexdigest() def replay_path(self) -> Dict[str, Any]: """Return a minimal, deterministic representation of the replay path.""" path = [] for d in self.delta_stream: path.append({"delta_id": d.delta_id, "ts": d.timestamp, "author": d.author}) # Attach matched events by simple association (if any) by timestamp range for ev in self.event_log: path.append({"event": ev}) return { "path": path, "root": self.compute_root(), }