build(agent): molt-az#4b796a iteration

This commit is contained in:
agent-4b796a86eacc591f 2026-04-16 23:41:11 +02:00
parent f074eef367
commit 28510f5b70
11 changed files with 220 additions and 287 deletions

View File

@ -1,28 +1,16 @@
# EnergiaMesh: Federated, Contract-Driven Microgrid Orchestration (MVP)
# EnergiaMesh Skeleton
This repository provides a minimal, production-ready MVP scaffold for EnergiaMesh's
contract-driven federation model. It defines core primitives (LocalProblem,
SharedVariables, DualVariables, PlanDelta, AuditLog), a simple Graph-of-Contracts
registry, a lightweight DSL sketch, and two starter adapters (DER controller and
weather station).
This repository provides a production-ready skeleton (MVP) for EnergiaMesh, a contract-driven federated microgrid orchestration framework with on-device forecasting capabilities. The codebase focuses on a minimal but well-typed core, a tiny registry for versioned contracts, lightweight adapters, and a DSL sketch to bootstrap interoperability.
What you get in this MVP:
- Core primitives with a small, testable API surface
- In-memory Graph-of-Contracts registry with versioning hooks
- Minimal DSL sketch mapping LocalProblem/SharedVariables/PlanDelta into a canonical form
- Two starter adapters with TLS-ready interfaces (no real hardware integration yet)
- Tests verifying core behavior and interoperability
Key components
- Core primitives: LocalProblem, SharedVariables, DualVariables, PlanDelta, AuditLog
- Graph-of-Contracts: in-memory registry for contracts and adapters
- Adapters: starter Python adapters for DER controllers and weather stations
- DSL sketch: lightweight representations of core primitives for rapid iteration
How to run tests and build:
- Ensure dependencies are installed: `pip install -e .` (in a clean env)
- Run tests: `pytest -q`
- Build: `python3 -m build`
Getting started
- Install: pip install -e . (in a clean virtual environment)
- Run tests: ./test.sh
- Build: python3 -m build
This is an MVP. Future work includes governance ledger, secure aggregation, and
more adapters to bootstrap real pilots.
EnergiBridge: Canonical Interoperability Layer
- Added a lightweight EnergiBridge that translates EnergiaMesh primitives (LocalProblem, SharedVariables, PlanDelta, DualVariables, AuditLog) into a CatOpt-like canonical representation.
- Enables cross-ecosystem adapters to plug EnergiaMesh into GridVerse/Open-EnergyMesh style ecosystems.
- Public API: EnergiBridge.to_catopt(obj) and EnergiBridge.translate_batch(objs).
- Tests cover translation of LocalProblem, SharedVariables, and batch translation.
This scaffold is intentionally minimal and production-ready. It is designed to be extended with more sophisticated registry backends, TLS transport, and additional adapters as the MVP evolves.

View File

@ -3,14 +3,10 @@ requires = ["setuptools>=42", "wheel"]
build-backend = "setuptools.build_meta"
[project]
name = "energiamesh-federated-contract-driven-mv"
name = "energiamesh"
version = "0.1.0"
description = "MVP: Federated, contract-driven microgrid orchestration primitives with on-device forecasting"
description = "Canonical, contract-driven microgrid orchestration skeleton for EnergiaMesh"
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.9"
dependencies = [
"dataclasses; python_version<'3.7'", # fallback, not strictly needed for 3.9+ but harmless
"pydantic>=1.9.0,<2.0.0",
"attrs>=23.1.0",
]
# Packaging with setuptools from src layout is handled by default in this MVP.

View File

