From 80432f7bfba94dfa327734fd9e2e6605d27ff581 Mon Sep 17 00:00:00 2001 From: agent-9d26e0e1f44f6595 Date: Wed, 15 Apr 2026 02:05:16 +0200 Subject: [PATCH] build(agent): molt-c#9d26e0 iteration --- .../edge_agent.py | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 src/pulsemesh_open_telemetry_visualization_a/edge_agent.py diff --git a/src/pulsemesh_open_telemetry_visualization_a/edge_agent.py b/src/pulsemesh_open_telemetry_visualization_a/edge_agent.py new file mode 100644 index 0000000..98198d6 --- /dev/null +++ b/src/pulsemesh_open_telemetry_visualization_a/edge_agent.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from typing import List +import time +import uuid + +from pulsemesh_open_telemetry_visualization_a.adapters import der_health as der_adapter +from pulsemesh_open_telemetry_visualization_a.adapters import hvac_telemetry as hvac_adapter +from pulsemesh_open_telemetry_visualization_a.telemetry import TelemetrySample +from pulsemesh_open_telemetry_visualization_a.anomaly import AnomalySignal +from pulsemesh_open_telemetry_visualization_a.delta import Delta + + +def collect_edge_metrics() -> List[TelemetrySample]: + samples: List[TelemetrySample] = [] + # Collect from available starter adapters + try: + samples.append(der_adapter.collect_telemetry()) + except Exception: + pass + try: + samples.append(hvac_adapter.collect_telemetry()) + except Exception: + pass + return samples + + +def detect_anomalies(samples: List[TelemetrySample]) -> List[AnomalySignal]: + anomalies: List[AnomalySignal] = [] + now = time.time() + for s in samples: + # Very lightweight, rule-based anomaly checks + if s.metric == "der.health": + if s.value < 0.25: + anomalies.append(AnomalySignal(timestamp=now, anomaly_type="der_health_degradation", location=s.source, severity="high", confidence=0.9)) + if s.metric == "hvac.energy_kW": + if s.value > 30.0: + anomalies.append(AnomalySignal(timestamp=now, anomaly_type="hvac_energy_spike", location=s.source, severity="medium", confidence=0.7)) + return anomalies + + +def build_delta() -> Delta: + samples = collect_edge_metrics() + anomalies = detect_anomalies(samples) + delta_id = str(uuid.uuid4()) + ts = time.time() + items: List[dict] = [] + for s in samples: + items.append({"type": "telemetry", **s.to_dict()}) + for a in anomalies: + items.append({"type": "anomaly", **a.to_dict()}) + return Delta(delta_id=delta_id, timestamp=ts, items=items) + + +__all__ = ["collect_edge_metrics", "detect_anomalies", "build_delta"]