build(agent): molt-z#db0ec5 iteration

This commit is contained in:
agent-db0ec53c058f1326 2026-04-17 09:27:59 +02:00
parent ee2260fd54
commit 669e3092e4
3 changed files with 91 additions and 0 deletions

View File

@ -14,6 +14,11 @@ See the AGENTS.md for architecture details and testing commands.
Usage: see tests under `tests/` for examples and run `bash test.sh` to verify CI-like flows locally.
Engine: ADMM-lite Fleet Allocation
- A minimal, production-facing Python module `engine_admm.py` exposes admm_solve(agents, tasks) to compute a fleet-wide allocation.
- It demonstrates a compositional optimization approach where local agent costs influence task assignment.
- The tests under `tests/` validate basic behavior and can be extended for more elaborate scenarios.
Licensing: MIT (placeholder; update as needed)
"""

View File

@ -0,0 +1,65 @@
"""ADMM-lite inspired fleet allocation engine (MVP).
This module provides a tiny, self-contained allocation routine
that mimics a distributed optimization step: each agent contributes
local cost signals (via the provided Agent.cost_model) and a simple
global objective (maximize welfare by assigning higher-value tasks to
cheaper agents). The implementation is intentionally compact for MVP
visibility and testability.
"""
from typing import List, Dict
from .dsl import Agent, Task
def _score_task_for_agent(task: Task, agent: Agent) -> float:
"""Compute a naive score of assigning a task to an agent.
Higher score means more favorable allocation for this task.
Score = task.value - (duration * agent_cost + energy * agent_cost)
where agent_cost is taken from agent.cost_model if present.
This is a placeholder for a more sophisticated local objective.
"""
# Extract simple cost factors with safe defaults
duration = float(task.requirements.get("duration", 1))
energy = float(task.requirements.get("energy", 1))
# Cost weight per unit feature; default to 1.0 if not provided
weight = float(agent.cost_model.get("weight", 1.0))
# A more nuanced cost could combine multiple features
cost = weight * (duration + energy)
return float(task.value) - cost
def admm_solve(agents: List[Agent], tasks: List[Task], max_iters: int = 3) -> Dict[str, str]:
"""A lightweight, single-shot allocation routine.
Args:
agents: List of participating agents with their local cost models.
tasks: Tasks to allocate among the agents.
max_iters: Number of refinement iterations (not used heavily in this MVP).
Returns:
A mapping from task_id to agent_id representing the allocation.
"""
if not agents or not tasks:
return {}
# Start with a naive one-pass greedy assignment based on scores.
allocation: Dict[str, str] = {}
# Track best agent per task
for task in tasks:
best_agent_id = None
best_score = float('-inf')
for agent in agents:
score = _score_task_for_agent(task, agent)
if score > best_score:
best_score = score
best_agent_id = agent.id
if best_agent_id is not None:
allocation[task.id] = best_agent_id
# Simple iteration allowance: in a real ADMM, we'd exchange dual variables here.
# For MVP, we return the one-shot allocation.
return allocation

21
tests/test_engine_admm.py Normal file
View File

@ -0,0 +1,21 @@
import pytest
from algebraic_auction_studio_for_robotic_fle.dsl import Agent, Task
from algebraic_auction_studio_for_robotic_fle.engine_admm import admm_solve
def test_admm_solve_basic_allocation():
# Two agents with differing costs
a1 = Agent(id="robot_A", capabilities=["lift"], cost_model={"weight": 1.0})
a2 = Agent(id="robot_B", capabilities=["navigate"], cost_model={"weight": 0.8})
# Two simple tasks with values and requirements
t1 = Task(id="task_1", requirements={"duration": 3, "energy": 2}, deadline=100.0, value=10.0)
t2 = Task(id="task_2", requirements={"duration": 4, "energy": 3}, deadline=120.0, value=12.0)
alloc = admm_solve([a1, a2], [t1, t2])
# Expect an allocation for both tasks to some agent
assert set(alloc.keys()) == {t1.id, t2.id}
assert alloc[t1.id] in {a1.id, a2.id}
assert alloc[t2.id] in {a1.id, a2.id}