# policymesh-policy-driven-fe.../policy_mesh/core.py
#
# 115 lines
# 3.6 KiB
# Python

from __future__ import annotations
from typing import Any, Dict, List, Optional
from dataclasses import dataclass
@dataclass
class LocalPolicySet:
    """Versioned set of policies belonging to a single domain."""

    # Name of the domain; PolicyMesh uses it as the registry key.
    domain: str
    # Domain-specific policy primitives.
    policies: Dict[str, Any]
    # Version label of this policy set.
    version: str

    def dict(self) -> Dict[str, Any]:
        """Return the policy set as a plain dictionary.

        Returns:
            A mapping with the keys ``"domain"``, ``"policies"`` and
            ``"version"``. ``"policies"`` is a shallow copy, so adding or
            removing keys in the result does not alter this instance.
        """
        return {
            "domain": self.domain,
            # Inside the method body ``dict`` is the builtin, not this method.
            "policies": dict(self.policies),
            "version": self.version,
        }
@dataclass
class GlobalConstraints:
    """Versioned set of constraints that apply to the mesh as a whole."""

    # Mesh-level constraints (limits, envelopes, etc.).
    constraints: Dict[str, Any]
    # Version label of this constraint set.
    version: str

    def dict(self) -> Dict[str, Any]:
        """Return the constraints as a plain dictionary.

        Returns:
            A mapping with the keys ``"constraints"`` and ``"version"``.
            ``"constraints"`` is a shallow copy, so adding or removing keys
            in the result does not alter this instance.
        """
        return {
            # Inside the method body ``dict`` is the builtin, not this method.
            "constraints": dict(self.constraints),
            "version": self.version,
        }
@dataclass
class DataExposurePolicy:
    """Versioned description of which data may be exposed and at what budget."""

    # Names of the data items that may be exposed.
    allowed_data: List[str]
    # Simple DP budget proxy.
    privacy_budget: float
    # Version label of this exposure policy.
    version: str

    def dict(self) -> Dict[str, Any]:
        """Return the exposure policy as a plain dictionary.

        Returns:
            A mapping with the keys ``"allowed_data"``, ``"privacy_budget"``
            and ``"version"``. ``"allowed_data"`` is a copy of the list, so
            editing the result does not alter this instance.
        """
        return {
            "allowed_data": list(self.allowed_data),
            "privacy_budget": self.privacy_budget,
            "version": self.version,
        }
class PolicyMesh:
    """In-process registry of per-domain policies with a mock ADMM-like state.

    The mesh stores one ``LocalPolicySet`` per domain, an optional
    ``GlobalConstraints`` object, two solver-state mappings (``z`` and ``u``)
    and an append-only ledger of the updates applied to that state.
    """

    def __init__(self) -> None:
        """Create an empty mesh: no policies, no constraints, empty ledger."""
        self.local_problems: Dict[str, LocalPolicySet] = {}
        self.global_constraints: Optional[GlobalConstraints] = None
        # Internal solver state (very lightweight ADMM-lite mock).
        self._z: Dict[str, Any] = {}
        self._u: Dict[str, Any] = {}
        # Governance ledger (in-process; can be swapped to a persistent store).
        self._ledger: List[Dict[str, Any]] = []

    # Local problem management
    def add_local_policy(self, policy: LocalPolicySet) -> None:
        """Register ``policy`` under its domain, replacing any previous entry."""
        self.local_problems[policy.domain] = policy

    def set_global_constraints(self, constraints: GlobalConstraints) -> None:
        """Store ``constraints`` as the mesh-level constraints."""
        self.global_constraints = constraints

    # Lightweight ADMM-like update (mock for MVP)
    def admm_step(self, rho: float = 1.0) -> None:
        """Run one mock ADMM-like update and record it in the ledger.

        For every registered domain, ``z[domain]`` is set to the sum of the
        policy values that ``float()`` can convert; values it cannot convert
        are skipped. An existing ``u[domain]`` is kept and a missing one is
        initialised to ``0.0``. When no local policies are registered the
        call returns without touching the state or the ledger.

        Args:
            rho: Accepted for interface compatibility; this mock update does
                not use it.
        """
        if not self.local_problems:
            return

        for domain, policy in self.local_problems.items():
            self._z[domain] = self._numeric_total(policy.policies)
            # Keep a scalar dual-like variable; never overwrite an existing one.
            self._u.setdefault(domain, 0.0)

        # Commit a governance-like delta to the ledger for traceability.
        # "count" is the position of this entry in the ledger (1-based), so
        # it also advances when delta_sync entries are recorded in between.
        self._ledger.append({
            "type": "admm_step",
            "count": len(self._ledger) + 1,
            "z": self._z.copy(),
            "u": self._u.copy(),
        })

    @staticmethod
    def _numeric_total(values: Dict[str, Any]) -> float:
        """Sum the values of ``values`` that ``float()`` is able to convert."""
        total = 0.0
        for value in values.values():
            try:
                total += float(value)
            except (TypeError, ValueError, OverflowError):
                # Best-effort: non-numeric policy primitives are ignored.
                continue
        return total

    # Delta-sync: merge updates from a remote partner (mock)
    def delta_sync(self, remote: Dict[str, Any]) -> None:
        """Merge a remote partner's ``z``/``u`` entries into the local state.

        Remote entries overwrite local entries with the same key; no version
        comparison is performed. A ledger entry is appended on every call,
        including when ``remote`` carries no updates.

        Args:
            remote: Mapping that may contain ``"z"`` and/or ``"u"`` mappings.
                A missing or ``None`` value is treated as "no updates".
        """
        self._z.update(remote.get("z") or {})
        self._u.update(remote.get("u") or {})
        self._ledger.append({
            "type": "delta_sync",
            "remote": True,
            "z": self._z.copy(),
            "u": self._u.copy(),
        })

    # Query helpers
    def get_state(self) -> Dict[str, Any]:
        """Return a snapshot of the mesh.

        Returns:
            A mapping with the keys ``"local_problems"`` (domain -> result of
            the policy's ``dict()``), ``"global_constraints"`` (result of its
            ``dict()``, or ``None`` when unset), ``"z"`` and ``"u"``. ``"z"``
            and ``"u"`` are shallow copies of the internal solver state.
        """
        constraints = self.global_constraints
        return {
            "local_problems": {d: p.dict() for d, p in self.local_problems.items()},
            "global_constraints": constraints.dict() if constraints else None,
            "z": self._z.copy(),
            "u": self._u.copy(),
        }

    def get_ledger(self) -> List[Dict[str, Any]]:
        """Return the ledger entries in insertion order.

        The returned list is a shallow copy: adding or removing items does
        not change the mesh's ledger, but the entry dictionaries themselves
        are shared with it.
        """
        return list(self._ledger)