ZKAC/cli/zkac_cli/server_debug.py
2026-05-06 00:08:46 +02:00

187 lines
6.3 KiB
Python

"""Mutable debug snapshot for ``zkac-node serve`` (optional, thread-safe)."""
from __future__ import annotations
import base64
import json
import threading
import time
import uuid
from collections import deque
from pathlib import Path
from typing import Any
import zkac
class ServerDebugState:
    """
    Live server introspection for an admin dashboard.

    Connection records are updated from worker threads; readers take a consistent
    snapshot via :meth:`snapshot`. Every access to the mutable fields happens
    under a single :class:`threading.Lock`, and :meth:`snapshot` returns copies
    of the connection records so a caller never holds a dict that a worker
    thread may still be mutating.
    """

    def __init__(self, *, userid: str, data_dir: str) -> None:
        """Create an empty state; ``data_dir`` is stored as a resolved path string."""
        self.userid = userid
        self.data_dir = str(Path(data_dir).resolve())
        self._lock = threading.Lock()
        # ``started_wall`` is reported as-is by snapshot(); uptime is derived
        # from the monotonic reading.
        self._started_wall = time.time()
        self._started_mono = time.monotonic()
        self._listen: tuple[str, int] = ("", 0)
        self._server_pk_hex: str | None = None
        self._registries_loaded = 0
        # Open connections keyed by connection id.
        self._active: dict[str, dict[str, Any]] = {}
        # Closed connections, newest first; entries beyond 64 fall off the end.
        self._history: deque[dict[str, Any]] = deque(maxlen=64)

    def set_listen(self, host: str, port: int) -> None:
        """Record the address reported under ``listen`` in the snapshot."""
        with self._lock:
            self._listen = (host, port)

    def set_boot_info(self, *, server_pk_hex: str, registries_loaded: int) -> None:
        """Record the server public key (hex) and the registry count given at boot."""
        with self._lock:
            self._server_pk_hex = server_pk_hex
            self._registries_loaded = registries_loaded

    def open_connection(self, peer: str) -> str:
        """Register a new active connection for ``peer`` and return its id.

        The id is the first 10 hex characters of a random UUID4. The record
        starts in phase ``"accepted"`` with all other fields unset or zero.
        """
        cid = uuid.uuid4().hex[:10]
        rec: dict[str, Any] = {
            "id": cid,
            "peer": peer,
            "opened_wall": time.time(),
            "opened_mono": time.monotonic(),
            "phase": "accepted",
            "hello_op": None,
            "transcript_hash_hex": None,
            "auth_registry_hex": None,
            "auth_role_hex": None,
            "auth_ok": None,
            "last_mgmt_cmd": None,
            "mgmt_commands": 0,
            "bytes_echoed": 0,
            "error": None,
        }
        with self._lock:
            self._active[cid] = rec
        return cid

    def update_connection(self, cid: str, **fields: Any) -> None:
        """Merge ``fields`` into the active record ``cid``; unknown ids are ignored."""
        with self._lock:
            rec = self._active.get(cid)
            if rec is None:
                return
            rec.update(fields)

    def note_mgmt_command(self, cid: str, cmd: dict) -> None:
        """Store ``cmd["cmd"]`` as the last management command and bump the counter.

        Unknown connection ids are ignored.
        """
        name = cmd.get("cmd")
        with self._lock:
            rec = self._active.get(cid)
            if rec is None:
                return
            rec["last_mgmt_cmd"] = name
            # ``or 0`` tolerates a counter that was reset to None via update_connection.
            rec["mgmt_commands"] = int(rec.get("mgmt_commands") or 0) + 1

    def note_echo_chunk(self, cid: str, n: int) -> None:
        """Add ``n`` to the ``bytes_echoed`` counter of ``cid``; unknown ids are ignored."""
        with self._lock:
            rec = self._active.get(cid)
            if rec is None:
                return
            rec["bytes_echoed"] = int(rec.get("bytes_echoed") or 0) + n

    def close_connection(self, cid: str, *, error: str | None = None) -> None:
        """Move ``cid`` from the active set to the front of the history.

        The archived record is a new dict carrying ``closed_wall`` and
        ``closed_mono``. ``error`` overwrites the record's ``error`` field only
        when it is non-empty. Unknown ids are ignored.
        """
        with self._lock:
            rec = self._active.pop(cid, None)
            if rec is None:
                return
            rec = {**rec, "closed_wall": time.time(), "closed_mono": time.monotonic()}
            if error:
                rec["error"] = error
            self._history.appendleft(rec)

    def snapshot(self) -> dict[str, Any]:
        """Return a point-in-time view of the server and its connections.

        Connection records are shallow-copied while the lock is held, so later
        updates from worker threads do not show up in (or race with iteration
        over) a snapshot that has already been returned, and mutating the
        returned dicts does not affect the tracked state.
        """
        with self._lock:
            uptime = time.monotonic() - self._started_mono
            return {
                "userid": self.userid,
                "data_dir": self.data_dir,
                "started_wall": self._started_wall,
                "uptime_s": round(uptime, 3),
                "listen": {"host": self._listen[0], "port": self._listen[1]},
                "server_public_key_hex": self._server_pk_hex,
                "registries_loaded_boot": self._registries_loaded,
                "active_connections": [dict(rec) for rec in self._active.values()],
                "recent_connections": [dict(rec) for rec in self._history],
                "active_connection_count": len(self._active),
            }
