36 lines
1.3 KiB
Python
36 lines
1.3 KiB
Python
from __future__ import annotations
|
|
|
|
from typing import Dict, Any, Optional
|
|
from .contract import SafetyContract, safe_eval
|
|
|
|
|
|
class PolicyEngine:
    """Minimal runtime policy engine for on-board guards.

    Given a SafetyContract, a proposed plan, and the current state, decide
    whether the plan is allowed or vetoed. The engine never alters the plan or
    the state; it only reports a verdict. This is intentionally small for MVP
    while being deterministic.
    """

    def __init__(self, contract: SafetyContract):
        """Store the contract whose rules every later ``evaluate`` call checks."""
        self.contract = contract

    def evaluate(self, plan: Dict[str, Any], state: Dict[str, Any]) -> Dict[str, Any]:
        """Check *plan* against the contract and return a verdict dict.

        Checks run in a fixed order and the first failure wins:

        1. ``contract.evaluate_pre`` is called with the state plus an
           ``"action"`` entry taken from the plan (``None`` when absent).
        2. Every budget in ``contract.budgets`` is compared with the matching
           entry in ``plan["costs"]``; costs without a budget, and budgets
           without a cost, are ignored.
        3. Every non-empty entry in ``contract.collision_rules`` is passed to
           ``safe_eval`` with the merged state and plan (plan keys override
           state keys); a truthy result vetoes the plan.

        Returns:
            ``{"allowed": True}`` when every check passes, otherwise
            ``{"allowed": False, "reason": <str>}`` naming the failed check.
        """
        # 1) Pre-conditions must hold.
        # Build the context with unpacking (as step 3 does) instead of the
        # ``|`` operator: it works on interpreters older than 3.9 and with any
        # mapping, and it leaves the caller's ``state`` untouched.
        pre_context = {**state, "action": plan.get("action")}
        if not self.contract.evaluate_pre(pre_context):
            return {"allowed": False, "reason": "pre_condition_failed"}

        # 2) Budget checks.
        # ``or {}`` also covers an explicit ``"costs": None`` in the plan,
        # which a plain ``.get("costs", {})`` default would let through.
        costs: Dict[str, float] = plan.get("costs") or {}
        for key, limit in (self.contract.budgets or {}).items():
            if key in costs and costs[key] > limit:
                return {"allowed": False, "reason": f"budget_exceeded: {key}"}

        # 3) Collision/warning rules.
        # Guard against a contract without collision rules the same way the
        # budgets are guarded above; empty rule entries are skipped.
        for expr in self.contract.collision_rules or ():
            if expr and safe_eval(expr, {**state, **plan}):
                return {"allowed": False, "reason": "collision_rule_violation"}

        return {"allowed": True}
|