diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..bd5590b --- /dev/null +++ b/.gitignore @@ -0,0 +1,21 @@ +node_modules/ +.npmrc +.env +.env.* +__tests__/ +coverage/ +.nyc_output/ +dist/ +build/ +.cache/ +*.log +.DS_Store +tmp/ +.tmp/ +__pycache__/ +*.pyc +.venv/ +venv/ +*.egg-info/ +.pytest_cache/ +READY_TO_PUBLISH diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..53a1e71 --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,20 @@ +# GuardRail.Space AGENTS.md + +Architectural guidelines and contribution rules for GuardRail.Space MVP. + +- Architecture overview +- Tech stack and interfaces +- Testing commands and CI hints +- Contribution and code style rules +- MVP scope for safety contracts, policy engine, guard module, and adapters + +The goal is to provide a production-ready scaffold that can be progressively extended: +- DSL for SafetyContracts (JSON-like for MVP) +- Runtime policy engine to veto or adjust actions +- Shadow planner for risk-proof alternatives +- Offline-first logging and reconciliation +- CatOpt bridge adapters for cross-domain interoperability +- HIL-ready integration hooks (Gazebo/ROS) +- Governance, audit trails, and privacy controls + +If you add features, update this document to reflect schema changes and testing commands. diff --git a/README.md b/README.md index c595973..d5e17c2 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,10 @@ -# guardrail-space-verifiable-safety-contra +# GuardRail.Space -A lightweight, open-source safety framework to govern onboard AGI planners for space robotics (rovers, habitats, small satellites). It introduces a contract-based safety layer with (1) Safety Contracts defining pre/post conditions, budgets, and colli \ No newline at end of file +A lightweight, open-source safety framework to govern onboard AGI planners for space robotics. MVP includes: +- JSON-like SafetyContract DSL +- Runtime policy engine to veto or adjust actions +- Guard module with deterministic fallback and shadow-planner hook +- CatOpt bridge adapters skeleton for cross-domain interoperability +- Offline-first design concepts and governance groundwork + +This repository provides a minimal, production-ready scaffold to bootstrap a full MVP. The goal is to enable cross-domain experimentation while maintaining safety and auditability. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..b19d9bd --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,13 @@ +[build-system] +requires = ["setuptools>=40.8.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "guardrail_space" +version = "0.1.0" +description = "Verifiable safety contracts and offline-first guard for onboard AGI space robotics" +readme = "README.md" +requires-python = ">=3.9" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..b94f0af --- /dev/null +++ b/setup.py @@ -0,0 +1,9 @@ +from setuptools import setup, find_packages + +setup( + name="guardrail_space", + version="0.1.0", + packages=find_packages(where="src"), + package_dir={"": "src"}, + description="Verifiable safety contracts and offline-first guard for onboard AGI space robotics", +) diff --git a/src/guardrail_space/__init__.py b/src/guardrail_space/__init__.py new file mode 100644 index 0000000..a683100 --- /dev/null +++ b/src/guardrail_space/__init__.py @@ -0,0 +1,11 @@ +from .contracts import SafetyContract, parse_safety_contract_from_json, contract_to_minimal_dict +from .policy_engine import PolicyEngine +from .guard_module import GuardModule + +__all__ = [ + "SafetyContract", + "parse_safety_contract_from_json", + "contract_to_minimal_dict", + "PolicyEngine", + "GuardModule", +] diff --git a/src/guardrail_space/adapters.py b/src/guardrail_space/adapters.py new file mode 100644 index 0000000..8039fe8 --- /dev/null +++ b/src/guardrail_space/adapters.py @@ -0,0 +1,17 @@ +from typing import Dict, Any + +def catopt_to_canonic(obj: Dict[str, Any]) -> Dict[str, Any]: + return { + "Object": obj.get("subject", {}), + "Morphism": obj.get("signal", {}), + "Functor": obj.get("adapter", {}), + "Meta": obj.get("meta", {}), + } + +def canonic_to_catopt(obj: Dict[str, Any]) -> Dict[str, Any]: + return { + "subject": obj.get("Object", {}), + "signal": obj.get("Morphism", {}), + "adapter": obj.get("Functor", {}), + "meta": obj.get("Meta", {}), + } diff --git a/src/guardrail_space/contracts.py b/src/guardrail_space/contracts.py new file mode 100644 index 0000000..46f66e5 --- /dev/null +++ b/src/guardrail_space/contracts.py @@ -0,0 +1,36 @@ +import json +from dataclasses import dataclass +from typing import List, Dict, Any, Optional + + +@dataclass +class SafetyContract: + name: str + pre: Optional[str] + post: Optional[str] + budgets: Dict[str, float] + collision_rules: List[Dict[str, Any]] + trust_policy: Dict[str, Any] + + +def parse_safety_contract_from_json(text: str) -> SafetyContract: + data = json.loads(text) + return SafetyContract( + name=data.get("name", "contract"), + pre=data.get("pre"), + post=data.get("post"), + budgets=data.get("budgets", {}), + collision_rules=data.get("collision_rules", []), + trust_policy=data.get("trust_policy", {}), + ) + + +def contract_to_minimal_dict(contract: SafetyContract) -> Dict[str, Any]: + return { + "name": contract.name, + "pre": contract.pre, + "post": contract.post, + "budgets": contract.budgets, + "collision_rules": contract.collision_rules, + "trust_policy": contract.trust_policy, + } diff --git a/src/guardrail_space/guard_module.py b/src/guardrail_space/guard_module.py new file mode 100644 index 0000000..ac3c286 --- /dev/null +++ b/src/guardrail_space/guard_module.py @@ -0,0 +1,40 @@ +import threading +from typing import Dict, Any, Tuple +from .contracts import SafetyContract +from .policy_engine import PolicyEngine + + +class GuardModule: + def __init__(self, contract: SafetyContract): + self.contract = contract + self.engine = PolicyEngine(contract) + self.lock = threading.Lock() + self.shadow_delta: Dict[str, Any] = {} + + def decide(self, action: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, Dict[str, Any]]: + with self.lock: + pre_ok, _ = self.engine.evaluate_pre(action, context) + if not pre_ok: + return False, {"reason": "precondition-failed", "fallback": self._fallback(action, context)} + remaining = self.engine.remaining_budgets(context) + cost = action.get("cost", {}) + for k, v in cost.items(): + rem = remaining.get(k, 0) + if v > rem: + return False, {"reason": f"budget-{k}-exceeded", "fallback": self._fallback(action, context)} + post_ok, _ = self.engine.evaluate_post(action, context) + if not post_ok: + return False, {"reason": "postcondition-failed", "fallback": self._fallback(action, context)} + return True, action + + def _fallback(self, action: Dict[str, Any], context: Dict[str, Any]) -> Dict[str, Any]: + safe_action = {"name": "halt", "reason": "fallback", "context": context} + thread = threading.Thread(target=self._run_shadow_planner, args=(action, context)) + thread.daemon = True + thread.start() + return safe_action + + def _run_shadow_planner(self, action: Dict[str, Any], context: Dict[str, Any]): + delta = {"adjustment": {"reduce_cost": True}, "base_action": action} + with self.lock: + self.shadow_delta = delta diff --git a/src/guardrail_space/policy_engine.py b/src/guardrail_space/policy_engine.py new file mode 100644 index 0000000..39ebbaf --- /dev/null +++ b/src/guardrail_space/policy_engine.py @@ -0,0 +1,40 @@ +from typing import Dict, Any, Tuple + +def _safe_eval(expr: str, context: Dict[str, Any]) -> bool: + if not expr: + return True + allowed_globals = {"__builtins__": {}} + local = dict(context) + try: + return bool(eval(expr, allowed_globals, local)) + except Exception: + return False + + +class PolicyEngine: + def __init__(self, contract=None): + self.contract = contract + + def set_contract(self, contract) -> None: + self.contract = contract + + def evaluate_pre(self, action: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, str]: + if not self.contract or not self.contract.pre: + return True, "no-precondition" + ok = _safe_eval(self.contract.pre, {**context, "action": action}) + return (bool(ok), "precondition" if ok else "precondition-failed") + + def evaluate_post(self, action: Dict[str, Any], context: Dict[str, Any]) -> Tuple[bool, str]: + if not self.contract or not self.contract.post: + return True, "no-postcondition" + ok = _safe_eval(self.contract.post, {**context, "action": action}) + return (bool(ok), "postcondition" if ok else "postcondition-failed") + + def remaining_budgets(self, context: Dict[str, Any]) -> Dict[str, float]: + budgets = dict(self.contract.budgets) if self.contract and self.contract.budgets else {} + spent = context.get("spent", {"time": 0, "energy": 0, "compute": 0}) + remaining = {} + for k, v in budgets.items(): + spent_k = spent.get(k, 0) + remaining[k] = max(0.0, v - spent_k) + return remaining diff --git a/test.sh b/test.sh new file mode 100644 index 0000000..c07d618 --- /dev/null +++ b/test.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail +echo "Running tests..." +pytest -q +echo "Building package..." +python -m build +echo "All tests and build succeeded." diff --git a/tests/test_contract_parsing.py b/tests/test_contract_parsing.py new file mode 100644 index 0000000..3f9fc9a --- /dev/null +++ b/tests/test_contract_parsing.py @@ -0,0 +1,43 @@ +import json +import sys +import os +sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src"))) +from guardrail_space.contracts import SafetyContract, parse_safety_contract_from_json + + +def test_parse_basic_contract(): + text = json.dumps({ + "name": "Demo", + "pre": "battery > 20", + "post": "after_action", + "budgets": {"time": 10, "energy": 5}, + "collision_rules": [{"type": "avoid", "distance": 1.0}], + "trust_policy": {"overrides": ["operator"]} + }) + c = parse_safety_contract_from_json(text) + assert isinstance(c, SafetyContract) + assert c.name == "Demo" + assert c.pre == "battery > 20" + assert c.budgets["time"] == 10 + assert c.collision_rules[0]["type"] == "avoid" + + +def test_contract_to_dict_roundtrip(): + c = SafetyContract( + name="X", + pre=None, + post=None, + budgets={"time": 1}, + collision_rules=[], + trust_policy={}, + ) + d = { + "name": c.name, + "pre": c.pre, + "post": c.post, + "budgets": c.budgets, + "collision_rules": c.collision_rules, + "trust_policy": c.trust_policy, + } + assert d["name"] == "X" + assert d["budgets"]["time"] == 1