"""Idea138 GuardRailOps - Federated IR MVP (core primitives and orchestration scaffold).""" from __future__ import annotations from dataclasses import dataclass, field from typing import Dict, List, Any import hashlib import time @dataclass class LocalIRTask: id: str description: str action: str # e.g., 'detect', 'contain', 'recover' status: str # 'pending'|'in_progress'|'completed' @dataclass class SharedTelemetry: signals: Dict[str, Any] = field(default_factory=dict) budget: float = 0.0 # privacy/telemetry budget placeholder timestamp: float = field(default_factory=time.time) @dataclass class PlanDelta: delta_id: str parent_id: str | None timestamp: float nonce: str changes: List[Dict[str, Any]] # primitive changes describing actions def digest(self) -> str: hasher = hashlib.sha256() hasher.update(self.delta_id.encode()) hasher.update(str(self.timestamp).encode()) hasher.update((self.nonce or "").encode()) for ch in self.changes: hasher.update(str(ch).encode()) return hasher.hexdigest() @dataclass class AuditLogEntry: entry_id: str event: str detail: Dict[str, Any] = field(default_factory=dict) signature: str = "" # placeholder signature def sign(self, key: str) -> None: data = f"{self.entry_id}:{self.event}:{self.detail}:{key}" self.signature = hashlib.sha256(data.encode()).hexdigest() class DeltaSyncEngine: """Minimal offline delta synchronization engine (in-memory).""" def __init__(self) -> None: self.ledger: List[AuditLogEntry] = [] def apply_delta(self, state: Dict[str, Any], delta: PlanDelta) -> Dict[str, Any]: # Simple deterministic application: iterate changes and update state new_state = dict(state) for change in delta.changes: # Each change should be a dict with 'key' and 'value' and optional 'op' key = change.get("key") value = change.get("value") op = change.get("op", "set") if op == "set": new_state[key] = value elif op == "delete": new_state.pop(key, None) elif op == "append": if key not in new_state: new_state[key] = [value] else: new_state[key].append(value) return new_state def log(self, event: str, detail: Dict[str, Any], signer_key: str | None = None) -> AuditLogEntry: entry = AuditLogEntry(entry_id=hashlib.sha256((event + str(time.time())).encode()).hexdigest(), event=event, detail=detail) if signer_key: entry.sign(signer_key) self.ledger.append(entry) return entry __all__ = [ "LocalIRTask", "SharedTelemetry", "PlanDelta", "AuditLogEntry", "DeltaSyncEngine", ]