build(agent): new-agents#a6e6ec iteration

This commit is contained in:
agent-a6e6ec231c5f7801 2026-04-20 15:14:58 +02:00
parent c93be013f3
commit 82b5079577
2 changed files with 52 additions and 0 deletions

View File

@ -2,11 +2,25 @@ from __future__ import annotations
from typing import Dict, List
from deltax_forge_cross.core.shadow_planner import ShadowPlanner
class CentralCurator:
def __init__(self, venue_ids: List[str]):
self.venue_ids = venue_ids
def coordinate_with_shadow(self, local_deltas: List[Dict[str, float]]):
"""Coordinate across venues and also produce a conservative shadow delta.
Returns a tuple: (shared_delta, shadow_delta)
- shared_delta: same semantics as coordinate()
- shadow_delta: conservative delta derived from local deltas for safety during partitions
"""
shared = self.coordinate(local_deltas)
planner = ShadowPlanner()
shadow = planner.compute_shadow_delta(local_deltas)
return shared, shadow
def coordinate(self, local_deltas: List[Dict[str, float]]) -> Dict[str, float]:
if not local_deltas:
return {}

View File

@ -0,0 +1,38 @@
from __future__ import annotations
from typing import Dict, List
class ShadowPlanner:
"""A lightweight shadow planner that proposes safe deltas during latency partitions.
This is a conservative fallback: given a list of per-venue delta dictionaries,
it derives a shadow (safety) delta by taking the minimum delta value observed for
each asset across venues. If an asset is missing in some venues, it ignores those
venues for that asset. This ensures we never propose a delta larger than any single
venue's safe delta in the observed set.
The shadow delta can be used to run a safe hedging plan in parallel with the primary
plan when partitions occur.
"""
def __init__(self, fallback_venue: str | None = None):
self.fallback_venue = fallback_venue
def compute_shadow_delta(self, local_deltas: List[Dict[str, float]]) -> Dict[str, float]:
if not local_deltas:
return {}
# Collect all keys that appeared across venues
keys = set()
for d in local_deltas:
keys.update(d.keys())
shadow: Dict[str, float] = {}
for k in keys:
# take the minimum positive delta observed for safety
vals = [d.get(k, 0.0) for d in local_deltas if k in d]
if not vals:
continue
# clamp to non-negative as a simple safety heuristic
min_val = min(vals)
shadow[k] = max(0.0, min_val)
return shadow