pulsemesh-open-telemetry-vi.../tests/test_delta.py

29 lines
1.0 KiB
Python

import sys
import os
# Ensure package source is importable during tests
SRC_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src"))
if SRC_DIR not in sys.path:
sys.path.insert(0, SRC_DIR)
from pulsemesh_open_telemetry_visualization_a.delta import Delta, DeltaSync
from pulsemesh_open_telemetry_visualization_a.telemetry import TelemetrySample
def test_delta_serialization_and_sync():
t1 = TelemetrySample(timestamp=1.0, source="a", metric="m1", value=1.0, units="u")
t2 = TelemetrySample(timestamp=2.0, source="a", metric="m2", value=2.0, units="u")
d1 = Delta(delta_id="d1", timestamp=1.0, items=[t1.to_dict()])
d2 = Delta(delta_id="d2", timestamp=2.0, items=[t2.to_dict()])
ds = DeltaSync()
ds.add_delta(d1)
ds.add_delta(d2)
payload = ds.to_payload()
assert isinstance(payload, dict)
assert len(payload["deltas"]) == 2
# simulate remote payload
remote = {"deltas": [d1.to_dict() for d in [d1]]}
merged = ds.merge_with_remote(remote)
assert len(merged) >= 1