62 lines
2.3 KiB
Python
62 lines
2.3 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Dict, Any
|
|
|
|
|
|
def safe_eval(expr: str, context: Dict[str, Any]) -> bool:
    """Evaluate a condition expression against ``context`` and return its truthiness.

    The expression is run through :func:`eval` with the builtins restricted to
    ``abs``, ``min``, ``max``, ``sum`` and ``len``. The entries of ``context``
    are exposed as the *globals* of the evaluation so that they are also
    visible inside nested scopes (generator expressions, comprehensions,
    lambdas); names passed as ``eval`` locals are not visible there.

    Args:
        expr: Source text of a Python expression.
        context: Names made available to the expression. The mapping is
            copied before evaluation and is never mutated.

    Returns:
        ``bool(result)`` of the expression, or ``False`` if building the
        namespace, evaluating the expression, or converting the result to
        ``bool`` raises any ``Exception``.
    """
    # SECURITY: restricting ``__builtins__`` is NOT a real sandbox. Attribute
    # traversal from any literal (e.g. ``().__class__``) can still reach
    # arbitrary objects. Only evaluate expressions from trusted sources.
    allowed_builtins = {"abs": abs, "min": min, "max": max, "sum": sum, "len": len}
    try:
        # Work on a private copy: assignment expressions (``:=``) inside
        # ``expr`` must not leak into the caller's dict.
        namespace: Dict[str, Any] = dict(context) if context else {}
        # Set last so a "__builtins__" key in ``context`` cannot widen the
        # whitelist.
        namespace["__builtins__"] = allowed_builtins
        return bool(eval(expr, namespace))
    except Exception:
        # If evaluation fails, be conservative and treat as not satisfied.
        return False
|
|
|
|
|
|
@dataclass
class SafetyContract:
    """A named bundle of condition expressions and associated settings.

    ``pre_conditions`` and ``post_conditions`` are lists of expression strings
    that are evaluated with ``safe_eval``; the remaining fields are stored and
    (de)serialized by this class but not otherwise interpreted here.
    """

    contract_id: str
    pre_conditions: List[str] = field(default_factory=list)
    post_conditions: List[str] = field(default_factory=list)
    budgets: Dict[str, float] = field(default_factory=dict)  # e.g., {"time": 10.0, "energy": 100.0}
    collision_rules: List[str] = field(default_factory=list)
    trust_policy: Dict[str, str] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        """Return the contract as a plain dict.

        The list and dict fields are copied, so mutating the returned
        structure does not modify this contract.
        """
        return {
            "contract_id": self.contract_id,
            "pre_conditions": list(self.pre_conditions),
            "post_conditions": list(self.post_conditions),
            "budgets": dict(self.budgets),
            "collision_rules": list(self.collision_rules),
            "trust_policy": dict(self.trust_policy),
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "SafetyContract":
        """Build a contract from a dict shaped like the output of ``to_dict``.

        Missing keys fall back to empty containers (``"unnamed-contract"`` for
        ``contract_id``); a key explicitly set to ``None`` is treated as
        empty as well. Containers are copied so the new contract does not
        share mutable state with ``data``.
        """
        return cls(
            contract_id=data.get("contract_id", "unnamed-contract"),
            pre_conditions=list(data.get("pre_conditions") or []),
            post_conditions=list(data.get("post_conditions") or []),
            budgets=dict(data.get("budgets") or {}),
            collision_rules=list(data.get("collision_rules") or []),
            trust_policy=dict(data.get("trust_policy") or {}),
        )

    def _all_hold(self, conditions: List[str], state: Dict[str, Any]) -> bool:
        """Return True when every expression in ``conditions`` holds for ``state``."""
        # Some DSLs reference a "state" object; support that by packaging the
        # current state under the name 'state'.
        local_context = {"state": state}
        # ``all`` over a generator stops at the first failing expression.
        return all(safe_eval(expr, local_context) for expr in conditions)

    def evaluate_pre(self, state: Dict[str, Any]) -> bool:
        """Return True if all pre-conditions hold for ``state`` (True when there are none)."""
        return self._all_hold(self.pre_conditions, state)

    def evaluate_post(self, state: Dict[str, Any]) -> bool:
        """Return True if all post-conditions hold for ``state`` (True when there are none)."""
        return self._all_hold(self.post_conditions, state)
|