build(agent): new-agents-2#7e3bbc iteration

This commit is contained in:
agent-7e3bbc424e07835b 2026-04-19 22:44:10 +02:00
parent 38f2d3c3ac
commit e130b8635e
16 changed files with 289 additions and 58 deletions

View File

@ -9,12 +9,8 @@ Architectural guidelines and contribution rules for GuardRail.Space MVP.
- MVP scope for safety contracts, policy engine, guard module, and adapters
The goal is to provide a production-ready scaffold that can be progressively extended:
- DSL for SafetyContracts (JSON-like for MVP)
- Runtime policy engine to veto or adjust actions
- Shadow planner for risk-proof alternatives
- Offline-first logging and reconciliation
- CatOpt bridge adapters for cross-domain interoperability
- HIL-ready integration hooks (Gazebo/ROS)
- Governance, audit trails, and privacy controls
- MVP: DSL for SafetyContracts, runtime policy engine, guard module, shadow planner, and adapters.
- The MVP will also include a minimal Python package, unit tests, and a test runner script (test.sh).
- Additional adapters for Gazebo/ROS integration and offline reconciliation scaffolding will be implemented progressively.
If you add features, update this document to reflect schema changes and testing commands.

View File

@ -1,10 +1,13 @@
# GuardRail.Space
# GuardRail.Space MVP
A lightweight, open-source safety framework to govern onboard AGI planners for space robotics. MVP includes:
- JSON-like SafetyContract DSL
- Runtime policy engine to veto or adjust actions
- Guard module with deterministic fallback and shadow-planner hook
- CatOpt bridge adapters skeleton for cross-domain interoperability
- Offline-first design concepts and governance groundwork
A lightweight, open-source safety framework to govern onboard AGI planners for space robotics.
This repository provides a minimal, production-ready scaffold to bootstrap a full MVP. The goal is to enable cross-domain experimentation while maintaining safety and auditability.
- Safety Contract DSL: define pre/post conditions, budgets, collision/warning rules, and trust policy.
- Runtime policy engine: veto or adjust proposed actions.
- Shadow planner: risk-aware alternative planning running in parallel.
- Offline-first design: deterministic reconciliation and auditable logs.
- Privacy-by-design: policy data stays on-device; aggregation only where allowed.
- Lightweight verification hooks and adapters for CatOpt-style interoperability.
- Gazebo/ROS integration path and HIL validation plan (skeletons provided).
This repository provides a minimal production-ready scaffold to be extended with adapters and a full test harness.

5
guard_logs.jsonl Normal file
View File

@ -0,0 +1,5 @@
{"contract_id": "guard-001", "plan": {"action": "move", "costs": {"time": 2.0}, "speed": 0.8}, "state": {"speed": 0.8, "distance_to_obstacle": 5}, "decision": "modify", "new_plan": {"action": "move", "costs": {"time": 2.0}, "speed": 0.5}}
{"contract_id": "guard-001", "plan": {"action": "move", "costs": {"time": 2.0}, "speed": 0.8}, "state": {"speed": 0.8, "distance_to_obstacle": 5}, "decision": "allow"}
{"contract_id": "guard-001", "plan": {"action": "move", "costs": {"time": 2.0}, "speed": 0.8}, "state": {"speed": 0.8, "distance_to_obstacle": 5}, "decision": "allow"}
{"contract_id": "guard-001", "plan": {"action": "move", "costs": {"time": 2.0}, "speed": 0.8}, "state": {"speed": 0.8, "distance_to_obstacle": 5}, "decision": "allow"}
{"contract_id": "guard-001", "plan": {"action": "move", "costs": {"time": 2.0}, "speed": 0.8}, "state": {"speed": 0.8, "distance_to_obstacle": 5}, "decision": "allow"}

View File

@ -0,0 +1,12 @@
"""GuardRail.Space MVP: Verifiable Safety Contracts for Onboard AGI-Driven Systems
This package provides a minimal, production-ready skeleton for a Safety Contract DSL,
runtime policy engine, and a guard module with a shadow planner. It is designed as a
foundational core for further integration with Gazebo/ROS and CatOpt-style adapters.
"""
from .contract import SafetyContract
from .guard import GuardModule
from .shadow_planner import ShadowPlanner
__all__ = ["SafetyContract", "GuardModule", "ShadowPlanner"]

