74 lines
1.9 KiB
Python
74 lines
1.9 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass, field
|
|
import time
|
|
import uuid
|
|
from typing import List, Dict, Any
|
|
|
|
|
|
@dataclass
class LocalRobotPlan:
    """Plan for a single robot, identified by fleet and robot id.

    Attributes are declared in positional order: ``fleet_id``, ``robot_id``,
    ``tasks``, ``path`` and the optional ``objectives`` mapping, which
    defaults to a fresh empty dict for every instance.

    Raises:
        TypeError: if ``tasks`` is not a ``list`` instance.
    """

    fleet_id: str
    robot_id: str
    # High-level task names, e.g. ["pick", "place"].
    tasks: List[str]
    # Simplified path: a list of coordinate mappings.
    path: List[Dict[str, float]]
    objectives: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Only the container type is verified here; the individual
        # elements of ``tasks`` are not inspected.
        if isinstance(self.tasks, list):
            return
        raise TypeError("tasks must be a list of strings")
|
|
|
|
|
|
@dataclass
class SharedSignals:
    """Named float-valued signals plus optional provenance and timestamp.

    ``provenance`` and ``timestamp`` both default to ``None``; nothing in
    this class fills them in automatically.
    """

    # Aggregated metric values, keyed by signal name.
    signals: Dict[str, float]
    provenance: str | None = None
    timestamp: float | None = None
|
|
|
|
|
|
@dataclass
class PlanDelta:
    """A set of changes for a fleet, tagged with audit metadata.

    When ``timestamp`` or ``nonce`` is left as ``None``, ``__post_init__``
    fills it in: the timestamp from ``time.time()`` and the nonce from the
    hex form of a random UUID4. Explicitly supplied values (including
    falsy ones such as ``0.0`` or ``""``) are kept as given.
    """

    delta_id: str
    fleet_id: str
    changes: Dict[str, Any]
    # Metadata used for reconciliation and auditing.
    version: int = 1
    nonce: str | None = None
    timestamp: float | None = None

    def __post_init__(self):
        # Stamp the creation time unless the caller provided one.
        self.timestamp = (
            time.time() if self.timestamp is None else self.timestamp
        )
        # Generate a fresh nonce unless the caller provided one.
        self.nonce = uuid.uuid4().hex if self.nonce is None else self.nonce
|
|
|
|
|
|
@dataclass
class PrivacyBudget:
    """Per-signal budget tracker.

    Every signal starts with ``epsilon`` available. ``remaining`` records
    what is left for each signal that has been charged at least once;
    signals absent from it are treated as still having the full
    ``epsilon``.
    """

    # Initial budget granted to each signal.
    epsilon: float
    remaining: Dict[str, float] = field(default_factory=dict)

    def consume(self, signal: str, amount: float) -> bool:
        """Try to deduct ``amount`` from the budget of ``signal``.

        Args:
            signal: Name of the signal being charged.
            amount: Non-negative quantity to deduct.

        Returns:
            ``True`` if the deduction was applied, ``False`` if the request
            was refused. A refused request leaves ``remaining`` untouched.
            Requests are refused when ``amount`` exceeds what is left, is
            negative, or is NaN.
        """
        rem = self.remaining.get(signal, self.epsilon)
        # Fail closed with a single range check. A negative amount would
        # otherwise *increase* the remaining budget, and NaN compares False
        # against everything, so it would slip past an `amount > rem` test
        # and be stored, after which every later check would pass too.
        if not (0 <= amount <= rem):
            return False
        self.remaining[signal] = rem - amount
        return True
|
|
|
|
|
|
@dataclass
class DualVariables:
    """Two float-valued mappings associated with a fleet.

    All three fields are required and have no defaults.
    """

    fleet_id: str
    # Dual variable values, keyed by signal name.
    alphas: Dict[str, float]
    # Second string-keyed mapping of floats; its keys are presumably
    # signal names as well -- confirm against the code that fills it.
    betas: Dict[str, float]
|
|
|
|
|
|
@dataclass
class AuditLogEntry:
    """One audit-log record: an event for a fleet with optional details.

    ``details`` defaults to a new empty dict per instance, so entries never
    share the same mapping by accident.
    """

    entry_id: str
    fleet_id: str
    event: str
    # Free-form payload attached to the event.
    details: Dict[str, Any] = field(default_factory=dict)
|