from typing import List, Dict, Any class PolicyEngine: def __init__(self): # Separate allow and deny policies for straightforward evaluation order self._policies: List[Dict[str, Any]] = [] # allowed rules self._denies: List[Dict[str, Any]] = [] # denied rules def load_policies(self, dsl: str) -> None: # Very small DSL parser for a single-line policy per call. # Supports two forms: # allow ... # deny ... # Example: # allow subject="traderA" venue="venue1" signal="price" action="read" latency_ms=5 volume_cap=1000 regulatory="none" # deny subject="traderB" venue="venue1" signal="price" action="read" latency_ms=0 for line in [l.strip() for l in dsl.splitlines() if l.strip()]: if not line: continue # Determine policy type if line.startswith("allow"): rest = line[len("allow"):].strip() policy: Dict[str, Any] = {} parts = rest.split() for part in parts: if "=" not in part: continue k, v = part.split("=", 1) v = v.strip().strip('"') if v.isdigit(): policy[k] = int(v) else: policy[k] = v policy.setdefault("latency_ms", 0) policy.setdefault("volume_cap", None) self._policies.append(policy) elif line.startswith("deny"): rest = line[len("deny"):].strip() policy: Dict[str, Any] = {} parts = rest.split() for part in parts: if "=" not in part: continue k, v = part.split("=", 1) v = v.strip().strip('"') if v.isdigit(): policy[k] = int(v) else: policy[k] = v policy.setdefault("latency_ms", 0) policy.setdefault("volume_cap", None) self._denies.append(policy) def check_access(self, subject: str, venue: str, signal: str, action: str) -> bool: # Deny policies take precedence: if any deny matches, deny access for p in self._denies: if ( p.get("subject") in (subject, "any") and p.get("venue") in (venue, "any") and p.get("signal") == signal and p.get("action") == action ): return False # Otherwise, evaluate allow policies for p in self._policies: if ( p.get("subject") in (subject, "any") and p.get("venue") in (venue, "any") and p.get("signal") == signal and p.get("action") == action ): return True return False