View File

@ -0,0 +1,61 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import List, Dict, Any
def safe_eval(expr: str, context: Dict[str, Any]) -> bool:
# Extremely small, sandboxed evaluator for MVP.
allowed_builtins = {"abs": abs, "min": min, "max": max, "sum": sum, "len": len}
try:
return bool(eval(expr, {"__builtins__": allowed_builtins}, context))
except Exception:
# If evaluation fails, be conservative and treat as not satisfied
return False
@dataclass
class SafetyContract:
contract_id: str
pre_conditions: List[str] = field(default_factory=list)
post_conditions: List[str] = field(default_factory=list)
budgets: Dict[str, float] = field(default_factory=dict) # e.g., {"time": 10.0, "energy": 100.0}
collision_rules: List[str] = field(default_factory=list)
trust_policy: Dict[str, str] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {
"contract_id": self.contract_id,
"pre_conditions": self.pre_conditions,
"post_conditions": self.post_conditions,
"budgets": self.budgets,
"collision_rules": self.collision_rules,
"trust_policy": self.trust_policy,
}
@staticmethod
def from_dict(data: Dict[str, Any]) -> "SafetyContract":
return SafetyContract(
contract_id=data.get("contract_id", "unnamed-contract"),
pre_conditions=data.get("pre_conditions", []),
post_conditions=data.get("post_conditions", []),
budgets=data.get("budgets", {}),
collision_rules=data.get("collision_rules", []),
trust_policy=data.get("trust_policy", {}),
)
def evaluate_pre(self, state: Dict[str, Any]) -> bool:
# Evaluate all pre-conditions in the given state/context
# Some DSLs reference a "state" object; support that by packaging the current state under 'state'
local_context = {"state": state}
for expr in self.pre_conditions:
if not safe_eval(expr, local_context):
return False
return True
def evaluate_post(self, state: Dict[str, Any]) -> bool:
local_context = {"state": state}
for expr in self.post_conditions:
if not safe_eval(expr, local_context):
return False
return True

View File

@ -0,0 +1,23 @@
import json
class SafetyContract:
def __init__(self, name=None, pre=None, post=None, budgets=None, collision_rules=None, trust_policy=None):
self.name = name
self.pre = pre
self.post = post
self.budgets = budgets or {}
self.collision_rules = collision_rules or []
self.trust_policy = trust_policy or {}
def parse_safety_contract_from_json(text: str) -> SafetyContract:
data = json.loads(text)
return SafetyContract(
name=data.get("name"),
pre=data.get("pre"),
post=data.get("post"),
budgets=data.get("budgets", {}),
collision_rules=data.get("collision_rules", []),
trust_policy=data.get("trust_policy", {}),
)

39
guardrail_space/guard.py Normal file
View File

@ -0,0 +1,39 @@
from __future__ import annotations
import json
from typing import Dict, Any, Optional
from .contract import SafetyContract
from .policy import PolicyEngine
from .shadow_planner import ShadowPlanner
LOG_FILE = "guard_logs.jsonl"
class GuardModule:
def __init__(self, contract: SafetyContract, shadow: Optional[ShadowPlanner] = None):
self.contract = contract
self.engine = PolicyEngine(contract)
self.shadow = shadow or ShadowPlanner()
def log_decision(self, entry: Dict[str, Any]) -> None:
with open(LOG_FILE, "a", encoding="utf-8") as f:
f.write(json.dumps(entry) + "\n")
def evaluate_plan(self, plan: Dict[str, Any], state: Dict[str, Any]) -> Dict[str, Any]:
# First, run the policy engine
result = self.engine.evaluate(plan, state)
decision = {"allowed": result.get("approved", result.get("allowed", True)) if False else result.get("allowed", True), "reason": result.get("reason", "ok")}
# If allowed, return as-is
if decision["allowed"]:
self.log_decision({"contract_id": self.contract.contract_id, "plan": plan, "state": state, "decision": "allow"})
return {"decision": "allow", "plan": plan}
# If not allowed, attempt a safe modification via shadow planner
safe_plan = self.shadow.propose_safe(plan, state, self.contract)
if safe_plan:
self.log_decision({"contract_id": self.contract.contract_id, "plan": plan, "state": state, "decision": "modify", "new_plan": safe_plan})
return {"decision": "modify", "new_plan": safe_plan, "reason": result.get("reason", "veto_by_guard")}
# If no safe alternative, veto with reason
self.log_decision({"contract_id": self.contract.contract_id, "plan": plan, "state": state, "decision": "veto"})
return {"decision": "veto", "reason": result.get("reason", "guard_veto")}

