75 lines
3.0 KiB
Python
75 lines
3.0 KiB
Python
"""Simple policy DSL and evaluator for BeXProof.
|
|
|
|
This module provides a lightweight policy container and a tiny evaluator.
|
|
Policies are represented as JSON-like strings for simplicity in this MVP.
|
|
In production, replace with a proper DSL parser and validator.
|
|
|
|
Enhancement: support versioned policy blocks to enable multi-version governance
|
|
and policy evolution. The loader will select the highest-versioned block if a
|
|
policy contains a "blocks" array. If the policy uses the legacy shape
|
|
{"version": ..., "rules": ...}, it is still supported for backward compatibility.
|
|
"""
|
|
from __future__ import annotations
|
|
import json
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Any
|
|
|
|
|
|
@dataclass
class Policy:
    """A single resolved policy: a version number plus its rule table.

    Attributes:
        version: Integer version of the policy (or of the block it came from).
        rules: Mapping from rule name to that rule's configured value.
    """

    # Version of this policy.
    version: int
    # Rule name -> configured value for that rule.
    rules: Dict[str, Any]
|
|
|
|
|
|
def _coerce_version(raw: Any) -> int:
    """Convert a raw ``version`` value to ``int``, raising ValueError on failure."""
    try:
        return int(raw)
    except (TypeError, ValueError, OverflowError) as err:
        # int(None) raises TypeError and int(float("inf")) raises
        # OverflowError; normalise both to the ValueError callers expect.
        raise ValueError(
            f"Policy version must be an integer, got {raw!r}"
        ) from err


def _parse_policy_text(policy_text: Any) -> Any:
    """Decode policy text as JSON, then as a Python literal, then via quote swap."""
    import ast  # local import: only needed for the Python-literal fallback

    try:
        return json.loads(policy_text)
    except (ValueError, TypeError, RecursionError):
        pass  # not strict JSON; try the more lenient encodings below

    if isinstance(policy_text, str):
        # literal_eval understands single-quoted strings, apostrophes inside
        # strings and True/False/None, none of which survive a quote swap.
        # It only evaluates literals, so it never executes code.
        try:
            return ast.literal_eval(policy_text)
        except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
            pass

    # Legacy fallback kept for backward compatibility: covers text that mixes
    # single quotes with JSON-only tokens such as true/false/null.
    try:
        return json.loads(policy_text.replace("'", '"'))
    except (ValueError, TypeError, AttributeError, RecursionError) as err:
        raise ValueError(
            "Policy text is not valid JSON or Python-like dict string"
        ) from err


def load_policy(policy_text: str) -> Policy:
    """Parse *policy_text* and return the policy it describes.

    The text may be strict JSON or a Python-style dict literal (for example
    with single-quoted strings).

    Two shapes are supported:

    * Legacy:    ``{"version": 1, "rules": {...}}``
    * Versioned: ``{"blocks": [{"version": 1, "rules": {...}}, ...]}``;
      the block with the highest version wins (the first one on a tie).
      A block without a ``"version"`` key ranks as version 0 and is
      rejected if it ends up being selected.

    Args:
        policy_text: The serialized policy.

    Returns:
        A ``Policy`` holding the selected version and its rules.

    Raises:
        ValueError: If the text cannot be parsed, is not an object, has an
            empty ``blocks`` list, contains a block that is not an object,
            has a version that cannot be converted to ``int``, or lacks the
            required ``version``/``rules`` keys.
    """
    data = _parse_policy_text(policy_text)

    if not isinstance(data, dict):
        raise ValueError("Policy must be a JSON object")

    # Versioned shape: pick the block with the highest version.
    blocks = data.get("blocks")
    if isinstance(blocks, list):
        if not blocks:
            raise ValueError("Policy blocks list is empty")
        # Check every block up front so a malformed entry surfaces as
        # ValueError instead of an AttributeError from inside max().
        if not all(isinstance(block, dict) for block in blocks):
            raise ValueError("Policy block must contain 'version' and 'rules' keys")
        best = max(
            blocks,
            key=lambda block: _coerce_version(block.get("version", 0)),
        )
        if "version" not in best or "rules" not in best:
            raise ValueError("Policy block must contain 'version' and 'rules' keys")
        version = _coerce_version(best["version"])
        return Policy(version=version, rules=best["rules"])

    # Legacy single-block shape.
    if "version" in data and "rules" in data:
        version = _coerce_version(data["version"])
        return Policy(version=version, rules=data["rules"])

    raise ValueError("Policy must contain 'version' and 'rules' keys or a 'blocks' array")
|
|
|
|
|
|
def evaluate_policy(log: Dict[str, Any], policy: Policy) -> bool:
    """Return True when *log* satisfies every recognised rule in *policy*.

    Recognised rule keys and the log field each one checks:

    * ``price_improvement_min``: ``log["price_improvement"]`` must be at
      least the threshold; a missing field counts as 0.
    * ``latency_budget_ms``: ``log["latency_ms"]`` must not exceed the
      threshold; a missing field counts as infinity and so fails.
    * ``slippage_max``: ``log["slippage"]`` must not exceed the threshold;
      a missing field counts as infinity and so fails.

    Rule keys not listed above are ignored. Thresholds are converted with
    ``float()``, so a fractional latency budget is honoured, not truncated.

    Args:
        log: Measured values for one execution, keyed by field name.
        policy: Policy whose ``rules`` mapping is evaluated.

    Returns:
        False as soon as one recognised rule is violated, True otherwise.
    """
    for key, thresh in policy.rules.items():
        # Each check is written as "not (value within bound)" so that a NaN
        # measurement or threshold, which compares False against everything,
        # fails the rule instead of silently passing it.
        if key == "price_improvement_min":
            if not log.get("price_improvement", 0) >= float(thresh):
                return False
        elif key == "latency_budget_ms":
            if not log.get("latency_ms", float("inf")) <= float(thresh):
                return False
        elif key == "slippage_max":
            if not log.get("slippage", float("inf")) <= float(thresh):
                return False
        # additional rules can be added here
    return True
|