def server_key_meta(data_dir: Path) -> dict[str, Any]:
    """Non-secret metadata from ``server_key.json``.

    Returns ``{"present": False, "path": ...}`` when the file does not exist.
    Otherwise the result has ``"present": True`` plus either the public key as
    hex and the decoded length of the stored secret key (the secret bytes
    themselves are never returned), or an ``"error"`` string of the form
    ``"<ExceptionType>: <message>"`` when the file cannot be read or decoded.
    This function does not raise for a malformed key file.
    """
    path = data_dir / "server_key.json"
    if not path.is_file():
        return {"present": False, "path": str(path)}
    try:
        # JSON is read as UTF-8 rather than the platform's locale encoding.
        data = json.loads(path.read_text(encoding="utf-8"))
        pub = base64.b64decode(data["public_b64"])
        sec = base64.b64decode(data["secret_b64"])
    except Exception as exc:
        # Deliberately broad: this feeds a debug view and must report, not raise.
        # The type name is included because some messages are useless alone
        # (a missing field yields a KeyError whose text is just the field name).
        return {
            "present": True,
            "path": str(path),
            "error": f"{type(exc).__name__}: {exc}",
        }
    return {
        "present": True,
        "path": str(path),
        "public_key_hex": pub.hex(),
        "secret_key_stored_bytes": len(sec),
    }
def collect_registry_debug(data_dir: Path) -> list[dict[str, Any]]:
    """Per-registry rows from on-disk state (deserialized for version / ids).

    Scans ``<data_dir>/registries`` for ``*.state`` entries in sorted order and
    returns one row per entry with its path and size, the sibling ``.cert``
    path and size (``None`` / ``0`` when the certificate cannot be stat'ed),
    and the result of deserializing the state. A row whose state cannot be
    read or parsed gets ``parsed_ok`` False and a ``parse_error`` message
    instead of aborting the listing. Returns an empty list when the
    ``registries`` directory does not exist.
    """
    reg_dir = data_dir / "registries"
    if not reg_dir.is_dir():
        return []
    rows: list[dict[str, Any]] = []
    for state_path in sorted(reg_dir.glob("*.state")):
        rid_file = state_path.stem
        cert_path = reg_dir / f"{rid_file}.cert"
        # One stat call answers both "is it there" and "how big", so the
        # certificate cannot disappear between an existence check and the stat.
        try:
            cert_size: int | None = cert_path.stat().st_size
        except OSError:
            cert_size = None
        row: dict[str, Any] = {
            "file_registry_id_hex": rid_file,
            "state_path": str(state_path),
            "state_bytes": 0,
            "cert_path": str(cert_path) if cert_size is not None else None,
            "cert_bytes": 0 if cert_size is None else cert_size,
        }
        try:
            raw = state_path.read_bytes()
        except OSError as exc:
            # An unreadable entry is reported on its own row; the rest still list.
            row["parsed_ok"] = False
            row["parse_error"] = str(exc)
            rows.append(row)
            continue
        row["state_bytes"] = len(raw)
        try:
            st = zkac.RegistryState.deserialize(raw)
            parsed: dict[str, Any] = {
                "parsed_ok": True,
                "version": st.version(),
                "registry_id_hex": st.registry_id().hex(),
                "state_hash_hex": st.state_hash().hex(),
            }
        except Exception as exc:
            # Built as a unit so a failure midway leaves no half-filled fields.
            parsed = {"parsed_ok": False, "parse_error": str(exc)}
        row.update(parsed)
        rows.append(row)
    return rows
def data_dir_tree(data_dir: Path, *, max_files: int = 200) -> list[dict[str, Any]]:
    """Small file listing for transparency (names + sizes only).

    Walks ``data_dir`` recursively and returns one ``{"path", "bytes"}`` entry
    per regular file, in sorted path order, with paths relative to the resolved
    root and ``/`` as the separator. At most ``max_files`` files are listed;
    when more exist, a final ``{"truncated": True, "note": ...}`` entry is
    appended. Files whose ``stat`` fails are skipped and do not count towards
    the cap. Returns an empty list when ``data_dir`` is not a directory.
    """
    out: list[dict[str, Any]] = []
    root = data_dir.resolve()
    if not root.is_dir():
        return out
    n = 0
    for path in sorted(root.rglob("*")):
        if not path.is_file():
            continue
        if n >= max_files:
            out.append({"truncated": True, "note": f"listing capped at {max_files} files"})
            break
        try:
            size = path.stat().st_size
        except OSError:
            continue
        # as_posix() converts only real path separators; a blanket
        # replace("\\", "/") would also rewrite backslashes that are part of a
        # file name on POSIX systems and report a subdirectory that is not there.
        out.append({"path": path.relative_to(root).as_posix(), "bytes": size})
        n += 1
    return out