energiamesh-federated-contr.../src/energiamesh/core.py

149 lines
5.0 KiB
Python

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, Optional
from datetime import datetime
@dataclass
class LocalProblem:
"""Represents a per-site optimization task.
Extended fields to support richer test scenarios/usage:
- site_id: identifier for the site or device cluster
- problem_id: internal problem identifier
- description: human-readable description
- data: arbitrary input data for the problem
- objective: a human-readable objective name (legacy)
- parameters: arbitrary parameters for the problem (e.g., DER dispatch targets)
- metadata: optional extra data
"""
site_id: str
problem_id: Optional[str] = None
description: Optional[str] = None
data: Dict[str, Any] = field(default_factory=dict)
objective: Optional[str] = None
parameters: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
# Basic sanity; can be extended with more domain rules
if not self.site_id:
raise ValueError("site_id must be provided")
@dataclass
class SharedVariables:
"""Signals shared among participants (e.g., forecasts, priors).
- signals: a dictionary of named signals with values
- timestamp: last update time
- provenance: optional provenance info
"""
signals: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.utcnow)
provenance: Dict[str, Any] = field(default_factory=dict)
version: int = 0
def update(self, name: str, value: Any) -> None:
self.signals[name] = value
self.timestamp = datetime.utcnow()
@dataclass
class DualVariables:
"""Optimization multipliers / prices.
- multipliers: dict of dual variables keyed by constraint name
- last_update: timestamp
"""
multipliers: Dict[str, float] = field(default_factory=dict)
last_update: datetime = field(default_factory=datetime.utcnow)
primal: Dict[str, Any] = field(default_factory=dict)
version: int = 0
def set(self, name: str, value: float) -> None:
self.multipliers[name] = float(value)
self.last_update = datetime.utcnow()
@dataclass
class PlanDelta:
"""Incremental plan change with metadata.
Backwards-compat: tests may construct with delta_id/updates keywords.
- delta: arbitrary payload describing the change
- delta_id: optional identifier for the delta
- updates: optional updates payload
- timestamp: time of the delta generation
- metadata: per-message metadata (e.g., version, source)
- nonce: optional cryptographic nonce for replay protection
- source: optional source identifier for the delta (e.g., adapter name)
"""
delta: Dict[str, Any] = field(default_factory=dict)
delta_id: str | None = None
updates: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.utcnow)
metadata: Dict[str, Any] = field(default_factory=dict)
# Lightweight anti-replay protections; optional
nonce: str | None = None
# Optional source attribution for auditing and routing
source: str | None = None
@dataclass
class AuditLog:
"""Tamper-evident like log placeholder for governance anchors.
- entries: list of log entries (messages + hash placeholders)
"""
entries: list = field(default_factory=list)
def add(self, entry: str) -> None:
# In a full implementation this would include cryptographic hashes
self.entries.append({"ts": datetime.utcnow().isoformat(), "entry": entry})
def add_entry(self, entry: str) -> None:
# Backwards-compatible alias used by tests
self.add(entry)
def log(self, event: str, payload: object) -> None:
# Simple structured log entry for tests
self.entries.append({"ts": datetime.utcnow().isoformat(), "event": event, "payload": payload})
@dataclass
class SafetyBudget:
"""Simple safety budget controlling device limits per site."""
enabled: bool
max_current_draw_a: float
max_voltage_variation_pu: float
device_limits: Dict[str, float] = field(default_factory=dict)
def update(self, site_id: str, limit: float) -> None:
self.device_limits[site_id] = float(limit)
@dataclass
class PrivacyBudget:
"""Very small privacy budget helper.
- allowed_signals: list of signal names that can be budgeted
- total_budget_units: total budget available for all signals
- per_signal_budget: remaining budget per signal
"""
enabled: bool
allowed_signals: list[str] = field(default_factory=list)
total_budget_units: float = 0.0
per_signal_budget: Dict[str, float] = field(default_factory=dict)
def use(self, signal: str, amount: float) -> None:
if signal not in self.per_signal_budget:
self.per_signal_budget[signal] = self.total_budget_units
self.per_signal_budget[signal] = max(0.0, self.per_signal_budget[signal] - float(amount))