""" Subprocess adapter around the existing ``zkac-node`` CLI. The demo deliberately does not import ``cli/zkac_cli/*``: every interaction with identity, registry, grant or p2p-listen flows runs through the CLI as a black box, exactly the way a third-party application would integrate. The only direct filesystem touch is reading the CLI-managed ``identity.json`` under ``ZKAC_HOME`` (default ``~/.ZKAC-FS//`` for this demo). That file is the CLI's public on-disk format and is needed locally to obtain the user's transport and issuance secrets for the file-share session and role-grant decryption. """ from __future__ import annotations import base64 import hashlib import json import os import re import shutil import subprocess import sys import threading from dataclasses import dataclass from pathlib import Path import zkac ROOT = Path(__file__).resolve().parents[1] CLI_DIR = ROOT / "cli" DEFAULT_TIMEOUT_S = 60.0 DEFAULT_LISTEN_TIMEOUT_S = 600.0 DEFAULT_DEMO_ZKAC_HOME = Path.home() / ".ZKAC-FS" # ── helpers ────────────────────────────────────────────────────────── def _b64_decode(s: str) -> bytes: return base64.b64decode(s) def zkac_home() -> Path: return Path(os.environ.get("ZKAC_HOME", DEFAULT_DEMO_ZKAC_HOME)) def user_identity_path(userid: str) -> Path: return zkac_home() / userid / "identity.json" def user_exists(userid: str) -> bool: return user_identity_path(userid).is_file() def list_local_users() -> list[str]: home = zkac_home() if not home.is_dir(): return [] return sorted(d.name for d in home.iterdir() if (d / "identity.json").is_file()) def _zkac_invocation() -> tuple[list[str], dict[str, str]]: """Resolve a runnable ``zkac-node`` command, falling back to the source tree.""" exe = shutil.which("zkac-node") if exe: return [exe], {} return [sys.executable, "-m", "zkac_cli.main"], {"PYTHONPATH": str(CLI_DIR)} @dataclass class CliResult: rc: int stdout: str stderr: str def ok(self) -> bool: return self.rc == 0 def raise_for_status(self) -> "CliResult": if self.rc != 0: raise RuntimeError( f"zkac-node failed (rc={self.rc}): {self.stderr.strip() or self.stdout.strip()}" ) return self def run_cli( args: list[str], *, timeout_s: float = DEFAULT_TIMEOUT_S, extra_env: dict[str, str] | None = None, ) -> CliResult: prefix, env_overrides = _zkac_invocation() env = { **os.environ, **env_overrides, "ZKAC_HOME": str(zkac_home()), **(extra_env or {}), } try: p = subprocess.run( prefix + args, capture_output=True, text=True, timeout=timeout_s, cwd=str(ROOT), env=env, ) except subprocess.TimeoutExpired as exc: return CliResult(rc=124, stdout=exc.stdout or "", stderr=f"timed out after {timeout_s}s") return CliResult(rc=p.returncode, stdout=p.stdout, stderr=p.stderr) # ── identity ───────────────────────────────────────────────────────── @dataclass class Identity: userid: str issuance_pk_hex: str issuance_secret_hex: str transport_pk_hex: str transport_secret_hex: str grant_token_b64: str contact_bundle: str # value of ``zkac-node user show --peer ...`` def load_identity_secrets(userid: str) -> dict[str, str]: """Read the CLI-managed ``identity.json`` (creator: ``zkac-node user create``).""" path = user_identity_path(userid) if not path.is_file(): raise FileNotFoundError(f"unknown user {userid!r} (run: zkac-node user create {userid})") data = json.loads(path.read_text()) def hexed(b64_field: str) -> str: return _b64_decode(data[b64_field]).hex() return { "issuance_pk_hex": hexed("issuance_public_b64"), "issuance_secret_hex": hexed("issuance_secret_b64"), "transport_pk_hex": hexed("transport_public_b64"), "transport_secret_hex": hexed("transport_secret_b64"), "grant_token_b64": data["grant_token_b64"], } def create_user(userid: str) -> CliResult: return run_cli(["user", "create", userid]) def show_user_contact(userid: str, peer: str | None = None) -> str: args = ["user", "show", userid] if peer: args += ["--peer", peer] res = run_cli(args).raise_for_status() for line in res.stdout.splitlines(): line = line.strip() # contact bundle is the indented blob after "share contact:" if line and not line.startswith(("user:", "share contact:", "issuance pk:", "p2p transport pk:", "(", "registries owned:", "credentials:", "contact peer endpoint:", "owner admin")): if re.fullmatch(r"[A-Za-z0-9_\-]+", line): return line raise RuntimeError("could not parse contact bundle from `zkac-node user show`") def get_identity(userid: str, peer: str | None = None) -> Identity: secrets = load_identity_secrets(userid) contact = show_user_contact(userid, peer=peer) return Identity( userid=userid, issuance_pk_hex=secrets["issuance_pk_hex"], issuance_secret_hex=secrets["issuance_secret_hex"], transport_pk_hex=secrets["transport_pk_hex"], transport_secret_hex=secrets["transport_secret_hex"], grant_token_b64=secrets["grant_token_b64"], contact_bundle=contact, ) # ── registry / pinning / grants ────────────────────────────────────── def server_pin(userid: str, server: str, server_pk_hex: str) -> CliResult: return run_cli(["server", "pin", userid, server, "--key", server_pk_hex]) def load_pinned_server_key(userid: str, server: str) -> str | None: digest = hashlib.sha256(server.encode("utf-8")).hexdigest() pin_file = zkac_home() / userid / "servers" / f"sha256_{digest}.json" if not pin_file.is_file(): return None data = json.loads(pin_file.read_text()) server_pk_b64 = data.get("server_public_key_b64") if not isinstance(server_pk_b64, str): return None return _b64_decode(server_pk_b64).hex() def registry_create(userid: str, server: str, roles: list[str]) -> str: res = run_cli(["registry", "create", userid, server, "--roles", ",".join(roles)]).raise_for_status() for line in res.stdout.splitlines(): line = line.strip() if line.startswith("registry created:"): return line.split(":", 1)[1].strip() raise RuntimeError(f"could not parse registry id from output:\n{res.stdout}") def registry_list(userid: str) -> list[dict]: res = run_cli(["registry", "list", userid]).raise_for_status() out: list[dict] = [] for line in res.stdout.splitlines(): s = line.strip() m = re.match( r"^([0-9a-fA-F]+)\s+@\s+(\S+)\s+roles=\[(.*)\]\s*$", s, ) if m: roles_raw = m.group(3) roles = [r.strip().strip("'\"") for r in roles_raw.split(",") if r.strip()] out.append({"registry_id": m.group(1), "server": m.group(2), "roles": roles}) return out def registry_add_roles(userid: str, server: str, registry_id: str, add_roles: list[str]) -> CliResult: return run_cli([ "registry", "update", userid, server, "--registry", registry_id, "--add-roles", ",".join(add_roles), ]).raise_for_status() def credentials_list(userid: str) -> list[dict]: """Return locally held BBS credentials as [{registry_id, role}].""" res = run_cli(["credentials", "list", userid]).raise_for_status() creds: list[dict] = [] in_local = False for line in res.stdout.splitlines(): if line.startswith("local credentials:"): in_local = True continue if not line.strip(): continue if line.startswith(("owner admin capability:", "registries owned:")): in_local = False continue if in_local: s = line.strip() if s == "(none)": continue if ":" in s: rid, role = s.rsplit(":", 1) creds.append({"registry_id": rid.strip(), "role": role.strip()}) return creds def grant( userid: str, server: str, registry_id: str, role_name: str, recipient_contact: str, ) -> CliResult: return run_cli([ "grant", userid, "--server", server, "--registry", registry_id, "--role", role_name, "--to", recipient_contact, ]).raise_for_status() # ── background p2p-listen ──────────────────────────────────────────── class P2PListener: """Background ``zkac-node p2p-listen`` process the TUI can stop and watch.""" def __init__( self, userid: str, host: str = "127.0.0.1", port: int = 0, timeout_s: float = DEFAULT_LISTEN_TIMEOUT_S, ) -> None: self.userid = userid self.host = host self.port = port if port else _pick_random_port(host) self.timeout_s = timeout_s self._proc: subprocess.Popen[str] | None = None self._stdout_buf = "" self._lock = threading.Lock() @property def address(self) -> str: return f"{self.host}:{self.port}" def start(self) -> None: prefix, env_overrides = _zkac_invocation() env = { **os.environ, **env_overrides, "PYTHONUNBUFFERED": "1", "ZKAC_HOME": str(zkac_home()), } args = prefix + [ "p2p-listen", self.userid, "--host", self.host, "--port", str(self.port), "--timeout", str(self.timeout_s), ] self._proc = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, cwd=str(ROOT), env=env, bufsize=1, ) threading.Thread(target=self._drain, daemon=True).start() def _drain(self) -> None: if self._proc is None or self._proc.stdout is None: return for line in self._proc.stdout: with self._lock: self._stdout_buf += line def is_running(self) -> bool: return self._proc is not None and self._proc.poll() is None def stop(self) -> None: if self._proc is None: return if self._proc.poll() is None: self._proc.terminate() try: self._proc.wait(timeout=2.0) except subprocess.TimeoutExpired: self._proc.kill() self._proc.wait() def output(self) -> str: with self._lock: return self._stdout_buf def parse_received(self) -> dict | None: """Extract ``{registry_id, role}`` from CLI output once a grant has been received.""" out = self.output() rid = role = None for line in out.splitlines(): s = line.strip() if s.startswith("registry:"): rid = s.split(":", 1)[1].strip() elif s.startswith("role:"): role = s.split(":", 1)[1].strip() if rid and role: return {"registry_id": rid, "role": role} return None def _pick_random_port(host: str) -> int: import socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: s.bind((host, 0)) return s.getsockname()[1] finally: s.close() # ── credentials and admin material (CLI on-disk format, read-only) ──── def _user_dir(userid: str) -> Path: return zkac_home() / userid def load_credential(userid: str, registry_id_hex: str, role_name: str) -> zkac.Credential: """Reconstruct a finalized BBS credential previously stored by ``zkac-node grant``.""" p = _user_dir(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json" if not p.is_file(): raise FileNotFoundError( f"no credential for {role_name!r} on {registry_id_hex[:16]}…; " "ask the registry admin to grant via `zkac-node grant`." ) data = json.loads(p.read_text()) pk = zkac.BbsPublicKey.from_bytes(_b64_decode(data["issuer_pk_b64"])) return zkac.Credential.finalize( _b64_decode(data["blind_sig_b64"]), _b64_decode(data["member_secret_b64"]), _b64_decode(data["prover_blind_b64"]), zkac.role_id(data["role_name"]), int(data["epoch"]), pk, ) def load_admin_material(userid: str, registry_id_hex: str) -> dict: p = _user_dir(userid) / "admin" / f"{registry_id_hex}.json" if not p.is_file(): raise FileNotFoundError( f"no admin material for {registry_id_hex[:16]}… under {userid!r}; " "you are not the owner of this registry" ) return json.loads(p.read_text()) def load_admin_credential(userid: str, registry_id_hex: str) -> zkac.Credential: """Reconstruct the registry-admin BBS credential (role = ``admin_role_id``).""" data = load_admin_material(userid, registry_id_hex) pk = zkac.BbsPublicKey.from_bytes(_b64_decode(data["bbs_issuer_public_b64"])) return zkac.Credential.finalize( _b64_decode(data["admin_blind_sig_b64"]), _b64_decode(data["admin_member_secret_b64"]), _b64_decode(data["admin_prover_blind_b64"]), zkac.admin_role_id(), 0, pk, ) def is_registry_admin(userid: str, registry_id_hex: str) -> bool: return (_user_dir(userid) / "admin" / f"{registry_id_hex}.json").is_file()