#!/usr/bin/env python3 """ ZKAC file-share demo Textual TUI. Run the headless server in another terminal:: uv run python demo/file_share_server.py --port 9879 Then start the Textual app:: uv run python demo/file_share_tui.py """ from __future__ import annotations import base64 import json import os import shutil import subprocess import sys import time import traceback from pathlib import Path import zkac from textual import events from textual.app import App, ComposeResult from textual.containers import Horizontal, Vertical from textual.screen import ModalScreen from textual.widgets import Button, Footer, Header, Input, Label, Static, TextArea import file_share_client as fsc import file_share_credentials as fscred import zkac_cli_adapter as cli class PromptScreen(ModalScreen[str | None]): AUTO_FOCUS = "#prompt_input" BINDINGS = [ ("escape", "cancel", "Cancel"), ("ctrl+q", "quit_app", "Quit"), ("q", "quit_app", "Quit"), ("ctrl+c", "quit_app", "Quit"), ] def __init__(self, label: str, default: str = "") -> None: super().__init__() self._label = label self._default = default def compose(self) -> ComposeResult: with Vertical(id="prompt_dialog"): yield Label(self._label, id="prompt_label") yield Input(value=self._default, id="prompt_input") with Horizontal(id="prompt_buttons"): yield Button("OK", id="ok", variant="primary") yield Button("Cancel", id="cancel") def on_mount(self) -> None: # Defer focus until after layout settles so typing works immediately. self.call_after_refresh(self._focus_prompt_input) self.set_timer(0.05, self._focus_prompt_input) def _focus_prompt_input(self) -> None: self.query_one("#prompt_input", Input).focus() def _move_button_focus(self, step: int) -> bool: focused = self.focused if not isinstance(focused, Button): return False buttons = list(self.query("#prompt_buttons Button")) if len(buttons) < 2 or focused not in buttons: return False idx = buttons.index(focused) buttons[(idx + step) % len(buttons)].focus() return True def on_key(self, event) -> None: # noqa: ANN001 if event.key == "left" and self._move_button_focus(-1): event.stop() return if event.key == "right" and self._move_button_focus(1): event.stop() return if event.key == "down" and isinstance(self.focused, Input): buttons = list(self.query("#prompt_buttons Button")) if buttons: buttons[0].focus() event.stop() return if event.key == "up" and isinstance(self.focused, Button): self.query_one("#prompt_input", Input).focus() event.stop() def action_cancel(self) -> None: self.dismiss(None) def action_quit_app(self) -> None: self.app.exit() def on_input_submitted(self, event: Input.Submitted) -> None: self.dismiss(event.value.strip()) def on_button_pressed(self, event: Button.Pressed) -> None: if event.button.id == "ok": value = self.query_one("#prompt_input", Input).value.strip() self.dismiss(value) return self.dismiss(None) class ConfirmScreen(ModalScreen[bool]): AUTO_FOCUS = "#yes" BINDINGS = [ ("escape", "cancel", "Cancel"), ("ctrl+q", "quit_app", "Quit"), ("q", "quit_app", "Quit"), ("ctrl+c", "quit_app", "Quit"), ] def __init__(self, label: str) -> None: super().__init__() self._label = label def compose(self) -> ComposeResult: with Vertical(id="prompt_dialog"): yield Label(self._label, id="prompt_label") with Horizontal(id="prompt_buttons"): yield Button("Yes", id="yes", variant="primary") yield Button("No", id="no") def on_mount(self) -> None: self.query_one("#yes", Button).focus() def _move_button_focus(self, step: int) -> bool: focused = self.focused if not isinstance(focused, Button): return False buttons = list(self.query("#prompt_buttons Button")) if len(buttons) < 2 or focused not in buttons: return False idx = buttons.index(focused) buttons[(idx + step) % len(buttons)].focus() return True def on_key(self, event) -> None: # noqa: ANN001 if event.key == "left" and self._move_button_focus(-1): event.stop() return if event.key == "right" and self._move_button_focus(1): event.stop() def on_button_pressed(self, event: Button.Pressed) -> None: self.dismiss(event.button.id == "yes") def action_cancel(self) -> None: self.dismiss(False) def action_quit_app(self) -> None: self.app.exit() class FileShareApp(App[None]): TITLE = "ZKAC File Share" BINDINGS = [ ("ctrl+q", "quit_app", "Quit"), ("q", "quit_app", "Quit"), ("ctrl+c", "quit_app", "Quit"), ("c", "copy_contact", "Copy Contact"), ] CSS = """ #root { padding: 1 2; } #status { border: round $accent; padding: 0 1; height: 4; margin-bottom: 1; } #actions { layout: grid; grid-size: 8 1; grid-gutter: 1 0; height: auto; margin-bottom: 1; } #actions Button { width: 1fr; height: 3; text-align: center; content-align: center middle; } #log { border: round $panel; } #prompt_dialog { width: 70; height: auto; padding: 1 2; border: round $accent; background: $surface; } #prompt_label { margin-bottom: 1; } #prompt_buttons { margin-top: 1; height: auto; } """ def __init__(self) -> None: super().__init__() self.userid: str | None = None self.identity: cli.Identity | None = None self.server: str | None = None self.server_pk_hex: str | None = None self.current_bucket_id: str | None = None self.listener: cli.P2PListener | None = None self._action_running = False self._log_lines: list[str] = [] self._last_clipboard_error = "" self._last_clipboard_backend = "" self._last_contact_bundle = "" self._server_key_maybe_stale_for: str | None = None def compose(self) -> ComposeResult: yield Header(show_clock=True) with Vertical(id="root"): yield Static(id="status") with Vertical(id="actions"): yield Button("Login", id="login", variant="primary") yield Button("Connect", id="connect") yield Button("Select Bucket", id="bucket") yield Button("Permissions", id="permissions") yield Button("Share Permissions", id="share") yield Button("Listen", id="listen") yield Button("Inbox", id="inbox") yield Button("Quit", id="quit", variant="error") yield TextArea("", id="log", read_only=True) yield Footer() def on_mount(self) -> None: self._refresh_status() self.write_log("Textual file-share UI ready.") self.write_log(f"Demo ZKAC_HOME: {cli.zkac_home()}") self.write_log("Use the action buttons to run flows.") self.set_interval(0.8, self._poll_listener) self._update_actions_layout() self.query_one("#login", Button).focus() def on_resize(self, _event: events.Resize) -> None: self._update_actions_layout() def on_unmount(self) -> None: if self.listener: self.listener.stop() def write_log(self, msg: str) -> None: self._log_lines.extend(msg.splitlines() or [""]) log_view = self.query_one("#log", TextArea) log_view.load_text("\n".join(self._log_lines)) if not isinstance(self.focused, TextArea): log_view.scroll_end(animate=False) def _refresh_status(self) -> None: listener_state = "stopped" if self.listener and self.listener.is_running(): listener_state = f"listening on {self.listener.address}" elif self.listener: listener_state = "stopped (last)" status = ( f"user: {self.userid or '-'}\n" f"server: {self.server or '-'}\n" f"bucket: {self.current_bucket_id or '-'}\n" f"p2p-listen: {listener_state}" ) self.query_one("#status", Static).update(status) def _update_actions_layout(self) -> None: actions = self.query_one("#actions") if self.size.width >= 110: actions.styles.grid_size_columns = 8 actions.styles.grid_size_rows = 1 else: actions.styles.grid_size_columns = 4 actions.styles.grid_size_rows = 2 def _move_action_focus(self, step: int) -> bool: focused = self.focused if not isinstance(focused, Button): return False buttons = list(self.query("#actions Button")) if not buttons or focused not in buttons: return False idx = buttons.index(focused) buttons[(idx + step) % len(buttons)].focus() return True def on_key(self, event) -> None: # noqa: ANN001 # Never intercept keys while a modal prompt/confirm screen is active. if self.screen is not self: return if event.key == "left" and self._move_action_focus(-1): event.stop() return if event.key == "right" and self._move_action_focus(1): event.stop() return if event.key == "down" and isinstance(self.focused, Button): self.query_one("#log", TextArea).focus() event.stop() return if event.key == "up" and isinstance(self.focused, TextArea): self.query_one("#login", Button).focus() event.stop() def action_quit_app(self) -> None: self.exit() async def ask(self, label: str, default: str = "") -> str | None: return await self.push_screen_wait(PromptScreen(label, default)) async def confirm(self, label: str) -> bool: return await self.push_screen_wait(ConfirmScreen(label)) def _require_user(self) -> cli.Identity: if not self.userid or not self.identity: raise RuntimeError("no current user; click Login first") return self.identity def _require_server(self) -> tuple[str, str]: if not self.server or not self.server_pk_hex: raise RuntimeError("no current server; click Connect first") return self.server, self.server_pk_hex def _open_session(self, registry_id_hex: str, role_name: str) -> fsc.FileShareSession: ident = self._require_user() server, server_pk_hex = self._require_server() if role_name == "__admin__": credential = cli.load_admin_credential(self.userid or "", registry_id_hex) role_id = zkac.admin_role_id() else: credential = cli.load_credential(self.userid or "", registry_id_hex, role_name) role_id = zkac.role_id(role_name) return fsc.open_session( server, server_pk_hex=server_pk_hex, user_transport_secret=bytes.fromhex(ident.transport_secret_hex), registry_id_hex=registry_id_hex, role_id=role_id, credential=credential, ) async def _choose_index(self, title: str, options: list[str], default: int = 1) -> int: if not options: raise RuntimeError(f"no options for {title}") self.write_log(f"[{title}]") for i, opt in enumerate(options, 1): self.write_log(f" {i}) {opt}") raw = await self.ask(f"{title}: pick number", str(default)) if raw is None: raise RuntimeError("cancelled") if not raw.isdigit(): raise RuntimeError("selection must be a number") idx = int(raw) - 1 if not 0 <= idx < len(options): raise RuntimeError("invalid selection") return idx async def _pick_owned_registry(self) -> str: regs = cli.registry_list(self.userid or "") if not regs: raise RuntimeError("you do not own any registries; create one in Registry") if self.server: regs = [r for r in regs if r["server"] == self.server] or regs opts = [f"{r['registry_id'][:16]}... @ {r['server']} roles={r['roles']}" for r in regs] idx = await self._choose_index("Owned registries", opts) return regs[idx]["registry_id"] def _poll_listener(self) -> None: if not self.listener or self.listener.is_running(): return result = self.listener.parse_received() if result is not None: self.write_log( f"[listener] received credential: registry={result['registry_id'][:16]}... " f"role={result['role']}" ) self.listener = None self._refresh_status() def _log_exception(self, exc: BaseException) -> None: self.write_log(f"error: {exc}") msg = str(exc).lower() if self.server and any(s in msg for s in ("auth failed", "handshake", "public key", "server key")): self._server_key_maybe_stale_for = self.server self.write_log( "[hint] server key may be stale. Use Connect to update the pinned transport key." ) if "--debug" in sys.argv: self.write_log(traceback.format_exc()) def on_button_pressed(self, event: Button.Pressed) -> None: action = event.button.id or "" if action == "quit": self.exit() return if self._action_running: self.write_log("Another action is still running; wait or press q to quit.") return self._action_running = True self.run_worker(self._run_action(action), exclusive=True, thread=False) async def _run_action(self, action: str) -> None: try: if action == "login": await self.menu_login() elif action == "connect": await self.menu_connect() elif action == "bucket": await self.menu_buckets() elif action == "permissions": await self._bucket_set_masks() elif action == "share": await self.menu_share() elif action == "listen": await self.menu_listen() elif action == "inbox": await self.menu_inbox() self._refresh_status() except Exception as exc: # noqa: BLE001 self._log_exception(exc) finally: self._action_running = False async def menu_login(self) -> None: users = cli.list_local_users() userid = "" if users: self.write_log("[login] local users:") for i, u in enumerate(users, 1): self.write_log(f" {i}) {u}") pick = await self.ask("Select number or type new userid") if pick is None or not pick: return if pick.isdigit() and 1 <= int(pick) <= len(users): userid = users[int(pick) - 1] else: userid = pick else: typed = await self.ask("No local users found. New userid") if not typed: return userid = typed if not cli.user_exists(userid): ok = await self.confirm(f"Create user '{userid}' via zkac-node user create?") if not ok: return cli.create_user(userid).raise_for_status() self.write_log(f"[login] created user {userid}") self.userid = userid self.identity = cli.get_identity(userid, peer=None) self.write_log(f"[login] logged in as {userid}") self.write_log(f" issuance pk: {self.identity.issuance_pk_hex}") self.write_log(f" transport pk: {self.identity.transport_pk_hex}") async def menu_identity(self) -> None: ident = self._require_user() peer = await self.ask("Optional peer host:port for contact bundle") contact = cli.show_user_contact(ident.userid, peer=peer or None) self.write_log("[identity]") self.write_log(f" userid: {ident.userid}") self.write_log(f" issuance pk: {ident.issuance_pk_hex}") self.write_log(f" transport pk: {ident.transport_pk_hex}") self.write_log(" contact bundle:") self.write_log(f" {contact}") self._last_contact_bundle = contact if self._copy_to_clipboard(contact): self.write_log( f" copied contact bundle to clipboard via {self._last_clipboard_backend}" ) else: self.write_log( " warning: could not copy contact bundle to clipboard" f" ({self._last_clipboard_error})" ) async def menu_connect(self) -> None: ident = self._require_user() server = await self.ask("File-share server host:port", self.server or "127.0.0.1:9879") if server is None or not server: return pinned = cli.load_pinned_server_key(ident.userid, server) server_pk_hex: str if pinned and self._server_key_maybe_stale_for != server: server_pk_hex = pinned self.write_log(f"[connect] reused pinned key for {server} ({server_pk_hex[:16]}...)") else: prompt = "Server transport public key hex" if pinned: prompt = "Server key may be stale; enter updated transport public key hex" server_pk_hex = await self.ask(prompt, pinned or self.server_pk_hex or "") if server_pk_hex is None or not server_pk_hex: return cli.server_pin(ident.userid, server, server_pk_hex).raise_for_status() self.write_log(f"[connect] pinned {server} = {server_pk_hex[:16]}...") self._server_key_maybe_stale_for = None self.server = server self.server_pk_hex = server_pk_hex async def menu_buckets(self) -> None: op = await self.ask("Select bucket: l=list/select, c=create new", "l") if op is None: return if op == "c": await self._bucket_create() return await self._bucket_select_from_all_access() async def _bucket_select_from_all_access(self) -> None: ident = self._require_user() server, _ = self._require_server() inventory: dict[str, set[str]] = {} # Owned/admin buckets. for reg in cli.registry_list(ident.userid): if reg["server"] != server: continue rid = reg["registry_id"] if not cli.is_registry_admin(ident.userid, rid): continue try: with self._open_session(rid, "__admin__") as sess: for bid in sess.bucket_list_owned(): inventory.setdefault(bid, set()).add("owner") except Exception as exc: # noqa: BLE001 self.write_log(f"[bucket] skip admin bucket list for {rid[:16]}... ({exc})") # Buckets visible via role credentials. for cred in cli.credentials_list(ident.userid): rid = cred["registry_id"] role = cred["role"] try: with self._open_session(rid, role) as sess: for bid in sess.fs_buckets(): inventory.setdefault(bid, set()).add(f"role:{role}") except Exception as exc: # noqa: BLE001 self.write_log( f"[bucket] skip credential registry={rid[:16]}... role={role} ({exc})" ) if not inventory: self.write_log("[bucket] no owned or permitted buckets yet") return bucket_ids = sorted(inventory.keys()) options = [ f"{bid} access={sorted(inventory[bid])}" for bid in bucket_ids ] default = 1 if self.current_bucket_id and self.current_bucket_id in bucket_ids: default = bucket_ids.index(self.current_bucket_id) + 1 idx = await self._choose_index("Buckets", options, default=default) selected = bucket_ids[idx] self.current_bucket_id = selected if selected in fsc.list_manifests(ident.userid): self.write_log(f"[bucket] selected {selected} (locally managed)") else: self.write_log(f"[bucket] selected {selected} (remote-only access)") async def _bucket_create(self) -> None: ident = self._require_user() rid = await self._pick_or_create_registry_for_bucket() folder_str = await self.ask("Path to folder to share") if folder_str is None or not folder_str: return folder = Path(folder_str).expanduser().resolve() if not folder.is_dir(): raise RuntimeError(f"not a directory: {folder}") files = fsc.flatten_folder(folder) if not files: raise RuntimeError("no shareable files found (empty or hidden only)") self.write_log(f"[bucket-create] flattened {len(files)} files") for i, p in enumerate(files): self.write_log(f" [{i:>3}] {p.relative_to(folder)}") roles_meta = next( (r for r in cli.registry_list(ident.userid) if r["registry_id"] == rid), None, ) if roles_meta is None: raise RuntimeError("registry metadata not found locally") masks: dict[str, str] = {} for role in roles_meta["roles"]: raw = await self.ask( f"Mask for role '{role}' ({len(files)} bits)", "1" * len(files), ) if raw is None: return masks[role] = fsc.normalize_mask(raw, len(files)) with self._open_session(rid, "__admin__") as sess: self.write_log("[bucket-create] uploading encrypted blobs...") manifest = fsc.upload_bucket( sess, folder, server=self.server or "", registry_id_hex=rid, ) manifest.role_masks = masks with self._open_session(rid, "__admin__") as sess: fsc.apply_role_masks_to_server(sess, manifest) self._log_server_acl_summary(sess, manifest) fsc.save_manifest(ident.userid, manifest) self.current_bucket_id = manifest.bucket_id self.write_log(f"[bucket-create] uploaded {manifest.bucket_id} ({len(manifest.files)} files)") async def _select_local_manifest(self) -> fsc.BucketManifest: ident = self._require_user() bids = fsc.list_manifests(ident.userid) if not bids: raise RuntimeError("no local bucket manifests") options: list[str] = [] for bid in bids: man = fsc.load_manifest(ident.userid, bid) options.append( f"{bid[:16]}... files={len(man.files)} roles={sorted(man.role_masks.keys())}" ) default = 1 if self.current_bucket_id and self.current_bucket_id in bids: default = bids.index(self.current_bucket_id) + 1 idx = await self._choose_index("Select bucket", options, default=default) return fsc.load_manifest(ident.userid, bids[idx]) async def _bucket_set_masks(self) -> None: ident = self._require_user() manifest = await self._select_local_manifest() self.write_log(f"[bucket-masks] {manifest.bucket_id} has {len(manifest.files)} files") for i, f in enumerate(manifest.files): self.write_log(f" [{i:>3}] {f.rel_path}") roles = sorted(manifest.role_masks.keys() or []) if not roles: roles_raw = await self.ask("Comma-separated role names", "viewer,editor") if not roles_raw: return roles = [r.strip() for r in roles_raw.split(",") if r.strip()] for role in roles: current = manifest.role_masks.get(role, "1" * len(manifest.files)) new = await self.ask(f"Mask for role '{role}'", current) if new is None: return manifest.role_masks[role] = fsc.normalize_mask(new, len(manifest.files)) with self._open_session(manifest.registry_id_hex, "__admin__") as sess: fsc.apply_role_masks_to_server(sess, manifest) self._log_server_acl_summary(sess, manifest) fsc.save_manifest(ident.userid, manifest) self.current_bucket_id = manifest.bucket_id self.write_log("[permissions] bucket permissions updated") async def _pick_or_create_registry_for_bucket(self) -> str: ident = self._require_user() server, _ = self._require_server() regs = [r for r in cli.registry_list(ident.userid) if r["server"] == server] if regs: options = [ f"use {r['registry_id'][:16]}... roles={r['roles']}" for r in regs ] options.append("create new registry for this server") idx = await self._choose_index("Bucket auth profile", options) if idx < len(regs): return regs[idx]["registry_id"] roles_raw = await self.ask("No auth profile yet. Create roles", "viewer,editor") if not roles_raw: raise RuntimeError("roles required to create bucket auth profile") roles = [r.strip() for r in roles_raw.split(",") if r.strip()] if not roles: raise RuntimeError("no roles provided") rid = cli.registry_create(ident.userid, server, roles) self.write_log(f"[bucket] created auth profile {rid} roles={roles}") return rid async def menu_share(self) -> None: ident = self._require_user() manifest = await self._select_local_manifest() if not manifest.role_masks: raise RuntimeError("set role masks first from Buckets") roles = sorted(manifest.role_masks.keys()) idx = await self._choose_index("Share role", roles) role = roles[idx] contact = await self.ask("Recipient contact bundle") if contact is None or not contact: return result = fscred.grant_role_p2p( ident.userid, manifest.server, manifest.registry_id_hex, role, contact, ) self.write_log(f"[share] issued role '{role}' to peer {result['peer']}") recipient_pk_hex = _parse_contact_issuance_pk(contact) with self._open_session(manifest.registry_id_hex, "__admin__") as sess: fsc.apply_role_masks_to_server(sess, manifest) self._log_server_acl_summary(sess, manifest) fsc.push_role_grant(sess, manifest, role, recipient_pk_hex) fsc.save_manifest(ident.userid, manifest) self.current_bucket_id = manifest.bucket_id self.write_log("[share] uploaded anonymous role-grant envelope (no recipient identifier on server)") async def menu_listen(self) -> None: ident = self._require_user() if self.listener and self.listener.is_running(): keep = await self.confirm( f"Listener already running on {self.listener.address}. Stop it?" ) if keep: self.listener.stop() self.listener = None else: return host = await self.ask("Bind host", "127.0.0.1") if host is None or not host: return raw_port = await self.ask("Optional bind port (blank = random)", "") if raw_port is None: return port = 0 if raw_port.strip(): if not raw_port.isdigit(): raise RuntimeError("port must be a number or blank") port = int(raw_port) if not 1 <= port <= 65535: raise RuntimeError("port out of range") listener = cli.P2PListener(ident.userid, host=host, port=port) listener.start() time.sleep(0.3) if not listener.is_running(): self.write_log(f"[listen] listener exited:\n{listener.output()}") return contact = cli.show_user_contact(ident.userid, peer=listener.address) self.listener = listener self.write_log(f"[listen] listening on {listener.address} timeout={listener.timeout_s:.0f}s") self.write_log("Share this contact bundle out-of-band:") self.write_log(contact) self._last_contact_bundle = contact if self._copy_to_clipboard(contact): self.write_log( f"[listen] copied contact bundle to clipboard via {self._last_clipboard_backend}" ) else: self.write_log( "[listen] warning: could not copy contact bundle to clipboard" f" ({self._last_clipboard_error})" ) async def menu_inbox(self) -> None: ident = self._require_user() creds: list[dict[str, str]] = list(cli.credentials_list(ident.userid)) for reg in cli.registry_list(ident.userid): rid = reg["registry_id"] if cli.is_registry_admin(ident.userid, rid): creds.append({"registry_id": rid, "role": "__admin__"}) if not creds: self.write_log("[inbox] no local credentials; ask an admin to grant one") return unique: dict[tuple[str, str], dict[str, str]] = {} for c in creds: unique[(c["registry_id"], c["role"])] = c creds = list(unique.values()) opts = [f"registry={c['registry_id'][:16]}... role={c['role']}" for c in creds] idx = await self._choose_index("Credential for inbox", opts) cred = creds[idx] with self._open_session(cred["registry_id"], cred["role"]) as sess: who = sess.whoami() self.write_log( "[inbox] authenticated " f"scope={who.get('auth_scope', 'unknown')} " f"admin={bool(who.get('is_admin', False))}" ) if cred["role"] == "__admin__": bids = sess.bucket_list_owned() else: bids = sess.fs_buckets() if not bids: self.write_log("[inbox] no accessible buckets") return self.write_log("[inbox] accessible buckets:") for i, bid in enumerate(bids, 1): self.write_log(f" {i}) {bid}") pick = await self.ask("Download bucket number, 'all', or blank to skip", "1") if pick is None or not pick: return if pick == "all": targets = list(range(len(bids))) elif pick.isdigit(): targets = [int(pick) - 1] else: raise RuntimeError("invalid selection") for ti in targets: if not 0 <= ti < len(bids): raise RuntimeError("bucket index out of range") out_default = str(Path.cwd() / "fs_inbox") out = await self.ask("Download root folder", out_default) if out is None or not out: return out_root = Path(out).expanduser().resolve() for ti in targets: bid = bids[ti] bucket_dir = out_root / bid if cred["role"] == "__admin__": result = self._download_owned_bucket_as_admin( sess=sess, userid=ident.userid, bucket_id=bid, output_dir=bucket_dir, ) else: result = fsc.download_bucket( sess, bid, issuance_secret_hex=ident.issuance_secret_hex, output_dir=bucket_dir, ) self.write_log( f"[inbox] downloaded {bid} -> {bucket_dir} " f"({len(result['files_written'])} files)" ) def _download_owned_bucket_as_admin( self, *, sess: fsc.FileShareSession, userid: str, bucket_id: str, output_dir: Path, ) -> dict: manifest = fsc.load_manifest(userid, bucket_id) output_dir.mkdir(parents=True, exist_ok=True) written: list[str] = [] for entry in manifest.files: ciphertexts = [sess.fs_get_blob(bucket_id, c.blob_id) for c in entry.chunks] plaintext = fsc.decrypt_file_from_blobs(entry, ciphertexts) out_path = output_dir / entry.rel_path out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_bytes(plaintext) written.append(str(out_path)) return {"files_written": written} def _log_server_acl_summary(self, sess: fsc.FileShareSession, manifest: fsc.BucketManifest) -> None: for role_name, raw_mask in sorted(manifest.role_masks.items()): expected = sum(1 for bit in fsc.normalize_mask(raw_mask, len(manifest.files)) if bit == "1") role_id_hex = zkac.role_id(role_name).hex() acl_meta = sess.bucket_get_role_acl(manifest.bucket_id, role_id_hex) allowed_blob_ids = list(acl_meta.get("allowed_blob_ids", [])) acl_version = int(acl_meta.get("version", 0)) self.write_log( f"[permissions] server ACL role={role_name} " f"version={acl_version} files(expected)={expected} blobs(server)={len(allowed_blob_ids)}" ) def _copy_to_clipboard(self, text: str) -> bool: text = text.strip() if not text: self._last_clipboard_error = "empty text" self._last_clipboard_backend = "" return False is_st = os.environ.get("TERM", "").startswith("st") # In st, prefer explicit system clipboard tools; Textual clipboard often reports # success but doesn't reach the desktop clipboard. if not is_st: try: self.copy_to_clipboard(text) self._last_clipboard_error = "" self._last_clipboard_backend = "textual" return True except Exception: self._last_clipboard_error = "Textual clipboard unavailable" def _run_clip(cmd: list[str], payload: str) -> subprocess.CompletedProcess[str] | None: try: return subprocess.run( cmd, input=payload, text=True, capture_output=True, timeout=0.8, ) except subprocess.TimeoutExpired: self._last_clipboard_error = f"{cmd[0]} timed out" return None except Exception as exc: self._last_clipboard_error = f"{cmd[0]} failed: {exc}" return None # Wayland if shutil.which("wl-copy") and os.environ.get("WAYLAND_DISPLAY"): p = _run_clip(["wl-copy"], text) if p and p.returncode == 0: self._last_clipboard_error = "" self._last_clipboard_backend = "wl-copy" return True if p: self._last_clipboard_error = (p.stderr or p.stdout or "wl-copy failed").strip() # X11: prefer xsel first (often exits faster), then xclip. if shutil.which("xsel") and os.environ.get("DISPLAY"): p1 = _run_clip(["xsel", "--clipboard", "--input"], text) p2 = _run_clip(["xsel", "--primary", "--input"], text) if (p1 and p1.returncode == 0) or (p2 and p2.returncode == 0): self._last_clipboard_error = "" self._last_clipboard_backend = "xsel" return True if p1 or p2: self._last_clipboard_error = ( (p1.stderr if p1 else "") or (p2.stderr if p2 else "") or (p1.stdout if p1 else "") or (p2.stdout if p2 else "") or "xsel failed" ).strip() if shutil.which("xclip") and os.environ.get("DISPLAY"): p1 = _run_clip(["xclip", "-selection", "clipboard", "-in"], text) p2 = _run_clip(["xclip", "-selection", "primary", "-in"], text) if (p1 and p1.returncode == 0) or (p2 and p2.returncode == 0): self._last_clipboard_error = "" self._last_clipboard_backend = "xclip" return True if p1 or p2: self._last_clipboard_error = ( (p1.stderr if p1 else "") or (p2.stderr if p2 else "") or (p1.stdout if p1 else "") or (p2.stdout if p2 else "") or "xclip failed" ).strip() if shutil.which("pbcopy"): p = _run_clip(["pbcopy"], text) if p and p.returncode == 0: self._last_clipboard_error = "" self._last_clipboard_backend = "pbcopy" return True if p: self._last_clipboard_error = (p.stderr or p.stdout or "pbcopy failed").strip() # Final fallback for terminals that support OSC52 clipboard control. try: encoded = base64.b64encode(text.encode("utf-8")).decode("ascii") sys.__stdout__.write(f"\033]52;c;{encoded}\a") sys.__stdout__.flush() self._last_clipboard_error = "" self._last_clipboard_backend = "osc52" return True except Exception as exc: self._last_clipboard_error = f"OSC52 failed: {exc}" self._last_clipboard_backend = "" if is_st: self._last_clipboard_error = ( "st detected; install xsel/xclip and ensure DISPLAY is set" ) return False def action_copy_contact(self) -> None: if not self._last_contact_bundle: self.write_log("[copy-contact] no contact bundle generated yet") return if self._copy_to_clipboard(self._last_contact_bundle): self.write_log( f"[copy-contact] copied latest contact bundle via {self._last_clipboard_backend}" ) return self.write_log( "[copy-contact] failed to copy latest contact bundle" f" ({self._last_clipboard_error})" ) def _parse_contact_issuance_pk(bundle: str) -> str: s = bundle.strip() raw = base64.urlsafe_b64decode((s + "=" * (-len(s) % 4)).encode()) data = json.loads(raw.decode("utf-8")) pk = data.get("issuance_pk_hex") if not isinstance(pk, str) or len(pk) != 64: raise RuntimeError("contact bundle missing issuance_pk_hex") return pk def main() -> None: FileShareApp().run() if __name__ == "__main__": main()