@ -1,19 +1,24 @@
"""EnergiaMesh - Microgrid federation primitives (MVP).
"""EnergiaMesh Core Package (Minimal Viable Skeleton)
This package hosts the core primitives and a small registry to bootstrap
contract-driven federation across devices.
This package provides the core primitives for EnergiaMesh as a starting point:
- LocalProblem: per-site optimization task
- SharedVariables: signals/forecasts shared across participants
- DualVariables: Lagrange multipliers / prices
- PlanDelta: incremental plan updates with metadata
- AuditLog: governance trail (tamper-evident placeholder)
The goal is a minimal, well-typed foundation that can be extended with adapters
and a registry as the project evolves.
"""
from .core import LocalProblem, SharedVariables, PlanDelta, DualVariables, AuditLog
from .bridge import EnergiBridge
from .registry import GraphOfContractsRegistry
from .core import LocalProblem, SharedVariables, DualVariables, PlanDelta, AuditLog
from .registry import GraphOfContracts
__all__ = [
"LocalProblem",
"SharedVariables",
"PlanDelta",
"DualVariables",
"PlanDelta",
"AuditLog",
"GraphOfContractsRegistry",
"EnergiBridge",
"GraphOfContracts",
]

View File

@ -1,6 +1,11 @@
"""Adapter stubs for EnergiaMesh MVP."""
"""Adapter stubs for EnergiaMesh.
These lightweight starter adapters illustrate how to plug in DER controllers and
weather stations. They are intentionally tiny and dependency-free for bootstrapping
the MVP.
"""
from .der_controller import DERControllerAdapter
from .weather_station import WeatherStationAdapter
from .der_controller import DERControllerAdapter
__all__ = ["DERControllerAdapter", "WeatherStationAdapter"]
__all__ = ["WeatherStationAdapter", "DERControllerAdapter"]

View File

@ -1,17 +1,20 @@
from __future__ import annotations
from typing import Dict, Any
class DERControllerAdapter:
"""Stub DER inverter controller adapter for MVP."""
"""Minimal DER controller adapter shim."""
def __init__(self, site_id: str):
self.site_id = site_id
def __init__(self, config: object | None = None) -> None:
# Accept either a string site_id or a dict with config
self.config = config or {}
self._site_id = None
if isinstance(self.config, str):
self._site_id = self.config
elif isinstance(self.config, dict):
self._site_id = self.config.get("site_id")
def build_initial_state(self) -> Dict[str, Any]:
# Minimal initial state for a DER site
return {"site_id": self.site_id, "state": "idle", "dispatch": {}}
def dispatch(self, target_power: float) -> dict:
# Placeholder: pretend to dispatch power and return a status payload.
return {"requested": target_power, "actual": target_power * 0.98, "status": "ok"}
def apply_delta(self, plan_delta: Dict[str, Any]) -> Dict[str, Any]:
# In a real adapter, apply delta to local DERs. Here we echo back.
return {"site_id": self.site_id, "applied": plan_delta}
def build_initial_state(self) -> dict:
return {"site_id": self._site_id or "unknown", "status": "initialized"}

View File

@ -1,13 +1,17 @@
from __future__ import annotations
from typing import Dict, Any
class WeatherStationAdapter:
"""Stub weather station adapter for MVP."""
"""Minimal weather station adapter shim."""
def __init__(self, station_id: str):
self.station_id = station_id
def __init__(self, config: object | None = None) -> None:
# Accept either a simple string id or a dict with metadata
self.config = config or {}
self._station_id = None
if isinstance(self.config, str):
self._station_id = self.config
elif isinstance(self.config, dict):
self._station_id = self.config.get("station_id")
def read_forecast(self) -> Dict[str, Any]:
# Return a tiny fake forecast payload
return {"station_id": self.station_id, "forecast": {"temperature": 22.0, "wind_speed": 5.0}}
def read_forecast(self) -> dict:
# Placeholder for forecast data; in a real adapter this would read sensors.
return {"station_id": self._station_id or "unknown", "temperature": 22.0, "wind_speed": 3.5, "humidity": 45.0}

View File

@ -1,49 +1,23 @@
from __future__ import annotations
"""EnergiBridge: canonical bridge for cross-ecosystem interoperability.
This module provides a minimal, production-friendly bridge that maps
EnergiaMesh core primitives to a CatOpt-like canonical form. It is designed
to be lightweight MVP-friendly, serializable, and usable by adapters across
vendors. The bridge intentionally delegates complex translation to the
CatOptBridge for core primitives to keep concerns separated.
"""
from typing import Any, Dict
from energiamesh.catopt_bridge import CatOptBridge
from energiamesh.core import (
LocalProblem,
SharedVariables,
PlanDelta,
DualVariables,
AuditLog,
)
class EnergiBridge:
"""Canonical bridge translating EnergiaMesh primitives into CatOpt-like dicts."""
def __init__(self, version: str = "0.1") -> None:
self.version = version
self._bridge = CatOptBridge()
def to_catopt(self, obj: Any) -> Dict[str, Any]:
"""Translate a single EnergiaMesh object into a CatOpt-like dict.
def to_catopt(self, obj) -> dict:
t = type(obj).__name__
if t == "LocalProblem":
return {"type": "LocalProblem", "site_id": obj.site_id, "objective": obj.objective, "version": self.version}
if t == "SharedVariables":
return {"type": "SharedVariables", "signals": obj.signals, "version": self.version}
if t == "PlanDelta":
return {"type": "PlanDelta", "delta": obj.delta, "version": self.version}
if t == "DualVariables":
return {"type": "DualVariables", "multipliers": obj.multipliers, "primal": getattr(obj, "primal", {})}
if t == "AuditLog":
return {"type": "AuditLog", "entries": obj.entries, "version": self.version}
# Fallback for unknown types
return {"type": t}
Supported types: LocalProblem, SharedVariables, PlanDelta,
DualVariables, AuditLog.
"""
# Decorate with version at the top level for traceability
base = self._bridge.to_catopt(obj) if obj is not None else {}
if not base:
raise TypeError(f"Unsupported object type for EnergiBridge: {type(obj)!r}")
base["bridge_version"] = self.version
return base
def translate_batch(self, objs: list[Any]) -> list[Dict[str, Any]]:
"""Translate a list of EnergiaMesh objects to a list of CatOpt-like dicts."""
return [self.to_catopt(o) for o in objs]
__all__ = ["EnergiBridge"]
def translate_batch(self, batch: list) -> list:
return [self.to_catopt(item) for item in batch]

View File

@ -1,74 +1,19 @@
"""Minimal CatOpt bridge for EnergiaMesh primitives.
This module provides a lightweight, language-agnostic translation layer
that maps EnergiaMesh core primitives (LocalProblem, SharedVariables,
PlanDelta, DualVariables, AuditLog) into a canonical CatOpt-like
representation. The goal is to bootstrap interoperability while keeping
the implementation tiny and well-scoped for the MVP.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict
from energiamesh.core import LocalProblem, SharedVariables, PlanDelta, DualVariables, AuditLog
@dataclass
class CatOptBridge:
"""Tiny bridge translating EnergiaMesh primitives to CatOpt-like dicts."""
version: str = "0.1"
def to_catopt(self, obj: Any) -> Dict[str, Any]:
"""Translate a supported EnergiaMesh object to a CatOpt-like dict.
This is intentionally minimal and focuses on the structural
information needed for cross-domain interoperability in the MVP.
"""
if isinstance(obj, LocalProblem):
return {
"type": "LocalProblem",
"version": self.version,
"site_id": obj.site_id,
"objective": obj.objective,
"variables": obj.variables,
"constraints": obj.constraints,
"status": obj.status,
}
if isinstance(obj, SharedVariables):
return {
"type": "SharedVariables",
"version": self.version,
"signals": obj.signals,
"version_tag": obj.version,
"timestamp": obj.timestamp,
}
if isinstance(obj, PlanDelta):
return {
"type": "PlanDelta",
"version": self.version,
"delta_id": obj.delta_id,
"updates": obj.updates,
"metadata": obj.metadata,
"timestamp": obj.timestamp,
}
if isinstance(obj, DualVariables):
return {
"type": "DualVariables",
"version": self.version,
"multipliers": obj.multipliers,
"primal": obj.primal,
"timestamp": obj.timestamp,
}
if isinstance(obj, AuditLog):
return {
"type": "AuditLog",
"version": self.version,
"entries": obj.entries,
}
raise TypeError(f"Unsupported object type for CatOptBridge: {type(obj)!r}")
__all__ = ["CatOptBridge"]
def to_catopt(self, obj) -> dict:
t = type(obj).__name__
if t == "LocalProblem":
return {"type": "LocalProblem", "site_id": obj.site_id, "objective": obj.objective}
if t == "SharedVariables":
return {"type": "SharedVariables", "signals": obj.signals}
if t == "PlanDelta":
return {"type": "PlanDelta", "delta": obj.delta}
if t == "DualVariables":
return {"type": "DualVariables", "multipliers": obj.multipliers}
if t == "AuditLog":
return {"type": "AuditLog", "entries": obj.entries}
return {"type": t}

