exoroute-cross-venue-order-.../exoroute/delta_store.py

46 lines
1.4 KiB
Python

import os
import json
from pathlib import Path
from typing import List, Dict, Any
from datetime import datetime
from .core import PlanDelta
class DeltaStore:
    """Append-only, line-delimited JSON log of plan deltas kept on disk.

    Every delta is stored as one JSON object per line in ``deltas.log``
    inside the store directory.
    """

    def __init__(self, path: str = ".exoroute_deltas"):
        """Create the store directory and make sure the log file exists.

        Args:
            path: Directory holding the log file; missing parents are created.
        """
        self.path = Path(path)
        self.path.mkdir(parents=True, exist_ok=True)
        self.log_file = self.path / "deltas.log"
        # Append mode creates the file when absent but never truncates it,
        # so there is no window in which an existing log could be wiped
        # (unlike an exists() check followed by write_text("")).
        with self.log_file.open("a", encoding="utf-8"):
            pass

    def append_delta(self, delta: "PlanDelta") -> None:
        """Append one delta to the log as a single JSON line.

        Args:
            delta: Object exposing ``delta``, ``timestamp``, ``author``,
                ``contract_id`` and ``signature`` attributes; their values
                must be JSON-serializable.

        Raises:
            TypeError: If an attribute value cannot be serialized by ``json``.
        """
        entry = {
            "delta": delta.delta,
            "timestamp": delta.timestamp,
            "author": delta.author,
            "contract_id": delta.contract_id,
            "signature": delta.signature,
        }
        # Serialize before opening so a serialization failure writes nothing.
        record = json.dumps(entry, sort_keys=True) + "\n"
        with self.log_file.open("a", encoding="utf-8") as f:
            f.write(record)

    def read_deltas(self) -> List[Dict[str, Any]]:
        """Return every well-formed delta entry in the order it was written.

        Blank lines, lines that are not valid JSON, and JSON values that are
        not objects are skipped, so one damaged line does not make the rest
        of the log unreadable.

        Returns:
            The parsed entries; an empty list when the log file is missing.
        """
        try:
            handle = self.log_file.open("r", encoding="utf-8")
        except FileNotFoundError:
            return []

        deltas: List[Dict[str, Any]] = []
        with handle:
            for raw in handle:
                line = raw.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    # Deliberate best-effort: ignore the corrupt line.
                    continue
                # Only JSON objects satisfy the declared return type.
                if isinstance(entry, dict):
                    deltas.append(entry)
        return deltas

    def clear(self) -> None:
        """Truncate the log file, discarding every stored delta."""
        self.log_file.write_text("", encoding="utf-8")