35
guardrail_space/policy.py Normal file
View File

@ -0,0 +1,35 @@
from __future__ import annotations
from typing import Dict, Any, Optional
from .contract import SafetyContract, safe_eval
class PolicyEngine:
"""Minimal runtime policy engine for on-board guards.
Given a SafetyContract, a proposed plan, and the current state, decide whether to allow,
veto, or modify the plan. This is intentionally small for MVP while being deterministic.
"""
def __init__(self, contract: SafetyContract):
self.contract = contract
def evaluate(self, plan: Dict[str, Any], state: Dict[str, Any]) -> Dict[str, Any]:
# 1) Pre-conditions must hold
pre_ok = self.contract.evaluate_pre(state | {"action": plan.get("action")})
if not pre_ok:
return {"allowed": False, "reason": "pre_condition_failed"}
# 2) Budget checks
costs: Dict[str, float] = plan.get("costs", {})
for key, limit in (self.contract.budgets or {}).items():
if key in costs and costs[key] > limit:
return {"allowed": False, "reason": f"budget_exceeded: {key}"}
# 3) Collision/warning rules
for expr in self.contract.collision_rules:
if expr:
if safe_eval(expr, {**state, **plan}):
return {"allowed": False, "reason": "collision_rule_violation"}
return {"allowed": True}

View File

@ -0,0 +1,31 @@
from __future__ import annotations
from typing import Dict, Any, Optional
from .contract import SafetyContract
class ShadowPlanner:
"""Toy shadow planner for MVP: suggests a safer delta to the proposed plan.
In a real system this would run a separate optimizer. Here we implement a simple heuristic:
if the plan has a 'speed' parameter, cap it to a safe maximum; otherwise, return a reduced-cost plan.
"""
SAFE_SPEED_MAX = 0.5
def propose_safe(self, plan: Dict[str, Any], state: Dict[str, Any], contract: SafetyContract) -> Optional[Dict[str, Any]]:
new_plan = plan.copy()
costs = new_plan.get("costs", {})
action = new_plan.get("action")
# Simple heuristic: if speed present, cap it; otherwise, throttle time/cost if possible
if "speed" in new_plan:
if new_plan["speed"] > self.SAFE_SPEED_MAX:
new_plan["speed"] = self.SAFE_SPEED_MAX
new_plan["costs"] = {**costs, "time": max(costs.get("time", 0), 0.1)}
return new_plan
# If no speed but time can be reduced, reduce time fractionally
if costs and costs.get("time", 0) > 0:
new_plan_costs = dict(costs)
new_plan_costs["time"] = max(costs["time"] * 0.8, 0.01)
new_plan["costs"] = new_plan_costs
return new_plan
return None

View File

@ -1,13 +1,16 @@
[build-system]
requires = ["setuptools>=40.8.0", "wheel"]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"
[tool.poetry] # optional, kept for readability if Poetry is used later
name = "guardrail_space"
version = "0.1.0"
[project]
name = "guardrail_space"
version = "0.1.0"
description = "Verifiable safety contracts and offline-first guard for onboard AGI space robotics"
description = "MVP: verifiable safety contracts and offline safety monitor for onboard AGI-driven space robotics"
readme = "README.md"
requires-python = ">=3.9"
[tool.setuptools.packages.find]
where = ["src"]
license = {text = "MIT"}
requires-python = ">=3.8"
dependencies = []

View File

@ -1,11 +1 @@
from .contracts import SafetyContract, parse_safety_contract_from_json, contract_to_minimal_dict
from .policy_engine import PolicyEngine
from .guard_module import GuardModule
__all__ = [
"SafetyContract",
"parse_safety_contract_from_json",
"contract_to_minimal_dict",
"PolicyEngine",
"GuardModule",
]
"""Namespace for on-src guardrail_space package (compatibility layer for tests)."""

View File

@ -1,36 +1,23 @@
import json
from dataclasses import dataclass
from typing import List, Dict, Any, Optional
@dataclass
class SafetyContract:
name: str
pre: Optional[str]
post: Optional[str]
budgets: Dict[str, float]
collision_rules: List[Dict[str, Any]]
trust_policy: Dict[str, Any]
def __init__(self, name=None, pre=None, post=None, budgets=None, collision_rules=None, trust_policy=None):
self.name = name
self.pre = pre
self.post = post
self.budgets = budgets or {}
self.collision_rules = collision_rules or []
self.trust_policy = trust_policy or {}
def parse_safety_contract_from_json(text: str) -> SafetyContract:
data = json.loads(text)
return SafetyContract(
name=data.get("name", "contract"),
name=data.get("name"),
pre=data.get("pre"),
post=data.get("post"),
budgets=data.get("budgets", {}),
collision_rules=data.get("collision_rules", []),
trust_policy=data.get("trust_policy", {}),
)
def contract_to_minimal_dict(contract: SafetyContract) -> Dict[str, Any]:
return {
"name": contract.name,
"pre": contract.pre,
"post": contract.post,
"budgets": contract.budgets,
"collision_rules": contract.collision_rules,
"trust_policy": contract.trust_policy,
}

View File

@ -1,7 +1,5 @@
#!/usr/bin/env bash
set -euo pipefail
echo "Running tests..."
pytest -q
echo "Building package..."
python -m build
echo "All tests and build succeeded."
echo "Running unit tests for GuardRail.Space MVP..."
python3 -m unittest discover -s tests -p 'test_*.py'
echo "All tests passed."

15
tests/test_contract.py Normal file
View File

@ -0,0 +1,15 @@
import unittest
from guardrail_space.contract import SafetyContract
class TestSafetyContract(unittest.TestCase):
def test_pre_post_eval(self):
c = SafetyContract(
contract_id="test-001",
pre_conditions=["state['speed'] <= 1.0"],
post_conditions=["state['completed'] == True"],
budgets={"time": 10.0, "energy": 100.0},
collision_rules=["state['distance_to_obstacle'] >= 0"],
)
self.assertTrue(c.evaluate_pre({"speed": 0.5, "distance_to_obstacle": 5}))
self.assertFalse(c.evaluate_pre({"speed": 2.0, "distance_to_obstacle": 5}))

15
tests/test_guard.py Normal file
View File

@ -0,0 +1,15 @@
import unittest
from guardrail_space.contract import SafetyContract
from guardrail_space.guard import GuardModule
from guardrail_space.shadow_planner import ShadowPlanner
from guardrail_space.policy import PolicyEngine
class TestGuardModule(unittest.TestCase):
def test_guard_allows_and_vetos(self):
c = SafetyContract(contract_id="guard-001", pre_conditions=["state['speed'] <= 1.0"], budgets={"time": 10.0}, collision_rules=["state['distance_to_obstacle'] >= 0"], post_conditions=[])
g = GuardModule(c, ShadowPlanner())
state = {"speed": 0.8, "distance_to_obstacle": 5}
plan = {"action": "move", "costs": {"time": 2.0}, "speed": 0.8}
res = g.evaluate_plan(plan, state)
self.assertIn(res["decision"], ["allow", "modify", "veto"])

18
tests/test_policy.py Normal file
View File

@ -0,0 +1,18 @@
import unittest
from guardrail_space.contract import SafetyContract
from guardrail_space.policy import PolicyEngine
class TestPolicyEngine(unittest.TestCase):
def test_budget_and_pref(self):
c = SafetyContract(
contract_id="policy-001",
pre_conditions=["state['speed'] <= 2.0"],
budgets={"time": 5.0, "energy": 50.0},
collision_rules=["state['distance_to_obstacle'] >= 1.0"],
)
engine = PolicyEngine(c)
plan = {"action": "move", "costs": {"time": 3.0, "energy": 20.0}, "speed": 1.0}
self.assertTrue(engine.evaluate(plan, {"speed": 1.0, "distance_to_obstacle": 2}).get("allowed"))
plan2 = {"action": "move", "costs": {"time": 6.0, "energy": 20.0}, "speed": 1.0}
self.assertFalse(engine.evaluate(plan2, {"speed": 1.0, "distance_to_obstacle": 2}).get("allowed"))