63 lines
2.3 KiB
Python
63 lines
2.3 KiB
Python
"""Minimal NovaPlan DSL scaffolding.
|
|
|
|
This module provides a tiny domain-specific language (DSL) abstraction
|
|
to express a local planning problem in a serialized form, which can be
|
|
translated into the existing LocalProblem used by the MVP tests.
|
|
|
|
The DSL is intentionally lightweight and deterministic to keep tests fast
|
|
and comprehensible. It is not meant to be a full-featured language yet.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Dict, Any, Callable, Optional
|
|
|
|
import time
|
|
from nova_plan.planner import LocalProblem
|
|
from nova_plan.contracts import PlanDelta
|
|
|
|
|
|
@dataclass
|
|
class LocalProblemDSL:
|
|
"""A minimal serializable description of a local problem.
|
|
|
|
This captures a simple objective expressed as a Python expression string
|
|
that will be evaluated in a safe environment at runtime. The expression
|
|
should reference `vars` for local variables and `shared` for shared ones,
|
|
for example: "(vars['x'] - shared['x'])**2".
|
|
"""
|
|
|
|
agent_id: str
|
|
objective_expr: str
|
|
variables: Dict[str, float]
|
|
constraints: Optional[Dict[str, Any]] = None
|
|
|
|
def to_local_problem(self) -> LocalProblem:
|
|
"""Convert to a LocalProblem instance.
|
|
|
|
The evaluation environment is intentionally restricted to avoid
|
|
executing arbitrary code. Support for richer DSLs can be added later.
|
|
"""
|
|
|
|
def _objective(local_vars: Dict[str, float], shared: Dict[str, float]) -> float:
|
|
# Provide a lightweight evaluation context.
|
|
ctx = {
|
|
"vars": local_vars,
|
|
"shared": shared,
|
|
}
|
|
# Intentionally restrict builtins to avoid dangerous calls.
|
|
safe_builtins = {"min": min, "max": max, "abs": abs}
|
|
return float(eval(self.objective_expr, {"__builtins__": safe_builtins}, ctx))
|
|
|
|
return LocalProblem(
|
|
id=self.agent_id,
|
|
objective=_objective,
|
|
variables=self.variables.copy(),
|
|
constraints=(self.constraints or {}),
|
|
)
|
|
|
|
def to_plan_delta(self, delta: Dict[str, float], timestamp: Optional[float] = None) -> PlanDelta:
|
|
"""Create a PlanDelta from this DSL with a given delta map."""
|
|
import time as _time
|
|
return PlanDelta(agent_id=self.agent_id, delta=delta, timestamp=timestamp or _time.time())
|