26 lines
685 B
Python
26 lines
685 B
Python
from __future__ import annotations
|
|
|
|
from fastapi import FastAPI
|
|
from pydantic import BaseModel
|
|
from typing import List, Dict
|
|
|
|
from .contracts import Signal
|
|
from .analytics import Aggregator
|
|
|
|
# FastAPI application object for this module; endpoints defined in this file
# are registered on it through its route decorators (e.g. ``@app.post``).
app = FastAPI(title="MercuryMesh MVP API")
|
|
|
|
|
|
class SignalDTO(BaseModel):
    # Request-body schema for one signal accepted by the API.
    #
    # Documented with comments instead of a docstring on purpose: pydantic
    # copies a class docstring into the generated JSON schema description,
    # and this rewrite must leave the published schema untouched.

    # Name of the venue the signal is attributed to.
    venue: str

    # Integer timestamp of the signal.
    # NOTE(review): unit/epoch is not visible from this file -- confirm.
    timestamp: int

    # Metric name -> numeric value.
    metrics: Dict[str, float]

    # Payload version; callers may omit it, in which case it is 1.
    version: int = 1
|
|
|
|
|
|
@app.post("/signals/aggregate")
def aggregate_signals(signals: List[SignalDTO]):
    # Handler for POST /signals/aggregate.
    #
    # Takes the list of SignalDTO items parsed from the request, rebuilds each
    # one as an internal Signal, passes the batch to
    # Aggregator.aggregate_signals and returns its result under the
    # "aggregation" key.
    #
    # Kept as comments (no docstring, no return annotation) so the generated
    # OpenAPI description and response handling stay exactly as before.
    domain_signals = []
    for dto in signals:
        # Field-by-field copy: the aggregator works on Signal objects,
        # not on the API-facing DTOs.
        domain_signals.append(
            Signal(
                venue=dto.venue,
                timestamp=dto.timestamp,
                metrics=dto.metrics,
                version=dto.version,
            )
        )

    return {"aggregation": Aggregator.aggregate_signals(domain_signals)}
|