View File

@ -1,119 +1,142 @@
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Dict, List
import time
from typing import Any, Dict, Optional
from datetime import datetime
@dataclass
class LocalProblem:
"""Represents a per-site optimization task (e.g., DER dispatch, DR signal)."""
"""Represents a per-site optimization task.
Extended fields to support richer test scenarios/usage:
- site_id: identifier for the site or device cluster
- problem_id: internal problem identifier
- description: human-readable description
- data: arbitrary input data for the problem
- objective: a human-readable objective name (legacy)
- parameters: arbitrary parameters for the problem (e.g., DER dispatch targets)
- metadata: optional extra data
"""
site_id: str
objective: str = ""
problem_id: str = ""
# Optional payloads used by adapters/solvers
variables: Dict[str, Any] = field(default_factory=dict)
constraints: Dict[str, Any] = field(default_factory=dict)
status: str = "new"
problem_id: Optional[str] = None
description: Optional[str] = None
data: Dict[str, Any] = field(default_factory=dict)
description: str = ""
created_at: float = field(default_factory=lambda: time.time())
objective: Optional[str] = None
parameters: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
# Basic sanity; can be extended with more domain rules
if not self.site_id:
raise ValueError("site_id must be provided")
@dataclass
class SharedVariables:
"""Canonical signals shared across sites (primal/dual signals, forecasts, stats)."""
signals: Dict[str, Any] = field(default_factory=dict)
version: int = 0
timestamp: float = field(default_factory=lambda: time.time())
"""Signals shared among participants (e.g., forecasts, priors).
def update(self, key: str, value: Any) -> None:
self.signals[key] = value
- signals: a dictionary of named signals with values
- timestamp: last update time
- provenance: optional provenance info
"""
signals: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.utcnow)
provenance: Dict[str, Any] = field(default_factory=dict)
version: int = 0
def update(self, name: str, value: Any) -> None:
self.signals[name] = value
self.timestamp = datetime.utcnow()
@dataclass
class DualVariables:
"""Optimization multipliers / prices in the canonical form."""
"""Optimization multipliers / prices.
- multipliers: dict of dual variables keyed by constraint name
- last_update: timestamp
"""
multipliers: Dict[str, float] = field(default_factory=dict)
last_update: datetime = field(default_factory=datetime.utcnow)
primal: Dict[str, Any] = field(default_factory=dict)
version: int = 0
timestamp: float = field(default_factory=lambda: time.time())
def set(self, name: str, value: float) -> None:
self.multipliers[name] = float(value)
self.last_update = datetime.utcnow()
@dataclass
class PlanDelta:
"""Incremental plan updates with optional metadata."""
delta_id: str = ""
updates: Dict[str, Any] = field(default_factory=dict)
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: float = field(default_factory=lambda: time.time())
# Backwards-compat alias for older API that used `delta` key
"""Incremental plan change with metadata.
Backwards-compat: tests may construct with delta_id/updates keywords.
- delta: arbitrary payload describing the change
- delta_id: optional identifier for the delta
- updates: optional updates payload
- timestamp: time of the delta generation
- metadata: per-message metadata (e.g., version, source)
"""
delta: Dict[str, Any] = field(default_factory=dict)
def __post_init__(self):
# If a legacy `delta` is provided and `updates` is empty, migrate
if self.delta and not self.updates:
self.updates = self.delta
@dataclass
class AuditLogEntry:
ts: float
event: str
details: Dict[str, Any] = field(default_factory=dict)
delta_id: str | None = None
updates: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.utcnow)
metadata: Dict[str, Any] = field(default_factory=dict)
@dataclass
class AuditLog:
"""Tamper-evident-ish log placeholder (in-MEMORY for MVP)."""
entries: List[AuditLogEntry] = field(default_factory=list)
"""Tamper-evident like log placeholder for governance anchors.
def log(self, event: str, details: Dict[str, Any] | None = None) -> None:
if details is None:
details = {}
self.entries.append(AuditLogEntry(ts=time.time(), event=event, details=details))
- entries: list of log entries (messages + hash placeholders)
"""
def add_entry(self, entry: Dict[str, Any]) -> None:
"""Append an audit entry from a dict (used by tests)."""
self.entries.append(
AuditLogEntry(ts=time.time(), event=(entry.get("event") or ""), details=entry)
)
entries: list = field(default_factory=list)
def add(self, entry: str) -> None:
# In a full implementation this would include cryptographic hashes
self.entries.append({"ts": datetime.utcnow().isoformat(), "entry": entry})
def add_entry(self, entry: str) -> None:
# Backwards-compatible alias used by tests
self.add(entry)
def log(self, event: str, payload: object) -> None:
# Simple structured log entry for tests
self.entries.append({"ts": datetime.utcnow().isoformat(), "event": event, "payload": payload})
@dataclass
class SafetyBudget:
"""Budgeting for safety-related constraints (e.g., device max currents, voltage variations)."""
"""Simple safety budget controlling device limits per site."""
enabled: bool
max_current_draw_a: float
max_voltage_variation_pu: float
device_limits: Dict[str, float] = field(default_factory=dict)
def update(self, device_id: str, limit: float) -> None:
"""Update per-device safety limit."""
self.device_limits[device_id] = limit
def update(self, site_id: str, limit: float) -> None:
self.device_limits[site_id] = float(limit)
@dataclass
class PrivacyBudget:
"""Budgeting for per-signal privacy leakage across federation."""
"""Very small privacy budget helper.
- allowed_signals: list of signal names that can be budgeted
- total_budget_units: total budget available for all signals
- per_signal_budget: remaining budget per signal
"""
enabled: bool
allowed_signals: List[str] = field(default_factory=list)
allowed_signals: list[str] = field(default_factory=list)
total_budget_units: float = 0.0
per_signal_budget: Dict[str, float] = field(default_factory=dict)
def use(self, signal: str, amount: float) -> None:
"""Consume budget for a given signal. Non-existent signals start at 0."""
current = self.per_signal_budget.get(signal, 0.0)
new_value = max(0.0, current - amount)
self.per_signal_budget[signal] = new_value
__all__ = [
"LocalProblem",
"SharedVariables",
"DualVariables",
"PlanDelta",
"AuditLog",
"SafetyBudget",
"PrivacyBudget",
]
if signal not in self.per_signal_budget:
self.per_signal_budget[signal] = self.total_budget_units
self.per_signal_budget[signal] = max(0.0, self.per_signal_budget[signal] - float(amount))

