edge-latency-aware-cross-ve.../elac_plan/api.py

160 lines
4.6 KiB
Python

from __future__ import annotations
"""ELAC-Plan API
Simple FastAPI-based MVP to exercise LocalProblem -> PlanDelta flow with two starter adapters.
This is intentionally lightweight and in-process to bootstrap cross-venue planning without
external dependencies beyond FastAPI, Pydantic (via existing dataclasses), and httpx for tests.
"""
from datetime import datetime
import json
from typing import Dict, Any, Optional
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from .core import LocalProblem, PlanDelta, SharedVariables, DualVariables
from .solver import LocalSolver
from .adapters import NBBOFeedAdapter, BrokerGatewayAdapter
from .dsl import (
DSLObject,
DSLMorphisms,
DSLDualVariables,
DSLPlanDelta,
to_dsl_object,
to_dsl_morphisms,
to_dsl_dual,
to_dsl_plan,
)
app = FastAPI(title="ELAC-Plan MVP API", version="0.1.0")
# In-memory stores for MVP: problems and deltas
_problems: Dict[str, LocalProblem] = {}
_deltas: Dict[str, PlanDelta] = {}
# Starter adapters (could be used by endpoints or in a factory pattern)
_nbbo = NBBOFeedAdapter(contract_id="default-contract")
_broker = BrokerGatewayAdapter()
_solver = LocalSolver()
class LocalProblemInput(BaseModel):
id: str
asset: str
venue: str
objective: str
constraints: Optional[Dict[str, Any]] = None
price_target: float
tolerance: float
@app.post("/problems", response_model=Dict[str, Any])
def create_problem(problem: LocalProblemInput):
# Build a LocalProblem from input and run the toy solver to produce a delta
lp = LocalProblem(
id=problem.id,
asset=problem.asset,
venue=problem.venue,
objective=problem.objective,
constraints=problem.constraints or {},
price_target=problem.price_target,
tolerance=problem.tolerance,
)
_problems[lp.id] = lp
delta = _solver.solve(lp)
_deltas[delta.contract_id] = delta
# Simulate publishing via broker gateway (stringized payload) for MVP
_broker.publish(delta)
return {
"problem_id": lp.id,
"delta": json.dumps(delta.delta),
"delta_contract": delta.contract_id,
}
@app.get("/status")
def status():
return {
"problems_count": len(_problems),
"deltas_count": len(_deltas),
"latest_delta_contract": max(_deltas.keys()) if _deltas else None,
"server_time": datetime.utcnow().isoformat() + "Z",
}
@app.get("/problems/{problem_id}")
def get_problem(problem_id: str):
lp = _problems.get(problem_id)
if not lp:
raise HTTPException(status_code=404, detail="problem not found")
return {
"id": lp.id,
"asset": lp.asset,
"venue": lp.venue,
"objective": lp.objective,
"price_target": lp.price_target,
"tolerance": lp.tolerance,
"constraints": lp.constraints,
}
@app.get("/deltas/{contract_id}")
def get_delta(contract_id: str):
delta = _deltas.get(contract_id)
if not delta:
raise HTTPException(status_code=404, detail="delta not found")
return {
"contract_id": delta.contract_id,
"delta": delta.delta,
"timestamp": delta.timestamp,
"author": delta.author,
"privacy_budget": delta.privacy_budget,
}
@app.get("/dsl/{problem_id}")
def get_problem_dsl(problem_id: str):
# Expose a minimal DSL representation for a given problem, including
# LocalProblem -> DSLObject, SharedVariables, DualVariables, and PlanDelta.
lp = _problems.get(problem_id)
if not lp:
raise HTTPException(status_code=404, detail="problem not found")
# Build a deterministic contract_id as used by the MVP
contract_id = f"{lp.id}:{lp.venue}"
# Rehydrate a minimal SharedVariables and DualVariables for DSL exposure
sw = SharedVariables(variables={}, version=1, contract_id=contract_id)
dv = DualVariables(shadow_prices={}, version=1, contract_id=contract_id)
# Resolve latest delta for the problem if available
delta = _deltas.get(contract_id)
if delta is None:
# create a minimal placeholder delta to showcase DSL shape
delta = PlanDelta(
delta={},
timestamp="",
author="unknown",
contract_id=contract_id,
privacy_budget=0.0,
)
# Transform to DSL structures
dsl_lp = to_dsl_object(lp)
dsl_sw = to_dsl_morphisms(sw)
dsl_dv = to_dsl_dual(dv)
dsl_plan = to_dsl_plan(delta)
return {
"lp": dsl_lp.to_dict(),
"shared_variables": dsl_sw.to_dict(),
"dual_variables": dsl_dv.to_dict(),
"plan_delta": dsl_plan.to_dict(),
}