mercurymesh-privacy-preserv.../mercurymesh_privacy_preserv.../server.py

26 lines
685 B
Python

from __future__ import annotations
from fastapi import FastAPI
from pydantic import BaseModel
from typing import List, Dict
from .contracts import Signal
from .analytics import Aggregator
# FastAPI application instance; this module's route handlers are registered on it.
app = FastAPI(title="MercuryMesh MVP API")
class SignalDTO(BaseModel):
    """Request-body schema for a single signal submitted to the API.

    The field names mirror the keyword arguments used when building the
    internal ``Signal`` object from an instance of this model.
    """

    # Venue identifier (free-form string as far as this model is concerned).
    venue: str
    # Integer timestamp. NOTE(review): unit/epoch is not visible here -- confirm.
    timestamp: int
    # String-keyed float values carried by the signal.
    metrics: Dict[str, float]
    # Version number of the signal; 1 is used when the client omits it.
    version: int = 1
@app.post("/signals/aggregate")
def aggregate_signals(signals: List[SignalDTO]):
    """Aggregate a batch of posted signals.

    Each incoming ``SignalDTO`` is copied field-for-field into an internal
    ``Signal``. The resulting list is handed to
    ``Aggregator.aggregate_signals`` and its return value is placed under
    the ``"aggregation"`` key of the response body.
    """
    # Translate the transport-layer DTOs into the internal contract type
    # expected by the aggregator.
    internal_signals = [
        Signal(
            venue=dto.venue,
            timestamp=dto.timestamp,
            metrics=dto.metrics,
            version=dto.version,
        )
        for dto in signals
    ]
    return {"aggregation": Aggregator.aggregate_signals(internal_signals)}