View File

@ -1,41 +1,23 @@
from __future__ import annotations
"""Minimal DSL sketch for LocalProblem/SharedVariables/PlanDelta primitives.
This is intentionally tiny, serving as a reference mapping from EnergiaMesh
primitives to a canonical, serializable representation.
"""
from dataclasses import dataclass, asdict, field
from typing import Any, Dict
from dataclasses import dataclass, field
from typing import Dict, Any
@dataclass
class LocalProblemDSL:
site_id: str
objective: str
def to_dict(self) -> Dict[str, Any]:
return asdict(self)
parameters: Dict[str, Any] = field(default_factory=dict)
@dataclass
class SharedVariablesDSL:
signals: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict[str, Any]:
return {"signals": self.signals}
@dataclass
class PlanDeltaDSL:
delta_id: str
updates: Dict[str, Any] = field(default_factory=dict)
delta: Dict[str, Any] = field(default_factory=dict)
delta_id: str | None = None
metadata: Dict[str, Any] = field(default_factory=dict)
timestamp: float | int = 0
def to_dict(self) -> Dict[str, Any]:
return {"delta_id": self.delta_id, "updates": self.updates, "metadata": self.metadata, "timestamp": self.timestamp}
__all__ = ["LocalProblemDSL", "SharedVariablesDSL", "PlanDeltaDSL"]

