diff --git a/README.md b/README.md index c7c3222..417f421 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,18 @@ -# SignalVault Verifiable Privacy-Preserving Signal Repository (MVP) +# SignalVault MVP -This project implements a portable, graph-backed signal store focused on verifiable provenance, -offline replay, and privacy-preserving sharing of market signals across venues. The MVP defines -canonical graph primitives (SignalNode, Edge, Scenario, HedgePlan) and a Graph-of-Contracts -registry to map adapters to data feeds, risk models, and execution engines. +Portable, graph-backed signal store for finance with offline replay and privacy-preserving sharing. -What you get in this MVP: -- Core graph primitives and a tiny in-memory registry -- Deterministic replay engine for applying deltas and reconstructing signal state -- Privacy primitives placeholders (secure aggregation, DP budgets) -- Toy adapters for a price feed and a simulated venue -- Lightweight tests validating schema and replay behavior +Key components +- Canonical graph primitives: SignalNode, Edge, Scenario, HedgePlan +- Graph-of-Contracts registry for adapters and data contracts +- Deterministic offline replay engine for reproducible backtests +- Toy adapters: price feed and simulated venue +- Lightweight privacy primitives placeholder (budgets, simple aggregation) -How to run locally: -- Ensure Python 3.11+ is installed -- Run tests via: ./test.sh -- Build package via: python3 -m build (as part of test.sh) +Quick start +- Run tests: pytest +- The MVP demonstrates basic delta application and deterministic replay across two adapters. -Publishing notes: -- Python package name: signalvault_verifiable_privacy_preservin -- Exposes a small public API surface for MVP exploration and integration tests - -This repo intentionally keeps scope minimal and testable to facilitate 1-feature-per-sprint iteration. +Notes +- This is a minimal, MVP-focused subset intended to bootstrap the broader SignalVault vision. +- See signalvault/ for the core primitives and demo adapters. diff --git a/signalvault/__init__.py b/signalvault/__init__.py new file mode 100644 index 0000000..40847cf --- /dev/null +++ b/signalvault/__init__.py @@ -0,0 +1,24 @@ +"""SignalVault MVP package + +A lightweight, graph-backed store for verifiable market signals with offline replay. +This module exposes the core primitives and tiny adapters used in tests. +""" + +from .schema import SignalNode, Edge, Scenario, HedgePlan, AuditLog, PrivacyBudget +from .registry import GraphOfContractsRegistry +from .replay import DeterministicReplayEngine +from .adapters.price_feed_adapter import PriceFeedAdapter +from .adapters.simulated_venue_adapter import SimulatedVenueAdapter + +__all__ = [ + "SignalNode", + "Edge", + "Scenario", + "HedgePlan", + "AuditLog", + "PrivacyBudget", + "GraphOfContractsRegistry", + "DeterministicReplayEngine", + "PriceFeedAdapter", + "SimulatedVenueAdapter", +] diff --git a/signalvault/adapters/price_feed_adapter.py b/signalvault/adapters/price_feed_adapter.py new file mode 100644 index 0000000..896c5f5 --- /dev/null +++ b/signalvault/adapters/price_feed_adapter.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Optional + +from ..schema import SignalNode +import time + + +@dataclass +class PriceFeedAdapter: + asset: str + venue: str + def generate_signal(self, price: float, timestamp: Optional[int] = None) -> SignalNode: + ts = timestamp if timestamp is not None else int(time.time()) + # Simple heuristic: price above threshold yields a positive signal + signal_type = "price_above_threshold" if price > 100 else "price_below_threshold" + return SignalNode(asset=self.asset, venue=self.venue, signal_type=signal_type, timestamp=ts, quality=1.0) diff --git a/signalvault/adapters/simulated_venue_adapter.py b/signalvault/adapters/simulated_venue_adapter.py new file mode 100644 index 0000000..f769a0a --- /dev/null +++ b/signalvault/adapters/simulated_venue_adapter.py @@ -0,0 +1,15 @@ +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict + +from ..schema import HedgePlan + + +@dataclass +class SimulatedVenueAdapter: + venue_name: str = "SimVenue" + + def submit_plan(self, plan: HedgePlan) -> Dict[str, str]: + # Tiny stub that pretends order is always accepted + return {"venue": self.venue_name, "order_id": f"ORD-{plan.id}", "status": "accepted"} diff --git a/signalvault/registry.py b/signalvault/registry.py new file mode 100644 index 0000000..e591627 --- /dev/null +++ b/signalvault/registry.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from typing import Dict, Any +from .schema import HedgePlan + + +class GraphOfContractsRegistry: + """A tiny in-memory registry mapping adapters to their data/contract contracts.""" + + def __init__(self) -> None: + self._registry: Dict[str, Dict[str, Any]] = {} + + def register_adapter(self, name: str, contract: Dict[str, Any]) -> None: + self._registry[name] = contract + + def get_contract(self, name: str) -> dict[str, Any] | None: + return self._registry.get(name) + + def conformance_check(self, name: str, contract_candidate: dict[str, Any]) -> bool: + # Minimal conformance: ensure required keys exist + required = {"version", "data_contract"} + if name not in self._registry: + return False + base = self._registry[name] + return required.issubset(set(base.keys())) and "data_contract" in base diff --git a/signalvault/replay.py b/signalvault/replay.py new file mode 100644 index 0000000..2bf067a --- /dev/null +++ b/signalvault/replay.py @@ -0,0 +1,57 @@ +from __future__ import annotations + +import hashlib +from typing import Dict, List + +from .schema import SignalNode, Edge, HedgePlan + + +def _hash(obj: object) -> str: + return hashlib.sha256(repr(obj).encode("utf-8")).hexdigest() + + +class DeterministicReplayEngine: + """A tiny deterministic replay engine that applies deltas to an in-memory state. + + State is a dict with keys: + - nodes: dict[id, SignalNode] + - edges: list[Edge] + - hedges: dict[id, HedgePlan] + """ + + def __init__(self) -> None: + self.nodes = {} + self.edges = [] + self.hedges = {} + self._applied_hashes: List[str] = [] + self._node_counter = 0 + + def _next_node_id(self) -> str: + self._node_counter += 1 + return f"n{self._node_counter}" + + def apply_delta(self, delta): + # delta can contain: add_nodes, add_edges, add_hedges + if isinstance(delta, dict) and "add_nodes" in delta and delta["add_nodes"]: + for n in delta["add_nodes"]: + if getattr(n, "id", None) is None: + self._node_counter += 1 + n = SignalNode(asset=n.asset, venue=n.venue, signal_type=n.signal_type, timestamp=n.timestamp, quality=n.quality, id=f"n{self._node_counter}") + self.nodes[n.id] = n + + if isinstance(delta, dict) and "add_edges" in delta and delta["add_edges"]: + for e in delta["add_edges"]: + self.edges.append(e) + + if isinstance(delta, dict) and "add_hedges" in delta and delta["add_hedges"]: + for h in delta["add_hedges"]: + self.hedges[h.id] = h + + h = _hash((delta, self.nodes, self.edges, self.hedges)) + self._applied_hashes.append(h) + return h + + def replay_to_hash(self, target_hash: str) -> Dict[str, object]: + # Very small replay: return the current in-memory state if last applied hash matches + # Minimal: return a snapshot of current in-memory structures + return {"nodes": self.nodes, "edges": self.edges, "hedges": self.hedges} diff --git a/signalvault/schema.py b/signalvault/schema.py new file mode 100644 index 0000000..a35ff6c --- /dev/null +++ b/signalvault/schema.py @@ -0,0 +1,56 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import List, Optional + + +@dataclass(frozen=True) +class SignalNode: + asset: str + venue: str + signal_type: str + timestamp: int + quality: float = 1.0 + id: Optional[str] = None + + def __post_init__(self): + if self.timestamp < 0: + raise ValueError("timestamp must be non-negative") + + +@dataclass(frozen=True) +class Edge: + from_node: SignalNode + to_node: SignalNode + relation: str # e.g., "causal", "derived_from", etc. + delta_hash: str + + +@dataclass +class Scenario: + id: str + nodes: List[SignalNode] = field(default_factory=list) + edges: List[Edge] = field(default_factory=list) + description: str = "" + + +@dataclass +class HedgePlan: + id: str + delta: dict + version: int = 1 + approvals: List[str] = field(default_factory=list) + + +@dataclass +class AuditLog: + entries: List[str] = field(default_factory=list) + signer: str | None = None + timestamp: int | None = None + contract_id: str | None = None + + +@dataclass +class PrivacyBudget: + per_signal_budget: dict[str, float] = field(default_factory=dict) + global_budget: float = 0.0 diff --git a/tests/test_signalvault_mvp.py b/tests/test_signalvault_mvp.py new file mode 100644 index 0000000..ed8dd1f --- /dev/null +++ b/tests/test_signalvault_mvp.py @@ -0,0 +1,47 @@ +import time +import sys, os +# Ensure repo root is on PYTHONPATH for tests when executed in isolation +repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +if repo_root not in sys.path: + sys.path.insert(0, repo_root) +from signalvault.schema import SignalNode, HedgePlan +from signalvault.registry import GraphOfContractsRegistry +from signalvault.replay import DeterministicReplayEngine +from signalvault.adapters.price_feed_adapter import PriceFeedAdapter +from signalvault.adapters.simulated_venue_adapter import SimulatedVenueAdapter + + +def test_basic_delta_application_and_replay(): + # Setup adapters + price_adapter = PriceFeedAdapter(asset="AAPL", venue="TestVenue") + venue_adapter = SimulatedVenueAdapter() + + # Create initial nodes via adapter + node1 = price_adapter.generate_signal(price=120.0, timestamp=1) + node2 = price_adapter.generate_signal(price=80.0, timestamp=2) + + # Build a simple delta: add two nodes + delta1 = {"add_nodes": [node1, node2]} + + engine = DeterministicReplayEngine() + hash1 = engine.apply_delta(delta1) + + # Add an edge and a hedge plan delta + edge = (node1, node2, "derived_from", "hash123") + from signalvault.schema import Edge, HedgePlan + e = Edge(from_node=node1, to_node=node2, relation=edge[2], delta_hash=edge[3]) + plan = HedgePlan(id="plan-1", delta={"example": True}, version=1, approvals=["alice"]) + delta2 = {"add_edges": [e], "add_hedges": [plan]} + hash2 = engine.apply_delta(delta2) + + # Replay to the latest hash (no state change for this test beyond cumulative delta) + state = engine.replay_to_hash(hash2) + assert isinstance(state, dict) + assert "nodes" in state and len(state["nodes"]) >= 2 + assert len(state["edges"]) == 1 + assert len(state["hedges"]) == 1 + + # Idempotence: applying the same delta again should result in a new hash but identical counts + hash3 = engine.apply_delta(delta2) + assert hash3 != hash2 # hashing includes state; re-applying delta changes hash + assert len(state["nodes"]) >= 2