feedtrust-blockchain-backed.../feedtrust/aggregation.py

21 lines
649 B
Python

from typing import Dict, Any, List
class Aggregator:
    """Keep the most recent value of each (venue, signal) pair and average prices.

    ``_latest`` maps the flattened key ``"<venue>:<signal>"`` to the last value
    reported for it, and ``_venues`` lists venues in first-seen order.
    ``_prices`` additionally keeps, per venue, the last value whose signal name
    is exactly ``"price"`` so that averaging never relies on parsing the
    flattened key.
    """

    _PRICE_SIGNAL = "price"

    def __init__(self):
        self._latest: Dict[str, float] = {}
        self._venues: List[str] = []
        # venue -> latest value reported with signal == "price"
        self._prices: Dict[str, float] = {}

    def add_signal(self, venue: str, signal: str, value: float) -> None:
        """Record ``value`` as the latest reading of ``signal`` from ``venue``.

        A later call with the same venue and signal overwrites the earlier
        value. The venue is appended to the known-venue list the first time
        it is seen.
        """
        key = f"{venue}:{signal}"
        self._latest[key] = value
        if signal == self._PRICE_SIGNAL:
            # Track price readings by exact signal name. Matching on the
            # ":price" suffix of the flattened key would also pick up signals
            # such as "mid:price", because ":" may occur inside either part.
            self._prices[venue] = value
        if venue not in self._venues:
            self._venues.append(venue)

    def aggregate_price(self) -> float:
        """Return the arithmetic mean of the latest ``"price"`` value per venue.

        Only values recorded through ``add_signal`` with the signal name
        exactly equal to ``"price"`` are averaged. Returns ``0.0`` when no
        such value has been recorded.
        """
        values = list(self._prices.values())
        if not values:
            return 0.0
        return sum(values) / len(values)