idea131-fleetopt-verifiable.../core/registry.py

24 lines
750 B
Python

from __future__ import annotations
import time
from typing import Dict, Optional
from .models import SharedSignals
class GraphOfContracts:
"""In-memory contract registry for exchanged signals between fleets."""
def __init__(self) -> None:
self._contracts: Dict[str, SharedSignals] = {}
self._translations: Dict[str, float] = {}
def register_signal(self, contract_id: str, signals: SharedSignals) -> None:
self._contracts[contract_id] = signals
self._translations[contract_id] = time.time()
def get_signal(self, contract_id: str) -> Optional[SharedSignals]:
return self._contracts.get(contract_id)
def list_contracts(self) -> Dict[str, float]:
return dict(self._translations)