idea119-regflow-verifiable-pre/regflow/core.py

128 lines
4.4 KiB
Python

from __future__ import annotations
"""Core DSL compiler and basic proof engine for RegFlow skeleton.
This module provides:
- A tiny DSL compiler that converts lines like:
constraint max_position venue1 AAPL 1000
constraint min_cash venue1 5000
into a canonical IR structure.
- A simple per-trade proof checker that evaluates the IR rules against a
trade descriptor and returns a basic proof result.
This is intentionally minimal but designed to be easily extended into a full
production-ready implementation.
"""
from typing import Any, Dict, List, Tuple
def _parse_line_to_rule(line: str) -> Dict[str, Any]:
line = line.strip()
if not line or line.startswith("#"):
raise ValueError("Empty or non-rule line")
if not line.startswith("constraint"):
raise ValueError(f"Unsupported DSL line: {line}")
rem = line[len("constraint"):].strip()
tokens = rem.split()
if not tokens:
raise ValueError("Malformed constraint line")
rule_type = tokens[0]
rest = tokens[1:]
if rule_type == "max_position":
# Expect: venue instrument limit
if len(rest) != 3:
raise ValueError("max_position requires 3 tokens: venue instrument limit")
venue, instrument, limit = rest
return {
"type": "max_position",
"venue": venue,
"instrument": instrument,
"limit": int(limit),
}
elif rule_type == "min_cash":
# Expect: venue amount
if len(rest) != 2:
raise ValueError("min_cash requires 2 tokens: venue amount")
venue, amount = rest
return {"type": "min_cash", "venue": venue, "amount": int(amount)}
else:
raise ValueError(f"Unknown constraint type: {rule_type}")
def compile_dsl(dsl_text: str) -> Dict[str, Any]:
    """Compile DSL text into the canonical IR dictionary.

    Blank lines and lines whose first non-space character is ``#`` are
    skipped. Every remaining line must start with ``constraint`` and is
    handed to ``_parse_line_to_rule``; any other content raises.

    Example input::

        constraint max_position venue1 AAPL 1000
        constraint min_cash venue1 5000

    Resulting IR::

        {
            "rules": [
                {"type": "max_position", "venue": "venue1",
                 "instrument": "AAPL", "limit": 1000},
                {"type": "min_cash", "venue": "venue1", "amount": 5000}
            ]
        }

    Raises:
        ValueError: If a non-blank, non-comment line does not start with
            ``constraint``, or if ``_parse_line_to_rule`` rejects the line.
    """
    compiled: List[Dict[str, Any]] = []
    for raw in dsl_text.strip().splitlines():
        content = raw.strip()
        # Blank lines and comments carry no rules.
        if not content or content.startswith("#"):
            continue
        # Keep the DSL strict: anything that is not a constraint is an error.
        if not content.startswith("constraint"):
            raise ValueError(f"Unsupported DSL line: {raw}")
        compiled.append(_parse_line_to_rule(raw))
    return {"rules": compiled}
def _evaluate_rule(rule: Dict[str, Any], trade: Dict[str, Any]) -> Tuple[bool, str]:
rtype = rule.get("type")
if rtype == "max_position":
venue = rule["venue"]
instrument = rule["instrument"]
limit = rule["limit"]
qty = trade.get("qty")
tvenue = trade.get("venue")
tinstrument = trade.get("instrument")
if tvenue != venue or tinstrument != instrument:
return True, "no_match" # Not applicable to this trade
if qty is None:
return False, "missing_qty"
return (qty <= limit), f"qty={qty} <= limit={limit}"
elif rtype == "min_cash":
venue = rule["venue"]
amount = rule["amount"]
tvenue = trade.get("venue")
cash = trade.get("cash")
if tvenue != venue:
return True, "no_match"
if cash is None:
return False, "missing_cash"
return (cash >= amount), f"cash={cash} >= amount={amount}"
else:
return False, "unknown_rule_type"
def generate_proof(ir: Dict[str, Any], trade: Dict[str, Any]) -> Dict[str, Any]:
    """Run every IR rule against a trade and assemble a proof object.

    Args:
        ir: IR dictionary; rules are read from its ``rules`` key, and a
            missing key is treated as an empty rule list.
        trade: Trade descriptor passed unchanged to ``_evaluate_rule``.

    Returns:
        A dict with three keys:

        - ``valid``: True only if every rule evaluated as ok.
        - ``details``: one entry per rule, in order, holding
          ``rule_index``, ``rule``, ``ok`` and ``reason``.
        - ``summary``: a fixed message reflecting the overall verdict.
    """
    details: List[Dict[str, Any]] = []
    for position, rule in enumerate(ir.get("rules", [])):
        passed, why = _evaluate_rule(rule, trade)
        details.append(
            {"rule_index": position, "rule": rule, "ok": passed, "reason": why}
        )

    # The verdict is derived from the collected per-rule outcomes.
    verdict = all(entry["ok"] for entry in details)
    if verdict:
        summary = "all applicable rules satisfied"
    else:
        summary = "one or more rules violated"
    return {"valid": verdict, "details": details, "summary": summary}