novaplan-decentralized-priv.../nova_plan/dsl.py

63 lines
2.3 KiB
Python

"""Minimal NovaPlan DSL scaffolding.
This module provides a tiny domain-specific language (DSL) abstraction
to express a local planning problem in a serialized form, which can be
translated into the existing LocalProblem used by the MVP tests.
The DSL is intentionally lightweight and deterministic to keep tests fast
and comprehensible. It is not meant to be a full-featured language yet.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Dict, Any, Callable, Optional
import time
from nova_plan.planner import LocalProblem
from nova_plan.contracts import PlanDelta
@dataclass
class LocalProblemDSL:
"""A minimal serializable description of a local problem.
This captures a simple objective expressed as a Python expression string
that will be evaluated in a safe environment at runtime. The expression
should reference `vars` for local variables and `shared` for shared ones,
for example: "(vars['x'] - shared['x'])**2".
"""
agent_id: str
objective_expr: str
variables: Dict[str, float]
constraints: Optional[Dict[str, Any]] = None
def to_local_problem(self) -> LocalProblem:
"""Convert to a LocalProblem instance.
The evaluation environment is intentionally restricted to avoid
executing arbitrary code. Support for richer DSLs can be added later.
"""
def _objective(local_vars: Dict[str, float], shared: Dict[str, float]) -> float:
# Provide a lightweight evaluation context.
ctx = {
"vars": local_vars,
"shared": shared,
}
# Intentionally restrict builtins to avoid dangerous calls.
safe_builtins = {"min": min, "max": max, "abs": abs}
return float(eval(self.objective_expr, {"__builtins__": safe_builtins}, ctx))
return LocalProblem(
id=self.agent_id,
objective=_objective,
variables=self.variables.copy(),
constraints=(self.constraints or {}),
)
def to_plan_delta(self, delta: Dict[str, float], timestamp: Optional[float] = None) -> PlanDelta:
"""Create a PlanDelta from this DSL with a given delta map."""
import time as _time
return PlanDelta(agent_id=self.agent_id, delta=delta, timestamp=timestamp or _time.time())