From 82b5079577069072f4fc8017087d88f26dbd6470 Mon Sep 17 00:00:00 2001 From: agent-a6e6ec231c5f7801 Date: Mon, 20 Apr 2026 15:14:58 +0200 Subject: [PATCH] build(agent): new-agents#a6e6ec iteration --- deltax_forge_cross/core/central_curator.py | 14 ++++++++ deltax_forge_cross/core/shadow_planner.py | 38 ++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 deltax_forge_cross/core/shadow_planner.py diff --git a/deltax_forge_cross/core/central_curator.py b/deltax_forge_cross/core/central_curator.py index 105a569..c5cb092 100644 --- a/deltax_forge_cross/core/central_curator.py +++ b/deltax_forge_cross/core/central_curator.py @@ -2,11 +2,25 @@ from __future__ import annotations from typing import Dict, List +from deltax_forge_cross.core.shadow_planner import ShadowPlanner + class CentralCurator: def __init__(self, venue_ids: List[str]): self.venue_ids = venue_ids + def coordinate_with_shadow(self, local_deltas: List[Dict[str, float]]): + """Coordinate across venues and also produce a conservative shadow delta. + + Returns a tuple: (shared_delta, shadow_delta) + - shared_delta: same semantics as coordinate() + - shadow_delta: conservative delta derived from local deltas for safety during partitions + """ + shared = self.coordinate(local_deltas) + planner = ShadowPlanner() + shadow = planner.compute_shadow_delta(local_deltas) + return shared, shadow + def coordinate(self, local_deltas: List[Dict[str, float]]) -> Dict[str, float]: if not local_deltas: return {} diff --git a/deltax_forge_cross/core/shadow_planner.py b/deltax_forge_cross/core/shadow_planner.py new file mode 100644 index 0000000..4fd14b1 --- /dev/null +++ b/deltax_forge_cross/core/shadow_planner.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +from typing import Dict, List + + +class ShadowPlanner: + """A lightweight shadow planner that proposes safe deltas during latency partitions. + + This is a conservative fallback: given a list of per-venue delta dictionaries, + it derives a shadow (safety) delta by taking the minimum delta value observed for + each asset across venues. If an asset is missing in some venues, it ignores those + venues for that asset. This ensures we never propose a delta larger than any single + venue's safe delta in the observed set. + + The shadow delta can be used to run a safe hedging plan in parallel with the primary + plan when partitions occur. + """ + + def __init__(self, fallback_venue: str | None = None): + self.fallback_venue = fallback_venue + + def compute_shadow_delta(self, local_deltas: List[Dict[str, float]]) -> Dict[str, float]: + if not local_deltas: + return {} + # Collect all keys that appeared across venues + keys = set() + for d in local_deltas: + keys.update(d.keys()) + shadow: Dict[str, float] = {} + for k in keys: + # take the minimum positive delta observed for safety + vals = [d.get(k, 0.0) for d in local_deltas if k in d] + if not vals: + continue + # clamp to non-negative as a simple safety heuristic + min_val = min(vals) + shadow[k] = max(0.0, min_val) + return shadow