"""Mutable debug snapshot for ``zkac-node serve`` (optional, thread-safe).""" from __future__ import annotations import base64 import json import threading import time import uuid from collections import deque from pathlib import Path from typing import Any import zkac class ServerDebugState: """ Live server introspection for an admin dashboard. Connection records are updated from worker threads; readers take a consistent snapshot via :meth:`snapshot`. """ def __init__(self, *, userid: str, data_dir: str) -> None: self.userid = userid self.data_dir = str(Path(data_dir).resolve()) self._lock = threading.Lock() self._started_wall = time.time() self._started_mono = time.monotonic() self._listen: tuple[str, int] = ("", 0) self._server_pk_hex: str | None = None self._registries_loaded = 0 self._active: dict[str, dict[str, Any]] = {} self._history: deque[dict[str, Any]] = deque(maxlen=64) def set_listen(self, host: str, port: int) -> None: with self._lock: self._listen = (host, port) def set_boot_info(self, *, server_pk_hex: str, registries_loaded: int) -> None: with self._lock: self._server_pk_hex = server_pk_hex self._registries_loaded = registries_loaded def open_connection(self, peer: str) -> str: cid = uuid.uuid4().hex[:10] rec: dict[str, Any] = { "id": cid, "peer": peer, "opened_wall": time.time(), "opened_mono": time.monotonic(), "phase": "accepted", "hello_op": None, "transcript_hash_hex": None, "auth_registry_hex": None, "auth_role_hex": None, "auth_ok": None, "last_mgmt_cmd": None, "mgmt_commands": 0, "bytes_echoed": 0, "error": None, } with self._lock: self._active[cid] = rec return cid def update_connection(self, cid: str, **fields: Any) -> None: with self._lock: rec = self._active.get(cid) if rec is None: return rec.update(fields) def note_mgmt_command(self, cid: str, cmd: dict) -> None: name = cmd.get("cmd") with self._lock: rec = self._active.get(cid) if rec is None: return rec["last_mgmt_cmd"] = name rec["mgmt_commands"] = int(rec.get("mgmt_commands") or 0) + 1 def note_echo_chunk(self, cid: str, n: int) -> None: with self._lock: rec = self._active.get(cid) if rec is None: return rec["bytes_echoed"] = int(rec.get("bytes_echoed") or 0) + n def close_connection(self, cid: str, *, error: str | None = None) -> None: with self._lock: rec = self._active.pop(cid, None) if rec is None: return rec = {**rec, "closed_wall": time.time(), "closed_mono": time.monotonic()} if error: rec["error"] = error self._history.appendleft(rec) def snapshot(self) -> dict[str, Any]: with self._lock: uptime = time.monotonic() - self._started_mono return { "userid": self.userid, "data_dir": self.data_dir, "started_wall": self._started_wall, "uptime_s": round(uptime, 3), "listen": {"host": self._listen[0], "port": self._listen[1]}, "server_public_key_hex": self._server_pk_hex, "registries_loaded_boot": self._registries_loaded, "active_connections": list(self._active.values()), "recent_connections": list(self._history), "active_connection_count": len(self._active), } def server_key_meta(data_dir: Path) -> dict[str, Any]: """Non-secret metadata from ``server_key.json``.""" path = data_dir / "server_key.json" if not path.is_file(): return {"present": False, "path": str(path)} try: data = json.loads(path.read_text()) pub = base64.b64decode(data["public_b64"]) sec = base64.b64decode(data["secret_b64"]) return { "present": True, "path": str(path), "public_key_hex": pub.hex(), "secret_key_stored_bytes": len(sec), } except Exception as exc: return {"present": True, "path": str(path), "error": str(exc)} def collect_registry_debug(data_dir: Path) -> list[dict[str, Any]]: """Per-registry rows from on-disk state (deserialized for version / ids).""" reg_dir = data_dir / "registries" if not reg_dir.is_dir(): return [] rows: list[dict[str, Any]] = [] for p in sorted(reg_dir.glob("*.state")): rid_file = p.stem cert_path = reg_dir / f"{rid_file}.cert" raw = p.read_bytes() row: dict[str, Any] = { "file_registry_id_hex": rid_file, "state_path": str(p), "state_bytes": len(raw), "cert_path": str(cert_path) if cert_path.exists() else None, "cert_bytes": cert_path.stat().st_size if cert_path.exists() else 0, } try: st = zkac.RegistryState.deserialize(raw) row["parsed_ok"] = True row["version"] = st.version() row["registry_id_hex"] = st.registry_id().hex() row["state_hash_hex"] = st.state_hash().hex() except Exception as exc: row["parsed_ok"] = False row["parse_error"] = str(exc) rows.append(row) return rows def data_dir_tree(data_dir: Path, *, max_files: int = 200) -> list[dict[str, Any]]: """Small file listing for transparency (names + sizes only).""" out: list[dict[str, Any]] = [] root = data_dir.resolve() if not root.is_dir(): return out n = 0 for path in sorted(root.rglob("*")): if not path.is_file(): continue if n >= max_files: out.append({"truncated": True, "note": f"listing capped at {max_files} files"}) break rel = path.relative_to(root) try: out.append({"path": str(rel).replace("\\", "/"), "bytes": path.stat().st_size}) n += 1 except OSError: continue return out