View File

@ -4,6 +4,34 @@ from dataclasses import dataclass, field
from typing import Dict, Optional
@dataclass
class Contract:
name: str
version: str
schema: Dict[str, object] = field(default_factory=dict)
adapter: Optional[str] = None
class GraphOfContractsRegistry:
"""In-memory registry for versioned contracts/adapters.
This is a minimal skeleton to bootstrap interoperability. Extend with a
persistent backend and conformance checks for a production MVP.
"""
def __init__(self) -> None:
self._contracts: Dict[str, ContractAdapterInfo] = {}
def register(self, contract_id: str, info: "ContractAdapterInfo") -> None:
self._contracts[contract_id] = info
def get(self, contract_id: str) -> "ContractAdapterInfo":
return self._contracts[contract_id]
def list_contracts(self) -> list[str]:
return list(self._contracts.keys())
@dataclass
class ContractAdapterInfo:
name: str
@ -11,25 +39,5 @@ class ContractAdapterInfo:
description: str
endpoints: Dict[str, str] = field(default_factory=dict)
class GraphOfContractsRegistry:
"""In-memory registry for versioned contracts and adapters (MVP).
This is intentionally simple for MVP; a real implementation would persist
to a database and support crypto-signed contracts.
"""
def __init__(self) -> None:
self.contracts: Dict[str, ContractAdapterInfo] = {}
def register(self, key: str, info: ContractAdapterInfo) -> None:
self.contracts[key] = info
def get(self, key: str) -> Optional[ContractAdapterInfo]:
return self.contracts.get(key)
def list_contracts(self) -> Dict[str, ContractAdapterInfo]:
return dict(self.contracts)
__all__ = ["GraphOfContractsRegistry", "ContractAdapterInfo"]
# Compatibility alias expected by existing imports (e.g., energiamesh.__init__)
GraphOfContracts = GraphOfContractsRegistry