diff --git a/README.md b/README.md index ef96ee5..bf8f43f 100644 --- a/README.md +++ b/README.md @@ -1,31 +1,19 @@ -# DeltaTrace: Deterministic Replayable Latency & Compliance Tracing +# DeltaTrace MVP -DeltaTrace is a production-oriented MVP crafted to enable end-to-end traceability, deterministic replay of order lifecycles, and governance-ready audit trails for live market-execution pipelines operating across partitioned networks. +A minimal, production-oriented scaffold for end-to-end traceability in live-market +execution pipelines. This repository provides a core event-graph schema, a +deterministic replay engine, and governance scaffolding with starter adapters. -Core capabilities -- Event-graph model: capture MDTick, signals, PlanDelta decisions, orders, fills, and risk-check results as nodes with precise timestamps. -- Deterministic replay engine: replay a captured delta-stream and event log to reproduce a specific decision path in a sandbox. -- Governance-led audit log: crypto-signed triggers, approvals, and tamper-evident logs. -- Lightweight adapters: starter adapters for FIX feed and exchange gateway sandbox. -- Privacy posture: data-minimization and sanitization options for compliance needs. +What you get +- Lightweight event-graph primitives: LocalEvent, PlanDelta, OrderEvent, FillEvent, RiskCheck, AuditLog, PrivacyBudget +- Deterministic replay engine to reproduce a decision path from a delta stream and event log +- Governance ledger scaffold with crypto-signed audit entries (in-memory for MVP) +- Two starter adapters: FIX feed simulator and exchange gateway stub +- Packaging metadata (pyproject.toml) and unit tests, both exercised by test.sh -MVP Scope (8–12 weeks) -- Phase 0: core event-graph schema, deterministic replay engine, two adapters. -- Phase 1: governance ledger scaffold and privacy controls. -- Phase 2: partitioned-network testbed with a sandbox exchange. -- Phase 3: incident replay dashboard and governance reporting.
+Usage outline +- Build and test: ./test.sh +- Explore the API in deltatrace.core, the adapters package, and governance.ledger +- Extend with additional adapters and governance rules as needed -Build & testing -- Use test.sh to run unit tests and packaging checks (python3 -m build). -- The repository is structured to enable safe collaboration with a small, production-like core and pluggable adapters. - -Note: This is an MVP aimed at validating the interoperability, replay fidelity, and auditability of live-market pipelines, independent of vendor ecosystems. - -See the individual modules for the detailed API surface and how to extend adapters. - -Publish Readiness and Interop -- This MVP is production-oriented and ready for integration testing in a sandboxed environment. -- For public release, ensure packaging metadata is consistent and run test.sh (unit tests + packaging) locally. -- A READY_TO_PUBLISH signal file exists at repo root to indicate readiness status for downstream publishing tooling. -- The MVP roadmap targets cross-venue interoperability and governance primitives; this baseline can seed broader ecosystem integrations. -- If you want, I can draft toy payload sketches and a starter repo layout to seed interoperability as discussed. +This README intentionally stays lightweight and actionable for quick onboarding. diff --git a/adapters/exchange_gateway_adapter.py b/adapters/exchange_gateway_adapter.py new file mode 100644 index 0000000..e6abff6 --- /dev/null +++ b/adapters/exchange_gateway_adapter.py @@ -0,0 +1,26 @@ +"""Minimal exchange gateway adapter stub. + +Provides toy FillEvent objects for replay demos; timestamps come from time.time() and vary between runs.
+""" +from __future__ import annotations + +import time +from deltatrace.core import FillEvent + + +class ExchangeGatewayAdapter: + def __init__(self, venue: str = "TESTEX"): # simple venue name + self.venue = venue + self._counter = 0 + + def fill(self, order_id: str, size: int, price: float) -> FillEvent: + self._counter += 1 + return FillEvent( + fill_id=f"F-{self._counter}", + order_id=order_id, + price=price, + size=size, + timestamp=time.time(), + venue=self.venue, + related_delta=None, + ) diff --git a/adapters/fix_feed_adapter.py b/adapters/fix_feed_adapter.py new file mode 100644 index 0000000..47b2d12 --- /dev/null +++ b/adapters/fix_feed_adapter.py @@ -0,0 +1,28 @@ +"""Minimal FIX feed simulator adapter. + +This module provides a tiny, test-friendly FIX-like feed generator that +produces LocalEvent payloads compatible with the core types. +""" +from __future__ import annotations + +import time +from deltatrace.core import LocalEvent + + +class FIXFeedSimulator: + def __init__(self, instrument: str = "EURUSD", source_id: str = "FIXFIX") -> None: + self.instrument = instrument + self.source_id = source_id + self._counter = 0 + + def tick(self) -> LocalEvent: + self._counter += 1 + evt = LocalEvent( + instrument=self.instrument, + timestamp=time.time(), + data_hash=f"hash-{self._counter}", + source_id=self.source_id, + version="0.1", + payload={"type": "MDTick", "seq": self._counter}, + ) + return evt diff --git a/deltatrace/__init__.py b/deltatrace/__init__.py index ac377ef..b2bb515 100644 --- a/deltatrace/__init__.py +++ b/deltatrace/__init__.py @@ -1,16 +1,31 @@ -"""DeltaTrace core package initialization.""" +"""DeltaTrace MVP package +Lightweight core for event-graph schema and deterministic replay scaffolding. 
+""" -from .core import LocalEvent, PlanDelta, OrderEvent, FillEvent, RiskCheck, AuditLog, PrivacyBudget, TraceGraph -from .replay import DeterministicReplayEngine +from .core import ( + LocalEvent, + SharedSignal, + PlanDelta, + OrderEvent, + FillEvent, + RiskCheck, + AuditLog, + PrivacyBudget, + MessageMeta, + TraceGraph, + ReplayEngine, +) __all__ = [ "LocalEvent", + "SharedSignal", "PlanDelta", "OrderEvent", "FillEvent", "RiskCheck", "AuditLog", "PrivacyBudget", + "MessageMeta", "TraceGraph", - "DeterministicReplayEngine", + "ReplayEngine", ] diff --git a/deltatrace/core.py b/deltatrace/core.py index a7cc824..75a98d8 100644 --- a/deltatrace/core.py +++ b/deltatrace/core.py @@ -1,105 +1,161 @@ +"""DeltaTrace MVP core: event graph schema and deterministic replay scaffold. + +This module provides lightweight data structures and a tiny, deterministic +replay engine suitable for an MVP. It is not a production-ready tracing system, +but a minimal, well-typed foundation you can build upon. 
+""" from __future__ import annotations from dataclasses import dataclass, field -from typing import Any, Dict, List +from typing import Any, Dict, List, Optional +import hashlib +import json @dataclass class LocalEvent: - id: str instrument: str timestamp: float - data: Dict[str, Any] = field(default_factory=dict) + data_hash: str + source_id: str + version: str + payload: Dict[str, Any] = field(default_factory=dict) @dataclass class SharedSignal: - id: str - signal_type: str + signal_id: str + value: float + uncertainty: float timestamp: float - value: Any + privacy_tag: Optional[str] = None @dataclass class PlanDelta: - id: str - decision: str + delta_id: str timestamp: float - latency_budget_ms: int = 0 + author: str + contract_id: str + signature: str + safety_tags: List[str] = field(default_factory=list) @dataclass class OrderEvent: - id: str order_id: str side: str - qty: int price: float + size: int timestamp: float + venue: str + related_delta: Optional[str] = None @dataclass class FillEvent: - id: str + fill_id: str order_id: str - qty: int price: float + size: int timestamp: float + venue: str + related_delta: Optional[str] = None @dataclass class RiskCheck: - id: str - check_name: str + check_id: str result: bool + rationale: str timestamp: float @dataclass class AuditLog: - id: str - entry: str + entry_id: str + signer: str timestamp: float - signature: str # placeholder for crypto-signed payload + contract_id: str + action: str + details: Optional[str] = None @dataclass class PrivacyBudget: - id: str - venue: str + signal_id: str budget: float - consumed: float = 0.0 + expiry: float @dataclass -class Edge: - src: str - dst: str - label: str +class MessageMeta: + version: str + nonce: str + signature: str + encoding: str = "json" -@dataclass class TraceGraph: - nodes: List[str] = field(default_factory=list) - edges: List[Edge] = field(default_factory=list) + """Minimal in-memory causal graph for an MVP. 
- def add_node(self, node_id: str) -> None: - if node_id not in self.nodes: - self.nodes.append(node_id) + Nodes are stored as a simple list; edges are tuples (src, dst, label). + This is a lightweight scaffold for deterministic replay and auditing. + """ - def add_edge(self, src: str, dst: str, label: str) -> None: - self.edges.append(Edge(src=src, dst=dst, label=label)) + def __init__(self) -> None: + self.nodes: List[Any] = [] + self.edges: List[tuple] = [] # (src, dst, label) + + def add_node(self, node: Any) -> None: + self.nodes.append(node) + + def add_edge(self, src: Any, dst: Any, label: str) -> None: + self.edges.append((src, dst, label)) + + def to_dict(self) -> Dict[str, Any]: + return { + "nodes": [repr(n) for n in self.nodes], + "edges": [ + {"src": repr(e[0]), "dst": repr(e[1]), "label": e[2]} + for e in self.edges + ], + } -def build_graph_from_events(events: List[Dict[str, Any]]) -> TraceGraph: - graph = TraceGraph() - for e in events: - nid = e.get("id") - if not nid: - continue - graph.add_node(nid) - parent = e.get("parent_id") - if parent: - graph.add_node(parent) - graph.add_edge(parent, nid, e.get("type", "child")) - return graph +class ReplayEngine: + """Deterministic replay scaffold. + + Given a delta-stream (PlanDelta list) and a generic event-log (list), + compute a deterministic fidelity root (hash) representing the replay + outcome. This is intentionally small but deterministic and testable. 
+""" + + def __init__(self, delta_stream: List[PlanDelta], event_log: List[Dict[str, Any]]): + self.delta_stream = delta_stream + self.event_log = event_log + + def _freeze(self, obj: Any) -> str: + return json.dumps(obj, sort_keys=True, default=str) + + def compute_root(self) -> str: + parts: List[str] = [] + for d in self.delta_stream: + parts.append(self._freeze({"delta_id": d.delta_id, "ts": d.timestamp, "author": d.author})) + for e in self.event_log: + parts.append(self._freeze(e)) + blob = "|".join(parts) + return hashlib.sha256(blob.encode("utf-8")).hexdigest() + + def replay_path(self) -> Dict[str, Any]: + """Return a minimal, deterministic representation of the replay path.""" + path = [] + for d in self.delta_stream: + path.append({"delta_id": d.delta_id, "ts": d.timestamp, "author": d.author}) + # Append every logged event after the deltas, in log order (no timestamp matching is done) + for ev in self.event_log: + path.append({"event": ev}) + return { + "path": path, + "root": self.compute_root(), + } diff --git a/governance/ledger.py b/governance/ledger.py new file mode 100644 index 0000000..06b7bf0 --- /dev/null +++ b/governance/ledger.py @@ -0,0 +1,56 @@ +"""Lightweight governance ledger scaffold with RSA-PSS signing. + +This module signs audit entries with an RSA key pair so tampering is detectable. +It is intentionally minimal but provides a realistic API for MVP.
+""" +from __future__ import annotations + +import json +from typing import Any, Dict, List +from dataclasses import dataclass, asdict + +from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives import hashes, serialization + + +@dataclass +class AuditEntry: + payload: Dict[str, Any] + signature: str + signer: str + + +class GovernanceLedger: + def __init__(self, signer_id: str = "governor") -> None: + self.signer_id = signer_id + self._private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + self._public_key = self._private_key.public_key() + self.entries: List[AuditEntry] = [] + + def sign(self, payload: Dict[str, Any]) -> AuditEntry: + payload_bytes = json.dumps(payload, sort_keys=True).encode("utf-8") + signature = self._private_key.sign( + payload_bytes, + padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH + ), + hashes.SHA256(), + ) + entry = AuditEntry(payload=payload, signature=signature.hex(), signer=self.signer_id) + self.entries.append(entry) + return entry + + def verify(self, entry: AuditEntry) -> bool: + payload_bytes = json.dumps(entry.payload, sort_keys=True).encode("utf-8") + try: + self._public_key.verify( + bytes.fromhex(entry.signature), + payload_bytes, + padding.PSS( + mgf=padding.MGF1(hashes.SHA256()), salt_length=padding.PSS.MAX_LENGTH + ), + hashes.SHA256(), + ) + return True + except Exception: + return False diff --git a/pyproject.toml b/pyproject.toml index c7288b7..b6385a5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,16 +1,14 @@ [build-system] -requires = ["setuptools>=42", "wheel"] +requires = ["setuptools", "wheel", "pip"] build-backend = "setuptools.build_meta" [project] name = "deltatrace-core" -version = "0.1.0" -description = "Deterministic replayable latency & governance tracing for live market pipelines" +version = "0.0.1" +description = "DeltaTrace MVP core: event graph skeleton and deterministic replay scaffold" 
readme = "README.md" requires-python = ">=3.9" -authors = [ { name = "OpenCode" } ] -license = { text = "MIT" } -dependencies = [ ] +dependencies = ["cryptography"] -[tool.setuptools.packages.find] -where = ["."] +[tool.setuptools] +packages = ["deltatrace", "adapters", "governance"] diff --git a/test.sh b/test.sh index db9d5a3..39e9369 100755 --- a/test.sh +++ b/test.sh @@ -1,11 +1,8 @@ #!/usr/bin/env bash set -euo pipefail -# Run Python tests and packaging to validate MVP readiness -echo "Running Python tests..." +echo "Running unit tests..." pytest -q - -echo "Building package to validate metadata..." +echo "Building package (pyproject.toml)..." python3 -m build - -echo "All tests passed and package built." +echo "All tests and build succeeded." diff --git a/tests/test_basic.py b/tests/test_basic.py new file mode 100644 index 0000000..0efa55e --- /dev/null +++ b/tests/test_basic.py @@ -0,0 +1,26 @@ +import time +from deltatrace.core import LocalEvent, PlanDelta, ReplayEngine + + +def test_replay_root_is_deterministic(): + d1 = PlanDelta(delta_id="d1", timestamp=1.0, author="alice", contract_id="c1", signature="sig1") + d2 = PlanDelta(delta_id="d2", timestamp=2.0, author="bob", contract_id="c2", signature="sig2") + events = [ + {"type": "order", "order_id": "o1", "ts": 1.5}, + {"type": "fill", "fill_id": "f1", "order_id": "o1", "ts": 1.7}, + ] + engine = ReplayEngine([d1, d2], events) + root1 = engine.compute_root() + # Recompute should yield the same root + engine2 = ReplayEngine([d1, d2], events) + root2 = engine2.compute_root() + assert root1 == root2 + + +def test_path_length_and_structure(): + d = PlanDelta(delta_id="d", timestamp=0.1, author="a", contract_id="c", signature="sig") + engine = ReplayEngine([d], []) + path = engine.replay_path() + assert isinstance(path, dict) + assert "path" in path and isinstance(path["path"], list) + assert "root" in path