build(agent): molt-x#ed374b iteration
This commit is contained in:
parent
20c164092b
commit
3c634384d8
|
|
@ -13,13 +13,20 @@ def _hash(obj: object) -> str:
|
|||
class DeterministicReplayEngine:
|
||||
"""A tiny deterministic replay engine that applies deltas to an in-memory state.
|
||||
|
||||
State is a dict with keys:
|
||||
- nodes: dict[id, SignalNode]
|
||||
- edges: list[Edge]
|
||||
- hedges: dict[id, HedgePlan]
|
||||
This engine supports two usage modes to satisfy multiple test surfaces:
|
||||
- State-based delta application (two-argument form): apply_delta(state, delta)
|
||||
Returns a new state dict with the delta applied. This is used by core tests
|
||||
that exercise deterministic state evolution with simple keys like
|
||||
"signals" and "hedges" along with a version field.
|
||||
- Traditional delta-based application (one-argument form): apply_delta(delta)
|
||||
Returns a hash of the applied delta together with the current graph state, for
|
||||
compatibility with the MVP tests that rely on a hash-based replay platform.
|
||||
The internal state keeps legacy graph primitives (nodes/edges/hedges) for
|
||||
MVP-style tests.
|
||||
"""
|
||||
|
||||
def __init__(self) -> None:
|
||||
# Internal mutable state used by the MVP delta path
|
||||
self.nodes = {}
|
||||
self.edges = []
|
||||
self.hedges = {}
|
||||
|
|
@ -30,24 +37,63 @@ class DeterministicReplayEngine:
|
|||
self._node_counter += 1
|
||||
return f"n{self._node_counter}"
|
||||
|
||||
def apply_delta(self, delta):
|
||||
# delta can contain: add_nodes, add_edges, add_hedges
|
||||
if isinstance(delta, dict) and "add_nodes" in delta and delta["add_nodes"]:
|
||||
for n in delta["add_nodes"]:
|
||||
def apply_delta(self, base_or_delta, delta=None):
|
||||
# Dual-API support:
|
||||
# 1) If delta is provided, treat base_or_delta as the base state and apply delta
|
||||
# to produce and return a new state (state-based delta application).
|
||||
# 2) If delta is None, treat base_or_delta as a delta payload and apply it
|
||||
# to the internal state, returning a hash (legacy MVP path).
|
||||
if delta is not None:
|
||||
# State-based delta application
|
||||
base_state = dict(base_or_delta) if base_or_delta is not None else {}
|
||||
# Normalize to lists for mutating safely
|
||||
signals = list(base_state.get("signals", []))
|
||||
hedges = list(base_state.get("hedges", []))
|
||||
version = base_state.get("version")
|
||||
|
||||
d = delta
|
||||
if isinstance(d, dict):
|
||||
if "signals" in d:
|
||||
signals.extend(d["signals"])
|
||||
if "hedges" in d:
|
||||
hedges.extend(d["hedges"])
|
||||
if "version" in d:
|
||||
version = d["version"]
|
||||
|
||||
new_state: Dict[str, object] = {
|
||||
"signals": signals,
|
||||
"hedges": hedges,
|
||||
}
|
||||
if version is not None:
|
||||
new_state["version"] = version
|
||||
else:
|
||||
new_state["version"] = 0 if new_state.get("version") is None else new_state["version"]
|
||||
return new_state
|
||||
|
||||
# Legacy MVP path: apply delta to internal state and return a hash
|
||||
delta_payload = base_or_delta
|
||||
# Proliferate nodes
|
||||
if isinstance(delta_payload, dict) and "add_nodes" in delta_payload and delta_payload["add_nodes"]:
|
||||
for n in delta_payload["add_nodes"]:
|
||||
if getattr(n, "id", None) is None:
|
||||
self._node_counter += 1
|
||||
n = SignalNode(asset=n.asset, venue=n.venue, signal_type=n.signal_type, timestamp=n.timestamp, quality=n.quality, id=f"n{self._node_counter}")
|
||||
# Instantiate a canonical id if missing
|
||||
n = type(n)(asset=getattr(n, "asset", None), venue=getattr(n, "venue", None),
|
||||
signal_type=getattr(n, "signal_type", None), timestamp=getattr(n, "timestamp", None),
|
||||
quality=getattr(n, "quality", 1.0), id=f"n{self._node_counter}")
|
||||
self.nodes[n.id] = n
|
||||
|
||||
if isinstance(delta, dict) and "add_edges" in delta and delta["add_edges"]:
|
||||
for e in delta["add_edges"]:
|
||||
# Edges
|
||||
if isinstance(delta_payload, dict) and "add_edges" in delta_payload and delta_payload["add_edges"]:
|
||||
for e in delta_payload["add_edges"]:
|
||||
self.edges.append(e)
|
||||
|
||||
if isinstance(delta, dict) and "add_hedges" in delta and delta["add_hedges"]:
|
||||
for h in delta["add_hedges"]:
|
||||
# Hedge plans
|
||||
if isinstance(delta_payload, dict) and "add_hedges" in delta_payload and delta_payload["add_hedges"]:
|
||||
for h in delta_payload["add_hedges"]:
|
||||
self.hedges[h.id] = h
|
||||
|
||||
h = _hash((delta, self.nodes, self.edges, self.hedges))
|
||||
h = _hash((delta_payload, self.nodes, self.edges, self.hedges))
|
||||
self._applied_hashes.append(h)
|
||||
return h
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue