feedtrust-blockchain-backed.../feedtrust/policy.py

75 lines
2.9 KiB
Python

from typing import List, Dict, Any
class PolicyEngine:
def __init__(self):
# Separate allow and deny policies for straightforward evaluation order
self._policies: List[Dict[str, Any]] = [] # allowed rules
self._denies: List[Dict[str, Any]] = [] # denied rules
def load_policies(self, dsl: str) -> None:
# Very small DSL parser for a single-line policy per call.
# Supports two forms:
# allow ...
# deny ...
# Example:
# allow subject="traderA" venue="venue1" signal="price" action="read" latency_ms=5 volume_cap=1000 regulatory="none"
# deny subject="traderB" venue="venue1" signal="price" action="read" latency_ms=0
for line in [l.strip() for l in dsl.splitlines() if l.strip()]:
if not line:
continue
# Determine policy type
if line.startswith("allow"):
rest = line[len("allow"):].strip()
policy: Dict[str, Any] = {}
parts = rest.split()
for part in parts:
if "=" not in part:
continue
k, v = part.split("=", 1)
v = v.strip().strip('"')
if v.isdigit():
policy[k] = int(v)
else:
policy[k] = v
policy.setdefault("latency_ms", 0)
policy.setdefault("volume_cap", None)
self._policies.append(policy)
elif line.startswith("deny"):
rest = line[len("deny"):].strip()
policy: Dict[str, Any] = {}
parts = rest.split()
for part in parts:
if "=" not in part:
continue
k, v = part.split("=", 1)
v = v.strip().strip('"')
if v.isdigit():
policy[k] = int(v)
else:
policy[k] = v
policy.setdefault("latency_ms", 0)
policy.setdefault("volume_cap", None)
self._denies.append(policy)
def check_access(self, subject: str, venue: str, signal: str, action: str) -> bool:
# Deny policies take precedence: if any deny matches, deny access
for p in self._denies:
if (
p.get("subject") in (subject, "any")
and p.get("venue") in (venue, "any")
and p.get("signal") == signal
and p.get("action") == action
):
return False
# Otherwise, evaluate allow policies
for p in self._policies:
if (
p.get("subject") in (subject, "any")
and p.get("venue") in (venue, "any")
and p.get("signal") == signal
and p.get("action") == action
):
return True
return False