48 lines
1.8 KiB
Python
48 lines
1.8 KiB
Python
from typing import List, Dict, Any
|
|
|
|
|
|
class PolicyEngine:
|
|
def __init__(self):
|
|
self._policies: List[Dict[str, Any]] = []
|
|
|
|
def load_policies(self, dsl: str) -> None:
|
|
# Very small DSL parser for a single-line policy per call.
|
|
# Example:
|
|
# allow subject="traderA" venue="venue1" signal="price" action="read" latency_ms=5 volume_cap=1000 regulatory="none"
|
|
for line in [l.strip() for l in dsl.splitlines() if l.strip()]:
|
|
if not line:
|
|
continue
|
|
if not line.startswith("allow"):
|
|
continue
|
|
# Remove leading 'allow'
|
|
rest = line[len("allow"):].strip()
|
|
policy: Dict[str, Any] = {}
|
|
# Split by spaces, then parse key=value parts
|
|
parts = rest.split()
|
|
for part in parts:
|
|
if "=" not in part:
|
|
continue
|
|
k, v = part.split("=", 1)
|
|
v = v.strip().strip('"')
|
|
# cast common types
|
|
if v.isdigit():
|
|
policy[k] = int(v)
|
|
else:
|
|
policy[k] = v
|
|
# Normalize some keys
|
|
policy.setdefault("latency_ms", 0)
|
|
policy.setdefault("volume_cap", None)
|
|
self._policies.append(policy)
|
|
|
|
def check_access(self, subject: str, venue: str, signal: str, action: str) -> bool:
|
|
# Simple matcher: any policy that matches all provided fields grants access
|
|
for p in self._policies:
|
|
if (
|
|
p.get("subject") in (subject, "any")
|
|
and p.get("venue") in (venue, "any")
|
|
and p.get("signal") == signal
|
|
and p.get("action") == action
|
|
):
|
|
return True
|
|
return False
|