# sunhub-open-city-scale-sola.../tests/test_sunhub.py
#
# 46 lines
# 1.5 KiB
# Python

import sys, os
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "src")))
import pytest
from sunhub import SunHub
from sunhub import Object, SharedSignal, PlanDelta
def test_object_creation_and_storage():
    """Check that ``add_object`` returns an object the hub also stores.

    The returned object's ``id`` must be a key of ``hub.objects``, and the
    stored entry must expose the neighborhood that was passed in.
    """
    hub = SunHub()
    o = hub.add_object("Downtown", {"pv_capacity_kw": 120})
    assert o.id in hub.objects
    assert hub.objects[o.id].neighborhood == "Downtown"
def test_signal_creation():
    """Check that ``add_signal`` returns a signal the hub also stores.

    The returned signal's ``id`` must be a key of ``hub.shared_signals``, and
    the stored entry must expose the signal type that was passed in.
    """
    hub = SunHub()
    s = hub.add_signal("irradiance_forecast", {"hour": 12, "value": 800})
    assert s.id in hub.shared_signals
    assert hub.shared_signals[s.id].signal_type == "irradiance_forecast"
def test_plan_delta_application():
    """Check that applying a single plan delta overwrites the targeted key.

    A delta carrying ``solar_production_kw = 45`` is applied to a state where
    that key is ``0``; the state returned by ``apply`` must hold ``45``.
    """
    hub = SunHub()
    state = {"solar_production_kw": 0}
    delta = hub.add_plan_delta("Downtown", [{"key": "solar_production_kw", "value": 45}])
    new_state = delta.apply(state)
    assert new_state["solar_production_kw"] == 45
def test_offline_delta_sync_roundtrip():
    """Check that ``apply_all`` folds every recorded delta into one state.

    Two deltas touching different keys are recorded on the hub; applying them
    to an empty state must yield a state that contains both values.
    """
    hub = SunHub()
    # offline delta updates
    hub.add_plan_delta("Downtown", [{"key": "solar_production_kw", "value": 30}])
    hub.add_plan_delta("Downtown", [{"key": "storage_state", "value": "charging"}])
    final = hub.apply_all({})
    assert final["solar_production_kw"] == 30
    assert final["storage_state"] == "charging"
def test_contract_registry():
    """Check that a registered adapter can be read back from ``hub.contracts``.

    After registering the ``"gis"`` adapter, ``hub.contracts.get("gis")`` must
    return an entry whose ``"endpoint"`` matches the registered value.
    """
    hub = SunHub()
    hub.register_adapter("gis", {"endpoint": "http://localhost/gis"})
    reg = hub.contracts.get("gis")
    # Fail with a readable assertion rather than a TypeError from subscripting
    # None when the adapter was never stored.
    assert reg is not None, "adapter 'gis' was not registered in hub.contracts"
    assert reg["endpoint"] == "http://localhost/gis"