"""Simple policy DSL and evaluator for BeXProof. This module provides a lightweight policy container and a tiny evaluator. Policies are represented as JSON-like strings for simplicity in this MVP. In production, replace with a proper DSL parser and validator. Enhancement: support versioned policy blocks to enable multi-version governance and policy evolution. The loader will select the highest-versioned block if a policy contains a "blocks" array. If the policy uses the legacy shape {"version": ..., "rules": ...}, it is still supported for backward compatibility. """ from __future__ import annotations import json from dataclasses import dataclass from typing import Dict, Any @dataclass class Policy: version: int rules: Dict[str, Any] def load_policy(policy_text: str) -> Policy: # Accept either JSON or Python-like dict string (with single quotes) data = None try: data = json.loads(policy_text) except Exception: # try Python-style dict string try: data = json.loads(policy_text.replace("'", '"')) except Exception: raise ValueError("Policy text is not valid JSON or Python-like dict string") # Support both legacy shape and versioned blocks: # Legacy: { "version": 1, "rules": { ... } } # Versioned: { "blocks": [ { "version": 1, "rules": { ... } }, ... ] } if not isinstance(data, dict): raise ValueError("Policy must be a JSON object") # If versioned blocks are present, pick the highest version if "blocks" in data and isinstance(data["blocks"], list): blocks = data["blocks"] if not blocks: raise ValueError("Policy blocks list is empty") # Find block with max version best = max(blocks, key=lambda b: int(b.get("version", 0))) if not isinstance(best, dict) or "version" not in best or "rules" not in best: raise ValueError("Policy block must contain 'version' and 'rules' keys") return Policy(version=int(best["version"]), rules=best["rules"]) # Legacy single-block shape if "version" in data and "rules" in data: return Policy(version=int(data["version"]), rules=data["rules"]) raise ValueError("Policy must contain 'version' and 'rules' keys or a 'blocks' array") def evaluate_policy(log: Dict[str, Any], policy: Policy) -> bool: # Minimal evaluation: all top-level rules keys are checked if present in log # This is a small MVP; in production, rules would be richer and more formal. for key, thresh in policy.rules.items(): if key == "price_improvement_min": if log.get("price_improvement", 0) < float(thresh): return False elif key == "latency_budget_ms": if log.get("latency_ms", float("inf")) > int(thresh): return False elif key == "slippage_max": if log.get("slippage", float("inf")) > float(thresh): return False # additional rules can be added here return True