30 lines
767 B
Python
30 lines
767 B
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import time
|
|
|
|
|
|
class RunLog:
|
|
def __init__(self, path: str = "deltax_run.log"):
|
|
self.path = path
|
|
|
|
def log(self, entry: dict) -> None:
|
|
with open(self.path, "a", encoding="utf-8") as f:
|
|
payload = {
|
|
"ts": time.time(),
|
|
"event": entry,
|
|
}
|
|
f.write(json.dumps(payload) + "\n")
|
|
|
|
@staticmethod
|
|
def replay(path: str):
|
|
with open(path, "r", encoding="utf-8") as f:
|
|
for line in f:
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
obj = json.loads(line)
|
|
yield obj
|
|
except json.JSONDecodeError:
|
|
continue
|