# signalcanvas-graph-based-ma.../examples/orchestrator.py
#
# 60 lines
# 2.1 KiB
# Python

"""
Tiny orchestrator example:
- Instantiate two adapters (FIX feed and simulated venue) alongside a small SignalCanvas graph
- Produce PlanDelta updates with delta_sync.create_plan_delta and replay them onto a state dict
- Demonstrate generating a narrative via the NLP helper and recording a delta in the governance ledger
"""
import time
from signalcanvas_graph_based_market_signal_s import (
Graph,
SignalNode,
Link,
HedgePlan,
GraphRegistry,
replay_deltas,
)
from signalcanvas_graph_based_market_signal_s.adapters import FixFeedAdapter, SimulatedVenueAdapter
from signalcanvas_graph_based_market_signal_s.delta_sync import (
create_plan_delta,
)
from signalcanvas_graph_based_market_signal_s.nlp import generate_narrative
from signalcanvas_graph_based_market_signal_s.ledger import GovernanceLedger
def demo_run():
    """Run the orchestrator demo end to end and return 0.

    Steps, in order: build a two-node graph with one link and one hedge
    plan, instantiate the two adapters, create two plan deltas, print a
    narrative for the first delta, append the first delta to a governance
    ledger, and replay both deltas onto a small state dict.

    Returns:
        Always 0.
    """
    # Tiny graph: two AAPL signals joined by a lead-lag link, plus one hedge.
    graph = Graph()
    price_node = SignalNode(
        id="n1",
        type="price",
        asset="AAPL",
        venue="FIX",
        timestamp=time.time(),
        value=150.0,
    )
    depth_node = SignalNode(
        id="n2",
        type="depth",
        asset="AAPL",
        venue="FIX",
        timestamp=time.time(),
        value=1.2,
    )
    for node in (price_node, depth_node):
        graph.add_node(node)

    lead_lag = Link(from_id="n1", to_id="n2", relation="lead-lag")
    graph.add_link(lead_lag)

    hedge = HedgePlan(
        id="h1",
        delta={"AAPL": 2.0},
        scenario_name="sc1",
        timestamp=time.time(),
    )
    graph.add_hedge(hedge)

    # Two adapters (a FIX-style feed and a simulated venue). They are
    # constructed here but not used further in this function.
    fix_adapter = FixFeedAdapter(asset="AAPL")
    sim_adapter = SimulatedVenueAdapter(assets=["AAPL", "MSFT"])

    # Hand-built deltas standing in for updates the feeds would produce.
    plan_id, signer = "plan-1", "demo-signer"
    first_delta = create_plan_delta(plan_id, {"AAPL": 1.0}, 1, signer)
    second_delta = create_plan_delta(plan_id, {"MSFT": 0.5}, 2, signer)

    # Narrative is generated from the first delta only.
    story = generate_narrative(first_delta.delta, context="demo run")
    print("Narrative:\n", story)

    # Governance ledger entry (mock): only the first delta is recorded.
    ledger = GovernanceLedger(signer_key="demo-signer-key")
    ledger.append(first_delta.plan_id, first_delta.delta)
    entry_count = len(ledger.entries)
    print("Ledger entries:", entry_count)

    # Replay both deltas, in creation order, onto a starting state.
    initial_state = {"AAPL": 100.0, "MSFT": 200.0}
    replayed = replay_deltas(
        initial_state,
        [first_delta.delta, second_delta.delta],
    )
    print("Replayed state:", replayed)
    return 0
if __name__ == "__main__":
    # demo_run() returns an integer status; hand it to the interpreter as the
    # process exit code instead of discarding it.
    raise SystemExit(demo_run())