diff --git a/.gitignore b/.gitignore index e9b3e99..66ab9fc 100644 --- a/.gitignore +++ b/.gitignore @@ -24,3 +24,13 @@ cli/zkac_cli/__pycache__ # wasm-pack output for browser demo demo/static/pkg/ + +# file-share demo runtime / local-only artifacts +demo/__pycache__/ +demo/fs_data/ +demo/.demo_state/ +demo/creds/ +demo/test_bucket/ + +# Workflow kept locally only (not versioned) +.github/workflows/fuzz-smoke.yml diff --git a/cli/zkac_node.egg-info/SOURCES.txt b/cli/zkac_node.egg-info/SOURCES.txt index 75468e6..9e29da1 100644 --- a/cli/zkac_node.egg-info/SOURCES.txt +++ b/cli/zkac_node.egg-info/SOURCES.txt @@ -2,9 +2,11 @@ README.md pyproject.toml zkac_cli/__init__.py zkac_cli/client.py +zkac_cli/i2p_serve.py zkac_cli/main.py zkac_cli/paths.py zkac_cli/server.py +zkac_cli/server_debug.py zkac_cli/store.py zkac_node.egg-info/PKG-INFO zkac_node.egg-info/SOURCES.txt diff --git a/cli/zkac_node.egg-info/entry_points.txt b/cli/zkac_node.egg-info/entry_points.txt index e0a11c4..9029a1b 100644 --- a/cli/zkac_node.egg-info/entry_points.txt +++ b/cli/zkac_node.egg-info/entry_points.txt @@ -1,2 +1,3 @@ [console_scripts] zkac-node = zkac_cli.main:main +zkac-node-i2p-server = zkac_cli.i2p_serve:main diff --git a/demo/README.md b/demo/README.md new file mode 100644 index 0000000..4509655 --- /dev/null +++ b/demo/README.md @@ -0,0 +1,49 @@ +# ZKAC File-Share Demo + +This folder contains only the self-contained Textual file-share demo. + +## Files + +- `demo/file_share_server.py`: headless opaque server (registry mgmt + file-share channel). +- `demo/file_share_client.py`: upload/download + role-mask utilities. +- `demo/file_share_credentials.py`: P2P credential grant helper. +- `demo/file_share_tui.py`: Textual UI. +- `demo/zkac_cli_adapter.py`: subprocess bridge to `zkac-node`. +- `demo/file_share_smoke.py`: end-to-end smoke test. +- `demo/test_demo_privacy_guardrails.py`: pytest privacy regressions for the demo. + +## Run + +```bash +uv sync --extra demo +uv run python demo/file_share_server.py --port 9879 +uv run python demo/file_share_tui.py +``` + +The demo uses `ZKAC_HOME=~/.ZKAC-FS` by default, so it stays isolated from other +local ZKAC usage. + +## UI Flow + +- `Login` +- `Connect` (reuses pinned server key when available) +- `Select Bucket` (list owned + permitted buckets, or create new) +- `Permissions` (edit per-role bitmask) +- `Share Permissions` +- `Listen` (optional port; blank means random) +- `Inbox` + +`c` copies the latest generated contact bundle to clipboard. + +## Verify + +```bash +uv run python demo/file_share_smoke.py +pytest demo/test_demo_privacy_guardrails.py +``` + +## Future Work + +- Further reduce at-rest metadata by removing persisted raw role-id indexes used + for proof candidate discovery after restart, while preserving reliable auth + recovery semantics. diff --git a/demo/creds/.gitignore b/demo/creds/.gitignore deleted file mode 100644 index d6b7ef3..0000000 --- a/demo/creds/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -* -!.gitignore diff --git a/demo/file_share_client.py b/demo/file_share_client.py new file mode 100644 index 0000000..9f71228 --- /dev/null +++ b/demo/file_share_client.py @@ -0,0 +1,598 @@ +""" +ZKAC file-share client library (demo). + +Reusable, UI-agnostic helpers used by ``file_share_tui.py`` and the smoke test +harness. The library performs: + +* deterministic folder flattening and per-file content-key generation, +* per-role visibility bitmasks over the flattened index, +* opaque blob uploads to ``file_share_server.py``, +* role-credential authenticated download + decrypt against the same server. + +Authenticated sessions piggy-back on the ZKAC TCP framing/handshake from +``zkac.tcp``; the ``zkac-node`` CLI is only used (separately) for identity, +registry and direct P2P credential grants. +""" + +from __future__ import annotations + +import base64 +import hashlib +import json +import os +import socket +import uuid +from dataclasses import dataclass, field +from pathlib import Path +from typing import Iterable, Iterator + +import zkac +from zkac.tcp import FramedSession, client_handshake_anon + +CHUNK_SIZE = 128 * 1024 # plaintext bytes per stored blob (well under MAX_TCP_FRAME_BYTES) +DEFAULT_CONNECT_TIMEOUT_S = 8.0 + + +# ── small helpers ──────────────────────────────────────────────────── + +def _b64(data: bytes) -> str: + return base64.b64encode(data).decode("ascii") + + +def _unb64(s: str) -> bytes: + return base64.b64decode(s) + + +def _parse_server(server: str) -> tuple[str, int]: + host, sep, port_s = server.rpartition(":") + if not sep: + raise ValueError(f"invalid server {server!r}, expected host:port") + port = int(port_s, 10) + if not 1 <= port <= 65535: + raise ValueError(f"port out of range: {port}") + return (host or "127.0.0.1"), port + + +# ── data classes ───────────────────────────────────────────────────── + +@dataclass +class FileChunk: + blob_id: str + eph_pk_b64: str # ephemeral pubkey used by the encrypt step (HPKE-style) + + +@dataclass +class FileEntry: + rel_path: str + size: int + sha256_b64: str + content_secret_b64: str # 32-byte X25519 secret -> derives the file's recipient kp + chunks: list[FileChunk] = field(default_factory=list) + + def to_grant_dict(self) -> dict: + return { + "rel_path": self.rel_path, + "size": self.size, + "sha256_b64": self.sha256_b64, + "content_secret_b64": self.content_secret_b64, + "chunks": [{"blob_id": c.blob_id, "eph_pk_b64": c.eph_pk_b64} for c in self.chunks], + } + + @classmethod + def from_grant_dict(cls, data: dict) -> "FileEntry": + return cls( + rel_path=data["rel_path"], + size=int(data["size"]), + sha256_b64=data["sha256_b64"], + content_secret_b64=data["content_secret_b64"], + chunks=[FileChunk(c["blob_id"], c["eph_pk_b64"]) for c in data["chunks"]], + ) + + +@dataclass +class BucketManifest: + """Admin-side bucket manifest, persisted locally on the bucket creator.""" + + bucket_id: str + server: str + registry_id_hex: str + files: list[FileEntry] + role_masks: dict[str, str] = field(default_factory=dict) # role -> bitmask string ('0'/'1') + recipients: list[dict] = field(default_factory=list) # local audit log only (no server identity binding) + + def to_dict(self) -> dict: + return { + "bucket_id": self.bucket_id, + "server": self.server, + "registry_id_hex": self.registry_id_hex, + "files": [ + { + "rel_path": f.rel_path, + "size": f.size, + "sha256_b64": f.sha256_b64, + "content_secret_b64": f.content_secret_b64, + "chunks": [{"blob_id": c.blob_id, "eph_pk_b64": c.eph_pk_b64} for c in f.chunks], + } + for f in self.files + ], + "role_masks": self.role_masks, + "recipients": self.recipients, + } + + @classmethod + def from_dict(cls, data: dict) -> "BucketManifest": + files = [ + FileEntry( + rel_path=fd["rel_path"], + size=int(fd["size"]), + sha256_b64=fd["sha256_b64"], + content_secret_b64=fd["content_secret_b64"], + chunks=[FileChunk(c["blob_id"], c["eph_pk_b64"]) for c in fd["chunks"]], + ) + for fd in data.get("files", []) + ] + return cls( + bucket_id=data["bucket_id"], + server=data["server"], + registry_id_hex=data["registry_id_hex"], + files=files, + role_masks=dict(data.get("role_masks", {})), + recipients=list(data.get("recipients", [])), + ) + + +# ── folder flattening + chunking ───────────────────────────────────── + +def flatten_folder(folder: Path) -> list[Path]: + """Deterministic relative-path order: sorted, files only, skipping hidden entries.""" + folder = folder.resolve() + if not folder.is_dir(): + raise NotADirectoryError(folder) + out: list[Path] = [] + for path in sorted(folder.rglob("*")): + if not path.is_file(): + continue + if any(part.startswith(".") for part in path.relative_to(folder).parts): + continue + out.append(path) + return sorted(out, key=lambda p: str(p.relative_to(folder))) + + +def _read_chunks(path: Path, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]: + with path.open("rb") as fh: + while True: + buf = fh.read(chunk_size) + if not buf: + return + yield buf + + +def _sha256_file(path: Path) -> bytes: + h = hashlib.sha256() + for buf in _read_chunks(path, 64 * 1024): + h.update(buf) + return h.digest() + + +def encrypt_file_to_blobs(path: Path) -> tuple[FileEntry, list[bytes]]: + """Generate a per-file content keypair, encrypt each chunk to its public key. + + Each chunk uses ``zkac.encrypt_for_admin`` so the matching + ``IssuanceKeypair.decrypt`` on the recipient side succeeds (both bind the + ``user->admin`` HKDF label). + + Returns the file entry plus the parallel list of ciphertext blobs. + """ + content_kp = zkac.IssuanceKeypair() + content_pk = bytes(content_kp.public_key_bytes()) + content_sk = bytes(content_kp.secret_bytes()) + sha = _sha256_file(path) + size = path.stat().st_size + entry = FileEntry( + rel_path="", + size=size, + sha256_b64=_b64(sha), + content_secret_b64=_b64(content_sk), + ) + blobs: list[bytes] = [] + for chunk in _read_chunks(path): + eph_pk, ciphertext = zkac.encrypt_for_admin(content_pk, chunk) + entry.chunks.append(FileChunk(blob_id=uuid.uuid4().hex, eph_pk_b64=_b64(bytes(eph_pk)))) + blobs.append(bytes(ciphertext)) + return entry, blobs + + +def decrypt_file_from_blobs(entry: FileEntry, blobs: Iterable[bytes]) -> bytes: + """Reassemble + verify a file from its (already fetched) ciphertext blobs.""" + content_kp = zkac.IssuanceKeypair.from_secret(_unb64(entry.content_secret_b64)) + out = bytearray() + for chunk_meta, ciphertext in zip(entry.chunks, blobs): + out.extend(bytes(content_kp.decrypt(_unb64(chunk_meta.eph_pk_b64), ciphertext))) + if hashlib.sha256(bytes(out)).digest() != _unb64(entry.sha256_b64): + raise RuntimeError(f"hash mismatch for {entry.rel_path!r}") + return bytes(out) + + +# ── role bitmasks ──────────────────────────────────────────────────── + +def normalize_mask(mask: str, file_count: int) -> str: + """Pad/truncate a bitmask string of '0'/'1' to ``file_count`` and validate.""" + cleaned = "".join(c for c in mask if c in "01") + if len(cleaned) > file_count: + raise ValueError(f"mask {mask!r} longer than {file_count} files") + cleaned = cleaned.ljust(file_count, "0") + return cleaned + + +def files_visible_to_mask(files: list[FileEntry], mask: str) -> list[FileEntry]: + return [f for f, bit in zip(files, mask) if bit == "1"] + + +def encode_grant_payload( + bucket_id: str, + role_name: str, + server: str, + registry_id_hex: str, + visible_files: list[FileEntry], +) -> bytes: + payload = { + "bucket_id": bucket_id, + "role_name": role_name, + "server": server, + "registry_id_hex": registry_id_hex, + "files": [f.to_grant_dict() for f in visible_files], + } + return json.dumps(payload, separators=(",", ":")).encode("utf-8") + + +def encrypt_grant_to_recipient(payload: bytes, recipient_issuance_pk_hex: str) -> tuple[str, str]: + """User->admin direction sealed box: ``zkac.encrypt_for_admin`` -> (eph_pk_b64, ciphertext_b64).""" + rec_pk = bytes.fromhex(recipient_issuance_pk_hex) + if len(rec_pk) != 32: + raise ValueError("recipient issuance public key must be 32 bytes") + eph_pk, ciphertext = zkac.encrypt_for_admin(rec_pk, payload) + return _b64(bytes(eph_pk)), _b64(bytes(ciphertext)) + + +def decrypt_grant_for_recipient( + eph_pk_b64: str, + ciphertext_b64: str, + issuance_secret_hex: str, +) -> dict: + secret = bytes.fromhex(issuance_secret_hex) + if len(secret) != 32: + raise ValueError("issuance secret must be 32 bytes") + receiver = zkac.IssuanceKeypair.from_secret(secret) + plaintext = bytes(receiver.decrypt(_unb64(eph_pk_b64), _unb64(ciphertext_b64))) + return json.loads(plaintext.decode("utf-8")) + + +# ── encrypted authenticated session to file-share server ───────────── + +class FileShareSession: + """Anonymous handshake + ``op:'fs'`` BBS+ presentation -> framed JSON RPC.""" + + def __init__(self, sock: socket.socket, framed: FramedSession) -> None: + self._sock = sock + self._framed = framed + + def _call(self, cmd: dict) -> dict: + self._framed.send(json.dumps(cmd).encode("utf-8")) + resp = json.loads(self._framed.recv()) + if resp.get("error"): + raise RuntimeError(resp["error"]) + return resp + + def close(self) -> None: + try: + self._sock.close() + except OSError: + pass + + def __enter__(self) -> "FileShareSession": + return self + + def __exit__(self, *exc: object) -> None: + self.close() + + # admin commands + + def bucket_create(self, bucket_id: str | None = None) -> str: + cmd = {"cmd": "bucket_create"} + if bucket_id is not None: + cmd["bucket_id"] = bucket_id + return self._call(cmd)["bucket_id"] + + def bucket_put_blob(self, bucket_id: str, blob_id: str, ciphertext: bytes) -> None: + self._call({ + "cmd": "bucket_put_blob", + "bucket_id": bucket_id, + "blob_id": blob_id, + "ciphertext_b64": _b64(ciphertext), + }) + + def bucket_set_role_acl(self, bucket_id: str, role_id_hex: str, allowed_blob_ids: list[str]) -> None: + self._call({ + "cmd": "bucket_set_role_acl", + "bucket_id": bucket_id, + "role_id_hex": role_id_hex, + "allowed_blob_ids": allowed_blob_ids, + }) + + def bucket_put_role_grant( + self, + bucket_id: str, + role_id_hex: str, + acl_version: int, + eph_pk_b64: str, + ciphertext_b64: str, + ) -> None: + self._call({ + "cmd": "bucket_put_role_grant", + "bucket_id": bucket_id, + "role_id_hex": role_id_hex, + "acl_version": int(acl_version), + "eph_pk_b64": eph_pk_b64, + "ciphertext_b64": ciphertext_b64, + }) + + def bucket_get_role_acl(self, bucket_id: str, role_id_hex: str) -> dict: + return self._call({ + "cmd": "bucket_get_role_acl", + "bucket_id": bucket_id, + "role_id_hex": role_id_hex, + }) + + def bucket_finalize(self, bucket_id: str) -> None: + self._call({"cmd": "bucket_finalize", "bucket_id": bucket_id}) + + def bucket_delete(self, bucket_id: str) -> None: + self._call({"cmd": "bucket_delete", "bucket_id": bucket_id}) + + def bucket_list_owned(self) -> list[str]: + return self._call({"cmd": "bucket_list_owned"})["bucket_ids"] + + # any-role commands + + def whoami(self) -> dict: + return self._call({"cmd": "whoami"}) + + def fs_buckets(self) -> list[str]: + return self._call({"cmd": "fs_buckets"})["bucket_ids"] + + def fs_get_role_grant(self, bucket_id: str) -> dict: + return self._call({"cmd": "fs_get_role_grant", "bucket_id": bucket_id}) + + def fs_get_blob(self, bucket_id: str, blob_id: str) -> bytes: + return _unb64(self._call({"cmd": "fs_get_blob", "bucket_id": bucket_id, "blob_id": blob_id})["ciphertext_b64"]) + + +def open_session( + server: str, + *, + server_pk_hex: str, + user_transport_secret: bytes, + registry_id_hex: str, + role_id: bytes, + credential: "zkac.Credential", + connect_timeout_s: float = DEFAULT_CONNECT_TIMEOUT_S, +) -> FileShareSession: + """Connect, anonymous-handshake, then present a BBS+ proof bound to the transcript.""" + host, port = _parse_server(server) + sock = socket.create_connection((host, port), timeout=connect_timeout_s) + sock.settimeout(None) + try: + node = zkac.Node(zkac.Keypair.from_secret_key(user_transport_secret)) + server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_pk_hex)) + session = client_handshake_anon(sock, node, server_pk) + framed = FramedSession(sock, session) + transcript_hash = bytes(session.transcript_hash()) + bbs_auth_proof = bytes(credential.present(transcript_hash)) + framed.send(json.dumps({ + "op": "fs", + "bbs_auth_b64": _b64(bbs_auth_proof), + }).encode("utf-8")) + hello = json.loads(framed.recv()) + if hello.get("error"): + raise RuntimeError(hello["error"]) + return FileShareSession(sock, framed) + except Exception: + sock.close() + raise + + +# ── higher-level admin helpers ─────────────────────────────────────── + +def upload_bucket( + sess: FileShareSession, + folder: Path, + *, + server: str, + registry_id_hex: str, + bucket_id: str | None = None, +) -> BucketManifest: + """Encrypt every file under ``folder`` and stream blobs to the server.""" + bid = sess.bucket_create(bucket_id) + paths = flatten_folder(folder) + files: list[FileEntry] = [] + for p in paths: + entry, blobs = encrypt_file_to_blobs(p) + entry.rel_path = str(p.relative_to(folder.resolve())) + for chunk_meta, ciphertext in zip(entry.chunks, blobs): + sess.bucket_put_blob(bid, chunk_meta.blob_id, ciphertext) + files.append(entry) + sess.bucket_finalize(bid) + return BucketManifest( + bucket_id=bid, + server=server, + registry_id_hex=registry_id_hex, + files=files, + ) + + +def apply_role_masks_to_server(sess: FileShareSession, manifest: BucketManifest) -> None: + """Push per-role blob ACLs so server enforces mask on fs_get_blob.""" + if not manifest.role_masks: + return + for role_name, raw_mask in manifest.role_masks.items(): + mask = normalize_mask(raw_mask, len(manifest.files)) + visible = files_visible_to_mask(manifest.files, mask) + allowed_blob_ids = [c.blob_id for f in visible for c in f.chunks] + sess.bucket_set_role_acl( + manifest.bucket_id, + zkac.role_id(role_name).hex(), + allowed_blob_ids, + ) + + +def push_role_grant( + sess: FileShareSession, + manifest: BucketManifest, + role_name: str, + recipient_issuance_pk_hex: str, +) -> None: + """Upload anonymous encrypted role grant envelope (no recipient identifier stored).""" + if role_name not in manifest.role_masks: + raise RuntimeError(f"role {role_name!r} has no visibility mask in this bucket") + mask = normalize_mask(manifest.role_masks[role_name], len(manifest.files)) + visible = files_visible_to_mask(manifest.files, mask) + role_id_hex = zkac.role_id(role_name).hex() + acl_meta = sess.bucket_get_role_acl(manifest.bucket_id, role_id_hex) + acl_version = int(acl_meta.get("version", 0)) + if acl_version <= 0: + raise RuntimeError("server ACL missing for role; apply permissions before sharing") + payload = encode_grant_payload( + manifest.bucket_id, role_name, manifest.server, manifest.registry_id_hex, visible, + ) + eph_pk_b64, ct_b64 = encrypt_grant_to_recipient(payload, recipient_issuance_pk_hex) + sess.bucket_put_role_grant( + manifest.bucket_id, + role_id_hex, + acl_version, + eph_pk_b64, + ct_b64, + ) + manifest.recipients.append({ + "role_name": role_name, + "recipient_hint": recipient_issuance_pk_hex[:16], + }) + + +def download_bucket( + sess: FileShareSession, + bucket_id: str, + *, + issuance_secret_hex: str, + role_grant_payload: dict | None = None, + output_dir: Path, +) -> dict: + """Fetch + decrypt files from a role grant, optionally fetched from server.""" + payload = role_grant_payload + if payload is None: + grants = sess.fs_get_role_grant(bucket_id).get("grants", []) + for grant in grants: + try: + payload = decrypt_grant_for_recipient( + grant["eph_pk_b64"], + grant["ciphertext_b64"], + issuance_secret_hex, + ) + break + except Exception: + continue + if payload is None: + raise RuntimeError("no decryptable role grant for this credential") + if payload.get("bucket_id") != bucket_id: + raise RuntimeError("role grant bucket_id mismatch") + output_dir.mkdir(parents=True, exist_ok=True) + written: list[str] = [] + for fd in payload.get("files", []): + entry = FileEntry.from_grant_dict(fd) + ciphertexts = [sess.fs_get_blob(bucket_id, c.blob_id) for c in entry.chunks] + plaintext = decrypt_file_from_blobs(entry, ciphertexts) + out_path = output_dir / entry.rel_path + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_bytes(plaintext) + written.append(str(out_path)) + return {"role_name": payload.get("role_name"), "files_written": written} + + +def _received_grant_path(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> Path: + return state_dir(userid) / "received_grants" / registry_id_hex / role_name / f"{bucket_id}.json" + + +def save_received_role_grant( + userid: str, + registry_id_hex: str, + role_name: str, + bucket_id: str, + *, + eph_pk_b64: str, + ciphertext_b64: str, +) -> Path: + path = _received_grant_path(userid, registry_id_hex, role_name, bucket_id) + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(json.dumps({ + "registry_id_hex": registry_id_hex, + "role_name": role_name, + "bucket_id": bucket_id, + "eph_pk_b64": eph_pk_b64, + "ciphertext_b64": ciphertext_b64, + }, indent=2)) + try: + os.chmod(path, 0o600) + except OSError: + pass + return path + + +def list_received_role_grants(userid: str, registry_id_hex: str, role_name: str) -> list[str]: + base = state_dir(userid) / "received_grants" / registry_id_hex / role_name + if not base.is_dir(): + return [] + return sorted(p.stem for p in base.glob("*.json")) + + +def load_received_role_grant(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> dict: + return json.loads(_received_grant_path(userid, registry_id_hex, role_name, bucket_id).read_text()) + + +# ── local persistence (admin manifests) ────────────────────────────── + +def state_dir(userid: str) -> Path: + # Keep demo state fully self-contained with demo ZKAC_HOME, rather than + # mixing with repository-local legacy state directories. + base_home = Path(os.environ.get("ZKAC_HOME", Path.home() / ".ZKAC-FS")) + base = base_home / userid / "file_share" + base.mkdir(parents=True, exist_ok=True) + try: + os.chmod(base, 0o700) + except OSError: + pass + return base + + +def manifest_path(userid: str, bucket_id: str) -> Path: + return state_dir(userid) / "buckets" / f"{bucket_id}.json" + + +def save_manifest(userid: str, manifest: BucketManifest) -> Path: + p = manifest_path(userid, manifest.bucket_id) + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(json.dumps(manifest.to_dict(), indent=2)) + try: + os.chmod(p, 0o600) + except OSError: + pass + return p + + +def load_manifest(userid: str, bucket_id: str) -> BucketManifest: + return BucketManifest.from_dict(json.loads(manifest_path(userid, bucket_id).read_text())) + + +def list_manifests(userid: str) -> list[str]: + base = state_dir(userid) / "buckets" + if not base.is_dir(): + return [] + return sorted(p.stem for p in base.glob("*.json")) diff --git a/demo/file_share_credentials.py b/demo/file_share_credentials.py new file mode 100644 index 0000000..cba15f8 --- /dev/null +++ b/demo/file_share_credentials.py @@ -0,0 +1,338 @@ +""" +Demo-local P2P credential grant helper. + +The shipped ``zkac-node grant`` command pairs a sender that calls +``IssuanceKeypair.encrypt`` (HKDF info ``admin->user``) with a listener that +calls ``IssuanceKeypair.decrypt`` (HKDF info ``user->admin``). Those labels +intentionally differ in the protocol — they describe the user/admin direction +of the issuance pipeline — so the round-trip in the same direction must use +``encrypt_for_admin`` + ``IssuanceKeypair.decrypt``, which both bind the +``user->admin`` label. + +This module performs the grant from the demo using the matched pair so the +encrypted payload decodes correctly under ``zkac-node p2p-listen``. The wire +format of the ``p2p_grant`` message is unchanged, only the encryption call. + +We deliberately avoid importing ``cli/zkac_cli/*``; identity, admin and pin +material is read straight from the CLI's stable on-disk JSON formats via +``zkac_cli_adapter`` helpers. +""" + +from __future__ import annotations + +import base64 +import json +import socket +import time +from pathlib import Path + +import zkac +from zkac.tcp import FramedSession, client_handshake_anon + +import file_share_client as fsc +import zkac_cli_adapter as cli + + +def _b64(data: bytes) -> str: + return base64.b64encode(data).decode("ascii") + + +def _unb64(s: str) -> bytes: + return base64.b64decode(s) + + +def _parse_server(server: str) -> tuple[str, int]: + host, sep, port_s = server.rpartition(":") + if not sep: + raise ValueError(f"invalid server {server!r}, expected host:port") + return (host or "127.0.0.1"), int(port_s, 10) + + +def _parse_contact_bundle(bundle: str) -> dict: + s = bundle.strip() + raw = base64.urlsafe_b64decode((s + "=" * (-len(s) % 4)).encode()) + return json.loads(raw.decode("utf-8")) + + +def _resolve_server_pk_hex(userid: str, server: str) -> str: + """Locate the user's pinned transport key for ``server`` on disk.""" + server_pk_hex = cli.load_pinned_server_key(userid, server) + if not server_pk_hex: + raise FileNotFoundError( + f"no pinned server key for {server!r} under {userid!r}; " + f"run: zkac-node server pin {userid} {server} --key " + ) + return server_pk_hex + + +def _fetch_registry_state( + userid: str, + server: str, + registry_id_hex: str, + admin_cred: zkac.Credential, +) -> tuple[bytes, bytes]: + """Authenticated mgmt-channel ``get_registry`` (matches the CLI's mgmt protocol).""" + server_pk_hex = _resolve_server_pk_hex(userid, server) + transport_secret = bytes.fromhex(cli.load_identity_secrets(userid)["transport_secret_hex"]) + host, port = _parse_server(server) + sock = socket.create_connection((host, port), timeout=8.0) + sock.settimeout(None) + try: + node = zkac.Node(zkac.Keypair.from_secret_key(transport_secret)) + server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_pk_hex)) + session = client_handshake_anon(sock, node, server_pk) + framed = FramedSession(sock, session) + framed.send(json.dumps({"op": "mgmt"}).encode()) + transcript_hash = bytes(session.transcript_hash()) + framed.send(json.dumps({ + "cmd": "get_registry", + "registry_id": registry_id_hex, + "auth_registry_id": registry_id_hex, + "admin_proof_b64": _b64(bytes(admin_cred.present(transcript_hash))), + }).encode()) + resp = json.loads(framed.recv()) + if resp.get("error"): + raise RuntimeError(resp["error"]) + return _unb64(resp["state_bytes_b64"]), _unb64(resp["state_cert_b64"]) + finally: + sock.close() + + +# ── grant (sender side) ────────────────────────────────────────────── + +def grant_role_p2p( + admin_userid: str, + server: str, + registry_id_hex: str, + role_name: str, + recipient_contact_bundle: str, + *, + bucket_role_grant: dict | None = None, + connect_timeout_s: float = 8.0, +) -> dict: + """Issue a BBS+ role credential and ship it to the recipient over an authenticated TCP session. + + Wire-compatible with ``zkac-node p2p-listen``: the ciphertext is produced via + ``encrypt_for_admin`` so the listener's ``IssuanceKeypair.decrypt`` succeeds. + """ + parsed = _parse_contact_bundle(recipient_contact_bundle) + recipient_issuance_pk_hex = parsed["issuance_pk_hex"] + recipient_transport_pk_hex = parsed["transport_pk_hex"] + recipient_grant_token_b64 = parsed["grant_token_b64"] + peer = parsed.get("peer") + if not peer: + raise RuntimeError( + "contact bundle missing peer endpoint; ask recipient to regenerate " + "with `zkac-node user show --peer `" + ) + + admin_data = cli.load_admin_material(admin_userid, registry_id_hex) + roles = admin_data.get("roles", []) + if role_name not in roles: + raise RuntimeError(f"role {role_name!r} not in registry (have: {roles})") + role_epochs = admin_data.get("role_epochs", {}) + if role_name not in role_epochs: + raise RuntimeError(f"missing epoch metadata for role {role_name!r}") + epoch = int(role_epochs[role_name]) + + bbs_issuer = zkac.BbsIssuer.from_secret_key(_unb64(admin_data["bbs_issuer_secret_b64"])) + bbs_pk = bbs_issuer.public_key() + admin_cred = cli.load_admin_credential(admin_userid, registry_id_hex) + + role_rid = zkac.role_id(role_name) + req = zkac.prepare_blind_request() + blind_sig = bbs_issuer.issue_blind(req.commitment_with_proof(), role_rid, epoch) + payload = json.dumps({ + "registry_id": registry_id_hex, + "role_name": role_name, + "epoch": epoch, + "issuer_pk_b64": _b64(bytes(bbs_pk.to_bytes())), + "blind_sig_b64": _b64(bytes(blind_sig)), + "member_secret_b64": _b64(bytes(req.member_secret())), + "prover_blind_b64": _b64(bytes(req.prover_blind())), + "bucket_role_grant": bucket_role_grant or None, + }).encode() + + rec_pk_bytes = bytes.fromhex(recipient_issuance_pk_hex) + if len(rec_pk_bytes) != 32: + raise RuntimeError("recipient issuance pubkey must decode to 32 bytes") + eph_pk, ciphertext = zkac.encrypt_for_admin(rec_pk_bytes, payload) + + state_bytes, state_cert = _fetch_registry_state( + admin_userid, server, registry_id_hex, admin_cred, + ) + + sender_transport_secret = bytes.fromhex( + cli.load_identity_secrets(admin_userid)["transport_secret_hex"] + ) + peer_host, peer_port = _parse_server(peer) + sock = socket.create_connection((peer_host, peer_port), timeout=connect_timeout_s) + sock.settimeout(None) + try: + node = zkac.Node(zkac.Keypair.from_secret_key(sender_transport_secret)) + peer_pk = zkac.PublicKey.from_bytes(bytes.fromhex(recipient_transport_pk_hex)) + session = client_handshake_anon(sock, node, peer_pk) + framed = FramedSession(sock, session) + ready = json.loads(framed.recv()) + if ready.get("error") or ready.get("op") != "ready_for_grant": + raise RuntimeError(f"peer did not accept grant session: {ready}") + transcript_hash = bytes(session.transcript_hash()) + admin_proof = bytes(admin_cred.present(transcript_hash)) + framed.send(json.dumps({ + "op": "p2p_grant", + "grant_token_b64": recipient_grant_token_b64, + "registry_id": registry_id_hex, + "registry_state_bytes_b64": _b64(state_bytes), + "registry_state_cert_b64": _b64(state_cert), + "role_name": role_name, + "eph_pk_b64": _b64(bytes(eph_pk)), + "ciphertext_b64": _b64(bytes(ciphertext)), + "admin_proof_b64": _b64(admin_proof), + }).encode()) + ack = json.loads(framed.recv()) + if ack.get("error"): + raise RuntimeError(ack["error"]) + return {"status": ack.get("status", "ok"), "peer": peer} + finally: + sock.close() + + +# ── listen (recipient side) ────────────────────────────────────────── + +def _save_credential_json( + userid: str, + registry_id_hex: str, + role_name: str, + payload: dict, +) -> Path: + """Write the received credential to ``ZKAC_HOME//credentials/_.json``. + + Mirrors the on-disk format used by ``zkac-node grant`` so subsequent + ``zkac-node`` commands can pick up the credential normally. + """ + creds_dir = cli.zkac_home() / userid / "credentials" + creds_dir.mkdir(parents=True, exist_ok=True) + target = creds_dir / f"{registry_id_hex}_{role_name}.json" + target.write_text(json.dumps({ + "blind_sig_b64": payload["blind_sig_b64"], + "member_secret_b64": payload["member_secret_b64"], + "prover_blind_b64": payload["prover_blind_b64"], + "role_name": payload["role_name"], + "epoch": payload["epoch"], + "issuer_pk_b64": payload["issuer_pk_b64"], + }, indent=2)) + try: + target.chmod(0o600) + except OSError: + pass + return target + + +def listen_for_role_credential( + userid: str, + host: str, + port: int, + *, + timeout_s: float = 60.0, +) -> dict: + """Block until one valid grant arrives, then save the credential locally. + + Compatible with ``demo.file_share_credentials.grant_role_p2p`` and (because + the wire format matches) also useful for testing against the existing + ``zkac-node grant`` listener API. + """ + from zkac.tcp import server_handshake_anon + + secrets = cli.load_identity_secrets(userid) + transport_secret = bytes.fromhex(secrets["transport_secret_hex"]) + issuance_secret = bytes.fromhex(secrets["issuance_secret_hex"]) + expected_token_b64 = secrets["grant_token_b64"] + + listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + listener.bind((host, port)) + listener.listen(1) + deadline = time.monotonic() + timeout_s + receiver_kp = zkac.IssuanceKeypair.from_secret(issuance_secret) + try: + while True: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise RuntimeError("timed out waiting for authenticated grant sender") + listener.settimeout(remaining) + conn, _addr = listener.accept() + try: + conn.settimeout(min(remaining, 30.0)) + node = zkac.Node(zkac.Keypair.from_secret_key(transport_secret)) + session = server_handshake_anon(conn, node) + framed = FramedSession(conn, session) + framed.send(json.dumps({"ok": True, "op": "ready_for_grant"}).encode()) + msg = json.loads(framed.recv()) + if msg.get("error"): + raise RuntimeError(msg["error"]) + if msg.get("op") != "p2p_grant": + raise RuntimeError("unexpected p2p message") + if msg.get("grant_token_b64") != expected_token_b64: + raise RuntimeError("grant pairing token mismatch") + registry_id_hex = msg["registry_id"] + role_name = msg["role_name"] + + state_bytes = _unb64(msg["registry_state_bytes_b64"]) + state_cert = _unb64(msg["registry_state_cert_b64"]) + mgr = zkac.RegistryManager() + restored = mgr.restore(state_bytes, state_cert).hex() + if restored != registry_id_hex: + raise RuntimeError("registry snapshot does not match announced registry_id") + if not mgr.verify_admin( + bytes.fromhex(registry_id_hex), + _unb64(msg["admin_proof_b64"]), + bytes(session.transcript_hash()), + ): + raise RuntimeError("sender admin proof failed") + + payload = json.loads(receiver_kp.decrypt( + _unb64(msg["eph_pk_b64"]), _unb64(msg["ciphertext_b64"]), + )) + if payload.get("registry_id") != registry_id_hex: + raise RuntimeError("payload registry_id mismatch") + if payload.get("role_name") != role_name: + raise RuntimeError("payload role mismatch") + + cred = zkac.Credential.finalize( + _unb64(payload["blind_sig_b64"]), + _unb64(payload["member_secret_b64"]), + _unb64(payload["prover_blind_b64"]), + zkac.role_id(role_name), + int(payload["epoch"]), + zkac.BbsPublicKey.from_bytes(_unb64(payload["issuer_pk_b64"])), + ) + nonce = bytes(session.transcript_hash()) + if not mgr.verify_presentation( + bytes.fromhex(registry_id_hex), + zkac.role_id(role_name), + bytes(cred.present(nonce)), + nonce, + ): + raise RuntimeError("granted credential does not verify") + + _save_credential_json(userid, registry_id_hex, role_name, payload) + bucket_grant = payload.get("bucket_role_grant") + if isinstance(bucket_grant, dict): + bucket_id = bucket_grant.get("bucket_id") + eph_pk_b64 = bucket_grant.get("eph_pk_b64") + ciphertext_b64 = bucket_grant.get("ciphertext_b64") + if isinstance(bucket_id, str) and isinstance(eph_pk_b64, str) and isinstance(ciphertext_b64, str): + fsc.save_received_role_grant( + userid, + registry_id_hex, + role_name, + bucket_id, + eph_pk_b64=eph_pk_b64, + ciphertext_b64=ciphertext_b64, + ) + framed.send(json.dumps({"ok": True, "status": "stored"}).encode()) + return {"registry_id": registry_id_hex, "role": role_name} + finally: + conn.close() + finally: + listener.close() diff --git a/demo/file_share_server.py b/demo/file_share_server.py new file mode 100644 index 0000000..d58e5fb --- /dev/null +++ b/demo/file_share_server.py @@ -0,0 +1,815 @@ +#!/usr/bin/env python3 +""" +ZKAC opaque file-share server (demo). + +Headless TCP service that combines: + +* The same management-channel wire protocol as ``zkac-node serve`` so existing + ``zkac-node registry create/get/update`` commands work unchanged. +* A new file-share channel that, after BBS+ role authentication, exposes + bucket primitives. The server only ever sees opaque ciphertext blobs and + encrypted role grants; file names, contents and per-role + visibility masks are never visible server-side. + +Run with:: + + uv run python demo/file_share_server.py --host 127.0.0.1 --port 9879 + +The server transport public key is printed at start-up; pin it on each client +with ``zkac-node server pin 127.0.0.1:9879 --key ``. +""" + +from __future__ import annotations + +import argparse +import base64 +import hashlib +import json +import os +import socket +import sys +import threading +import uuid +from pathlib import Path +from typing import Any + +import zkac +from zkac.tcp import FramedSession, server_handshake_anon + + +# ── small helpers ──────────────────────────────────────────────────── + +def _b64(data: bytes) -> str: + return base64.b64encode(data).decode("ascii") + + +def _unb64(s: str) -> bytes: + return base64.b64decode(s) + + +def _chmod(path: Path, mode: int) -> None: + try: + os.chmod(path, mode) + except OSError: + pass + + +def _write_private_json(path: Path, payload: dict) -> None: + path.write_text(json.dumps(payload, indent=2)) + _chmod(path, 0o600) + + +def _is_loopback(host: str) -> bool: + return host.strip().lower() in {"127.0.0.1", "::1", "localhost"} + + +def _safe_id(value: str) -> str: + """Strict allowlist on identifiers used as filesystem names.""" + if not isinstance(value, str) or not value: + raise ValueError("missing identifier") + if len(value) > 128 or any(c not in "0123456789abcdefABCDEF-" for c in value): + raise ValueError(f"invalid identifier {value!r}") + return value + + +def _privacy_safe_error_tag(exc: BaseException) -> str: + """Return a coarse error label suitable for public service logs.""" + return type(exc).__name__ + + +# ── opaque on-disk store ───────────────────────────────────────────── + +class _FileShareStore: + """Persists registry snapshots and opaque bucket state under one data dir.""" + + def __init__(self, data_dir: Path) -> None: + self._dir = data_dir + self._reg_dir = data_dir / "registries" + self._buckets_dir = data_dir / "buckets" + self._privacy_key: bytes = b"" + for d in (self._dir, self._reg_dir, self._buckets_dir): + d.mkdir(parents=True, exist_ok=True) + _chmod(d, 0o700) + self._lock = threading.Lock() + + def set_privacy_key(self, key: bytes) -> None: + if not key: + raise RuntimeError("privacy key must not be empty") + self._privacy_key = bytes(key) + + def _require_privacy_key(self) -> bytes: + if not self._privacy_key: + raise RuntimeError("privacy key not initialized") + return self._privacy_key + + def _tag(self, kind: str, raw_id: str) -> str: + key = self._require_privacy_key() + raw = _safe_id(raw_id).encode("utf-8") + h = hashlib.sha256() + h.update(kind.encode("utf-8")) + h.update(b"\x00") + h.update(key) + h.update(b"\x00") + h.update(raw) + return h.hexdigest() + + def _registry_tag(self, registry_id_hex: str) -> str: + return self._tag("registry", registry_id_hex) + + def _role_tag(self, role_id_hex: str) -> str: + return self._tag("role", role_id_hex) + + # transport key + + def load_or_create_keypair(self) -> zkac.Keypair: + kf = self._dir / "server_key.json" + if kf.exists(): + data = json.loads(kf.read_text()) + return zkac.Keypair.from_secret_key(_unb64(data["secret_b64"])) + kp = zkac.Keypair() + _write_private_json(kf, { + "secret_b64": _b64(kp.secret_key_bytes()), + "public_b64": _b64(kp.public_key().to_bytes()), + }) + return kp + + # registries (mirrors zkac-node serve so the CLI works unchanged) + + def save_registry(self, rid_hex: str, state_bytes: bytes, cert_bytes: bytes) -> None: + with self._lock: + (self._reg_dir / f"{rid_hex}.state").write_bytes(state_bytes) + (self._reg_dir / f"{rid_hex}.cert").write_bytes(cert_bytes) + + def load_all_registries(self, mgr: zkac.RegistryManager) -> int: + n = 0 + for p in sorted(self._reg_dir.glob("*.state")): + cert = self._reg_dir / f"{p.stem}.cert" + if not cert.exists(): + continue + try: + mgr.restore(p.read_bytes(), cert.read_bytes()) + n += 1 + except Exception as exc: + print(f"[fs-server] skip registry {p.stem}: {exc}") + return n + + def list_registry_ids(self) -> list[str]: + out: list[str] = [] + for p in sorted(self._reg_dir.glob("*.state")): + cert = self._reg_dir / f"{p.stem}.cert" + if cert.exists(): + out.append(p.stem) + return out + + # buckets + + def _bucket_dir(self, bucket_id: str) -> Path: + return self._buckets_dir / _safe_id(bucket_id) + + def _bucket_meta_path(self, bucket_id: str) -> Path: + return self._bucket_dir(bucket_id) / "meta.json" + + def _roles_index_path(self, registry_id_hex: str) -> Path: + return self._reg_dir / f"{_safe_id(registry_id_hex)}.roles.json" + + def _remember_role_id(self, registry_id_hex: str, role_id_hex: str) -> None: + p = self._roles_index_path(registry_id_hex) + roles: set[str] = set() + if p.is_file(): + try: + data = json.loads(p.read_text()) + if isinstance(data, list): + roles = {_safe_id(str(v)) for v in data} + except Exception: + roles = set() + roles.add(_safe_id(role_id_hex)) + _write_private_json(p, sorted(roles)) + + def bucket_create(self, bucket_id: str, owner_registry_id_hex: str) -> None: + bd = self._bucket_dir(bucket_id) + with self._lock: + if bd.exists(): + raise RuntimeError("bucket already exists") + (bd / "blobs").mkdir(parents=True) + (bd / "role_grants").mkdir() + _chmod(bd, 0o700) + _write_private_json(self._bucket_meta_path(bucket_id), { + "bucket_id": bucket_id, + "owner_registry_tag": self._registry_tag(owner_registry_id_hex), + "finalized": False, + "role_acl": {}, # role_tag -> {"version": int, "allowed_blob_ids": [blob_id...]} + }) + + def bucket_meta(self, bucket_id: str) -> dict: + return json.loads(self._bucket_meta_path(bucket_id).read_text()) + + def bucket_is_finalized(self, bucket_id: str) -> bool: + return bool(self.bucket_meta(bucket_id).get("finalized", False)) + + def _bucket_require_owner(self, bucket_id: str, registry_id_hex: str) -> dict: + meta = self.bucket_meta(bucket_id) + expected_owner_tag = self._registry_tag(registry_id_hex) + if meta.get("owner_registry_tag") != expected_owner_tag: + raise RuntimeError("not bucket owner") + return meta + + def bucket_set_finalized(self, bucket_id: str, registry_id_hex: str, finalized: bool) -> None: + with self._lock: + meta = self._bucket_require_owner(bucket_id, registry_id_hex) + meta["finalized"] = bool(finalized) + _write_private_json(self._bucket_meta_path(bucket_id), meta) + + def bucket_delete(self, bucket_id: str, registry_id_hex: str) -> None: + with self._lock: + self._bucket_require_owner(bucket_id, registry_id_hex) + bd = self._bucket_dir(bucket_id) + for sub in ("blobs", "role_grants"): + d = bd / sub + if d.is_dir(): + for p in d.rglob("*"): + if p.is_file(): + try: + p.unlink() + except OSError: + pass + for p in sorted(d.rglob("*"), reverse=True): + if p.is_dir(): + try: + p.rmdir() + except OSError: + pass + try: + d.rmdir() + except OSError: + pass + try: + self._bucket_meta_path(bucket_id).unlink() + except OSError: + pass + try: + bd.rmdir() + except OSError: + pass + + def bucket_put_blob( + self, + bucket_id: str, + registry_id_hex: str, + blob_id: str, + ciphertext: bytes, + ) -> None: + with self._lock: + self._bucket_require_owner(bucket_id, registry_id_hex) + (self._bucket_dir(bucket_id) / "blobs" / _safe_id(blob_id)).write_bytes(ciphertext) + + def bucket_set_role_acl( + self, + bucket_id: str, + registry_id_hex: str, + role_id_hex: str, + allowed_blob_ids: list[str], + ) -> None: + with self._lock: + meta = self._bucket_require_owner(bucket_id, registry_id_hex) + role_acl = dict(meta.get("role_acl", {})) + key = self._role_tag(role_id_hex) + prev = role_acl.get(key, {}) + prev_version = int(prev.get("version", 0)) if isinstance(prev, dict) else 0 + role_acl[key] = { + "version": prev_version + 1, + "allowed_blob_ids": [_safe_id(b) for b in allowed_blob_ids], + } + meta["role_acl"] = role_acl + _write_private_json(self._bucket_meta_path(bucket_id), meta) + self._remember_role_id(registry_id_hex, role_id_hex) + + def bucket_get_blob(self, bucket_id: str, blob_id: str) -> bytes: + return (self._bucket_dir(bucket_id) / "blobs" / _safe_id(blob_id)).read_bytes() + + def bucket_blob_allowed_for_role(self, bucket_id: str, role_id_hex: str, blob_id: str) -> bool: + meta = self.bucket_meta(bucket_id) + role_acl = meta.get("role_acl", {}) + role_meta = role_acl.get(self._role_tag(role_id_hex)) + if not isinstance(role_meta, dict): + return False + allowed = role_meta.get("allowed_blob_ids") + if not isinstance(allowed, list): + return False + return _safe_id(blob_id) in {str(v) for v in allowed} + + def bucket_role_acl(self, bucket_id: str, role_id_hex: str) -> dict: + meta = self.bucket_meta(bucket_id) + role_acl = meta.get("role_acl", {}) + role_meta = role_acl.get(self._role_tag(role_id_hex), {}) + if not isinstance(role_meta, dict): + return {"version": 0, "allowed_blob_ids": []} + version = int(role_meta.get("version", 0)) + allowed = role_meta.get("allowed_blob_ids", []) + if not isinstance(allowed, list): + allowed = [] + return { + "version": version, + "allowed_blob_ids": [_safe_id(str(v)) for v in allowed], + } + + def buckets_for_role_in_registry(self, role_id_hex: str, registry_id_hex: str) -> list[str]: + """Bucket ids where this authenticated role currently has non-empty ACL.""" + role_id_hex = _safe_id(role_id_hex) + out: list[str] = [] + for bd in sorted(self._buckets_dir.iterdir()): + bid = bd.name + try: + if not self.bucket_is_finalized(bid): + continue + if self.bucket_owner_registry_tag(bid) != self._registry_tag(registry_id_hex): + continue + acl = self.bucket_role_acl(bid, role_id_hex) + allowed = acl.get("allowed_blob_ids", []) + grants = self.bucket_get_role_grants(bid, role_id_hex) + current_acl_version = int(acl.get("version", -1)) + has_fresh_grant = any( + isinstance(g.get("acl_version"), int) and g["acl_version"] == current_acl_version + for g in grants + ) + if isinstance(allowed, list) and len(allowed) > 0 and has_fresh_grant: + out.append(bid) + except Exception: + continue + return out + + def bucket_put_role_grant( + self, + bucket_id: str, + registry_id_hex: str, + role_id_hex: str, + acl_version: int, + eph_pk_b64: str, + ciphertext_b64: str, + ) -> None: + with self._lock: + self._bucket_require_owner(bucket_id, registry_id_hex) + role_id_hex = _safe_id(role_id_hex) + role_tag = self._role_tag(role_id_hex) + payload = { + "bucket_id": bucket_id, + "role_tag": role_tag, + "acl_version": int(acl_version), + "eph_pk_b64": eph_pk_b64, + "ciphertext_b64": ciphertext_b64, + } + role_dir = self._bucket_dir(bucket_id) / "role_grants" / role_tag + role_dir.mkdir(parents=True, exist_ok=True) + target = role_dir / f"{uuid.uuid4().hex}.json" + _write_private_json(target, payload) + self._remember_role_id(registry_id_hex, role_id_hex) + + def bucket_get_role_grants(self, bucket_id: str, role_id_hex: str) -> list[dict]: + role_dir = self._bucket_dir(bucket_id) / "role_grants" / self._role_tag(role_id_hex) + if not role_dir.is_dir(): + return [] + out: list[dict] = [] + for p in sorted(role_dir.glob("*.json")): + try: + out.append(json.loads(p.read_text())) + except Exception: + continue + return out + + def bucket_owner_registry_tag(self, bucket_id: str) -> str: + owner = self.bucket_meta(bucket_id).get("owner_registry_tag") + if not isinstance(owner, str) or not owner: + raise RuntimeError("bucket metadata missing owner_registry_tag") + return owner + + def buckets_owned_by(self, registry_id_hex: str) -> list[str]: + out: list[str] = [] + for bd in sorted(self._buckets_dir.iterdir()): + meta = bd / "meta.json" + if not meta.is_file(): + continue + try: + owner_tag = self._registry_tag(registry_id_hex) + if json.loads(meta.read_text()).get("owner_registry_tag") == owner_tag: + out.append(bd.name) + except (OSError, json.JSONDecodeError): + continue + return out + + def role_ids_for_registry(self, registry_id_hex: str) -> list[str]: + p = self._roles_index_path(registry_id_hex) + if not p.is_file(): + return [] + try: + data = json.loads(p.read_text()) + if not isinstance(data, list): + return [] + return sorted({_safe_id(str(v)) for v in data}) + except Exception: + return [] + + +# ── command dispatch (inside encrypted session) ────────────────────── + +def _dispatch_mgmt( + cmd: dict, + mgr: zkac.RegistryManager, + store: _FileShareStore, + server_pk_b64: str, + transcript_hash: bytes, +) -> dict: + """Registry management commands, wire-compatible with ``zkac-node`` CLI.""" + try: + action = cmd.get("cmd") + rid_hex = cmd.get("auth_registry_id") + admin_proof_b64 = cmd.get("admin_proof_b64") + + def _require_admin_for(target_rid_hex: str) -> None: + if rid_hex != target_rid_hex: + raise RuntimeError("auth_registry_id must match command registry_id") + if not isinstance(admin_proof_b64, str) or not admin_proof_b64: + raise RuntimeError("missing admin_proof_b64") + if not mgr.verify_admin( + bytes.fromhex(target_rid_hex), + _unb64(admin_proof_b64), + transcript_hash, + ): + raise RuntimeError("admin authorization failed") + + if action == "server_info": + return {"ok": True, "server_public_key_b64": server_pk_b64} + + if action == "create_registry": + state_bytes = _unb64(cmd["state_bytes_b64"]) + state_cert = _unb64(cmd["state_cert_b64"]) + auth_rid = cmd.get("auth_registry_id") + if not isinstance(auth_rid, str): + raise RuntimeError("missing auth_registry_id") + if not isinstance(admin_proof_b64, str) or not admin_proof_b64: + raise RuntimeError("missing admin_proof_b64") + tmp_mgr = zkac.RegistryManager() + expected_rid = tmp_mgr.create(state_bytes, state_cert).hex() + if expected_rid != auth_rid: + raise RuntimeError("auth_registry_id does not match certified state") + if not tmp_mgr.verify_admin( + bytes.fromhex(expected_rid), + _unb64(admin_proof_b64), + transcript_hash, + ): + raise RuntimeError("admin authorization failed for create_registry") + rid = mgr.create(state_bytes, state_cert) + store.save_registry(rid.hex(), state_bytes, state_cert) + return {"ok": True, "registry_id": rid.hex()} + + if action == "get_registry": + rid_hex_cmd = cmd["registry_id"] + _require_admin_for(rid_hex_cmd) + rid = bytes.fromhex(rid_hex_cmd) + state_bytes, state_cert = mgr.get(rid) + return { + "ok": True, + "state_bytes_b64": _b64(state_bytes), + "state_cert_b64": _b64(state_cert), + } + + if action == "update_registry": + rid_hex_cmd = cmd["registry_id"] + _require_admin_for(rid_hex_cmd) + rid = bytes.fromhex(rid_hex_cmd) + state_bytes = _unb64(cmd["state_bytes_b64"]) + state_cert = _unb64(cmd["state_cert_b64"]) + mgr.update(rid, state_bytes, state_cert) + store.save_registry(rid_hex_cmd, state_bytes, state_cert) + return {"ok": True} + + return {"error": f"unknown command: {action}"} + + except Exception as exc: + return {"error": str(exc)} + + +def _dispatch_fs( + cmd: dict, + store: _FileShareStore, + ctx: dict, +) -> dict: + """File-share commands authenticated via opaque per-session authorization context.""" + try: + action = cmd.get("cmd") + registry_id_hex: str = ctx["registry_id_hex"] + registry_tag: str = ctx["registry_tag"] + role_id_hex: str = ctx["role_id_hex"] + is_admin: bool = bool(ctx["is_admin"]) + + def _require_admin() -> None: + if not is_admin: + raise RuntimeError("admin role required for this command") + + if action == "whoami": + return { + "ok": True, + "is_admin": is_admin, + "auth_scope": "admin" if is_admin else "credential", + } + + if action == "bucket_create": + _require_admin() + bid = cmd.get("bucket_id") or uuid.uuid4().hex + store.bucket_create(bid, registry_id_hex) + return {"ok": True, "bucket_id": bid} + + if action == "bucket_put_blob": + _require_admin() + bid = cmd["bucket_id"] + blob_id = cmd["blob_id"] + ciphertext = _unb64(cmd["ciphertext_b64"]) + store.bucket_put_blob(bid, registry_id_hex, blob_id, ciphertext) + return {"ok": True} + + if action == "bucket_set_role_acl": + _require_admin() + store.bucket_set_role_acl( + cmd["bucket_id"], + registry_id_hex, + cmd["role_id_hex"], + list(cmd.get("allowed_blob_ids", [])), + ) + return {"ok": True} + + if action == "bucket_put_role_grant": + _require_admin() + store.bucket_put_role_grant( + cmd["bucket_id"], + registry_id_hex, + cmd["role_id_hex"], + int(cmd["acl_version"]), + cmd["eph_pk_b64"], + cmd["ciphertext_b64"], + ) + return {"ok": True} + + if action == "bucket_get_role_acl": + _require_admin() + role_acl = store.bucket_role_acl(cmd["bucket_id"], cmd["role_id_hex"]) + return {"ok": True, **role_acl} + + if action == "bucket_finalize": + _require_admin() + store.bucket_set_finalized(cmd["bucket_id"], registry_id_hex, True) + return {"ok": True} + + if action == "bucket_delete": + _require_admin() + store.bucket_delete(cmd["bucket_id"], registry_id_hex) + return {"ok": True} + + if action == "bucket_list_owned": + _require_admin() + return {"ok": True, "bucket_ids": store.buckets_owned_by(registry_id_hex)} + + if action == "fs_buckets": + return { + "ok": True, + "bucket_ids": store.buckets_for_role_in_registry( + role_id_hex, + registry_id_hex, + ), + } + + if action == "fs_get_role_grant": + bid = cmd["bucket_id"] + if store.bucket_owner_registry_tag(bid) != registry_tag: + raise RuntimeError("bucket does not belong to authenticated registry") + if not store.bucket_is_finalized(bid): + raise RuntimeError("bucket is not finalized") + current_acl = store.bucket_role_acl(bid, role_id_hex) + acl_version = int(current_acl.get("version", -1)) + grants = [ + g for g in store.bucket_get_role_grants(bid, role_id_hex) + if isinstance(g.get("acl_version"), int) and g["acl_version"] == acl_version + ] + if not grants: + raise RuntimeError("permissions updated; request a fresh role grant") + return {"ok": True, "grants": grants} + + if action == "fs_get_blob": + bid = cmd["bucket_id"] + blob_id = cmd["blob_id"] + if store.bucket_owner_registry_tag(bid) != registry_tag: + raise RuntimeError("bucket does not belong to authenticated registry") + if not store.bucket_is_finalized(bid): + raise RuntimeError("bucket is not finalized") + if not is_admin and not store.bucket_blob_allowed_for_role(bid, role_id_hex, blob_id): + raise RuntimeError("blob access denied by role mask") + data = store.bucket_get_blob(bid, blob_id) + return {"ok": True, "ciphertext_b64": _b64(data)} + + return {"error": f"unknown command: {action}"} + + except Exception as exc: + return {"error": str(exc)} + + +def _authenticate_fs_identity( + mgr: zkac.RegistryManager, + store: _FileShareStore, + proof: bytes, + transcript_hash: bytes, +) -> dict[str, object] | None: + """Resolve opaque auth context from proof without client-supplied identifiers.""" + registry_ids = store.list_registry_ids() + admin_matches: list[str] = [] + role_matches: list[tuple[str, str]] = [] + for rid_hex in registry_ids: + try: + rid = bytes.fromhex(rid_hex) + except ValueError: + continue + try: + if mgr.verify_admin(rid, proof, transcript_hash): + admin_matches.append(rid_hex) + except Exception: + pass + for role_id_hex in store.role_ids_for_registry(rid_hex): + try: + role_id = bytes.fromhex(role_id_hex) + except ValueError: + continue + if role_id == zkac.admin_role_id(): + continue + try: + if mgr.verify_presentation(rid, role_id, proof, transcript_hash): + role_matches.append((rid_hex, role_id_hex)) + except Exception: + continue + if len(admin_matches) == 1 and not role_matches: + rid_hex = admin_matches[0] + role_hex = zkac.admin_role_id().hex() + return { + "registry_id_hex": rid_hex, + "registry_tag": store._registry_tag(rid_hex), + "role_id_hex": role_hex, + "role_tag": store._role_tag(role_hex), + "is_admin": True, + } + if not admin_matches and len(role_matches) == 1: + rid_hex, role_hex = role_matches[0] + return { + "registry_id_hex": rid_hex, + "registry_tag": store._registry_tag(rid_hex), + "role_id_hex": role_hex, + "role_tag": store._role_tag(role_hex), + "is_admin": False, + } + # Ambiguous or invalid proof. + return None + + +# ── per-connection handler ──────────────────────────────────────────── + +def _handle_conn( + conn: socket.socket, + addr: tuple, + node: zkac.Node, + mgr: zkac.RegistryManager, + store: _FileShareStore, + server_pk_b64: str, + idle_timeout_s: float, + slots: threading.BoundedSemaphore, +) -> None: + try: + conn.settimeout(idle_timeout_s) + session = server_handshake_anon(conn, node) + framed = FramedSession(conn, session) + transcript_hash = bytes(session.transcript_hash()) + + hello = json.loads(framed.recv()) + op = hello.get("op") + + if op == "mgmt": + while True: + try: + cmd = json.loads(framed.recv()) + except (ConnectionError, OSError): + break + resp = _dispatch_mgmt(cmd, mgr, store, server_pk_b64, transcript_hash) + framed.send(json.dumps(resp).encode()) + return + + if op == "fs": + try: + proof = _unb64(hello["bbs_auth_b64"]) + except (KeyError, ValueError) as exc: + framed.send(json.dumps({"error": f"invalid fs hello: {exc}"}).encode()) + return + auth = _authenticate_fs_identity(mgr, store, proof, transcript_hash) + if auth is None: + framed.send(json.dumps({"error": "auth failed"}).encode()) + return + framed.send(json.dumps({"ok": True, "status": "authenticated"}).encode()) + ctx = auth + while True: + try: + cmd = json.loads(framed.recv()) + except (ConnectionError, OSError): + break + resp = _dispatch_fs(cmd, store, ctx) + framed.send(json.dumps(resp).encode()) + return + + framed.send(json.dumps({"error": f"unknown op: {op}"}).encode()) + + except (ConnectionError, BrokenPipeError, OSError): + pass + except Exception as exc: + # Privacy model: never emit client endpoint or request-linked payloads. + print(f"[fs-server] connection error ({_privacy_safe_error_tag(exc)})") + finally: + conn.close() + slots.release() + + +# ── entry point ────────────────────────────────────────────────────── + +def serve( + data_dir: Path, + host: str = "127.0.0.1", + port: int = 9879, + *, + max_connections: int = 64, + idle_timeout_s: float = 60.0, + listen_backlog: int = 64, + allow_non_loopback: bool = False, +) -> None: + data_dir.mkdir(parents=True, exist_ok=True) + store = _FileShareStore(data_dir) + kp = store.load_or_create_keypair() + store.set_privacy_key(bytes(kp.secret_key_bytes())) + server_pk_b64 = _b64(kp.public_key().to_bytes()) + pk_hex = kp.public_key().to_bytes().hex() + node = zkac.Node(kp) + + mgr = zkac.RegistryManager() + n = store.load_all_registries(mgr) + + print(f"[fs-server] data dir: {data_dir}") + print(f"[fs-server] server transport public key (pin OOB): {pk_hex}") + print(f"[fs-server] loaded {n} registries") + print(f"[fs-server] listening on {host}:{port}") + + if not _is_loopback(host) and not allow_non_loopback: + raise RuntimeError( + "refusing to bind outside loopback. " + "Use --allow-non-loopback only when exposure is intentional." + ) + if not _is_loopback(host): + print(f"[fs-server] warning: binding outside loopback: {host}:{port}", file=sys.stderr) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + sock.bind((host, port)) + slots = threading.BoundedSemaphore(max_connections) + sock.listen(listen_backlog) + try: + while True: + conn, addr = sock.accept() + if not slots.acquire(blocking=False): + conn.close() + continue + threading.Thread( + target=_handle_conn, + args=(conn, addr, node, mgr, store, server_pk_b64, idle_timeout_s, slots), + daemon=True, + ).start() + except KeyboardInterrupt: + print("\n[fs-server] shutdown") + finally: + sock.close() + + +def main() -> None: + p = argparse.ArgumentParser(description="ZKAC opaque file-share server (demo)") + p.add_argument("--host", default="127.0.0.1") + p.add_argument("--port", type=int, default=9879) + p.add_argument( + "--data-dir", + type=Path, + default=Path(__file__).resolve().parent / "fs_data", + help="server state (transport key, registries, opaque buckets)", + ) + p.add_argument("--idle-timeout", type=float, default=60.0) + p.add_argument("--max-connections", type=int, default=64) + p.add_argument("--allow-non-loopback", action="store_true") + args = p.parse_args() + + serve( + args.data_dir, + host=args.host, + port=args.port, + max_connections=args.max_connections, + idle_timeout_s=args.idle_timeout, + allow_non_loopback=args.allow_non_loopback, + ) + + +if __name__ == "__main__": + main() diff --git a/demo/file_share_smoke.py b/demo/file_share_smoke.py new file mode 100644 index 0000000..4ff4b8b --- /dev/null +++ b/demo/file_share_smoke.py @@ -0,0 +1,256 @@ +#!/usr/bin/env python3 +""" +End-to-end smoke test for the ZKAC file-share demo. + +Exercises: + +* admin (Alice) creates a registry on the file-share server, +* admin uploads a folder as an encrypted bucket with per-role visibility masks, +* Alice issues a ZKAC role credential to Bob via direct P2P grant, +* Alice uploads an anonymous role-grant envelope to the server, +* Bob authenticates to the file-share server with his role credential, + downloads, and decrypts the files his role can see, +* server opacity: every byte at rest in the bucket directory is ciphertext + (no plaintext file content survives on disk). + +All ZKAC operations go through ``zkac-node`` (subprocess) and a temporary +``ZKAC_HOME`` so the test is hermetic. + +Run:: + + uv run python demo/file_share_smoke.py +""" + +from __future__ import annotations + +import os +import shutil +import socket +import sys +import tempfile +import threading +import time +import traceback +from pathlib import Path + +DEMO_DIR = Path(__file__).resolve().parent +sys.path.insert(0, str(DEMO_DIR)) + +import zkac # noqa: E402 + +import file_share_client as fsc # noqa: E402 +import file_share_credentials as fscred # noqa: E402 +import file_share_server as fss # noqa: E402 +import zkac_cli_adapter as cli # noqa: E402 + + +def _free_port() -> int: + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + finally: + s.close() + + +def _wait_until(predicate, timeout_s: float = 5.0, interval_s: float = 0.05) -> bool: + deadline = time.monotonic() + timeout_s + while time.monotonic() < deadline: + if predicate(): + return True + time.sleep(interval_s) + return False + + +def _make_files(folder: Path) -> dict[str, bytes]: + folder.mkdir(parents=True, exist_ok=True) + contents = { + "alpha.txt": b"public-summary: " + os.urandom(2048), + "nested/beta.bin": os.urandom(200 * 1024), # > one chunk + "secret.md": b"# top-secret\n" + os.urandom(8192), + } + paths: dict[str, bytes] = {} + for rel, data in contents.items(): + target = folder / rel + target.parent.mkdir(parents=True, exist_ok=True) + target.write_bytes(data) + paths[rel] = data + return paths + + +def _start_fs_server(host: str, port: int, data_dir: Path) -> threading.Thread: + t = threading.Thread( + target=fss.serve, + args=(data_dir,), + kwargs={"host": host, "port": port, "allow_non_loopback": False}, + daemon=True, + ) + t.start() + return t + + +def _server_transport_pubkey_hex(data_dir: Path) -> str: + import json + return zkac.PublicKey.from_bytes( + __import__("base64").b64decode( + json.loads((data_dir / "server_key.json").read_text())["public_b64"] + ) + ).to_bytes().hex() + + +def _open_admin_session(userid: str, server: str, server_pk_hex: str, registry_id: str) -> fsc.FileShareSession: + secrets = cli.load_identity_secrets(userid) + cred = cli.load_admin_credential(userid, registry_id) + return fsc.open_session( + server, + server_pk_hex=server_pk_hex, + user_transport_secret=bytes.fromhex(secrets["transport_secret_hex"]), + registry_id_hex=registry_id, + role_id=zkac.admin_role_id(), + credential=cred, + ) + + +def _open_role_session(userid: str, server: str, server_pk_hex: str, registry_id: str, role_name: str) -> fsc.FileShareSession: + secrets = cli.load_identity_secrets(userid) + cred = cli.load_credential(userid, registry_id, role_name) + return fsc.open_session( + server, + server_pk_hex=server_pk_hex, + user_transport_secret=bytes.fromhex(secrets["transport_secret_hex"]), + registry_id_hex=registry_id, + role_id=zkac.role_id(role_name), + credential=cred, + ) + + +def _scan_for_plaintext(haystack_root: Path, needles: list[bytes]) -> list[tuple[Path, int]]: + """Brute-force grep for any ``needle`` bytes appearing in any file under ``haystack_root``.""" + hits: list[tuple[Path, int]] = [] + for p in haystack_root.rglob("*"): + if not p.is_file(): + continue + data = p.read_bytes() + for n in needles: + if len(n) >= 64 and n in data: + hits.append((p, len(n))) + break + return hits + + +def main() -> int: + tmp = Path(tempfile.mkdtemp(prefix="zkac-fs-smoke-")) + print(f"[smoke] temp dir: {tmp}") + home = tmp / "zkac_home" + fs_data = tmp / "fs_data" + src = tmp / "src_folder" + out = tmp / "downloads" + home.mkdir() + fs_data.mkdir() + + os.environ["ZKAC_HOME"] = str(home) + + contents = _make_files(src) + print(f"[smoke] source folder: {src} ({len(contents)} files)") + + port = _free_port() + server_addr = f"127.0.0.1:{port}" + _start_fs_server("127.0.0.1", port, fs_data) + if not _wait_until(lambda: (fs_data / "server_key.json").is_file()): + raise RuntimeError("server did not start") + if not _wait_until(lambda: socket.create_connection(("127.0.0.1", port), timeout=0.2)): + raise RuntimeError("server port not ready") + server_pk_hex = _server_transport_pubkey_hex(fs_data) + print(f"[smoke] file-share server ready @ {server_addr}, pk={server_pk_hex[:16]}…") + + cli.run_cli(["user", "create", "alice"]).raise_for_status() + cli.run_cli(["user", "create", "bob"]).raise_for_status() + cli.server_pin("alice", server_addr, server_pk_hex).raise_for_status() + cli.server_pin("bob", server_addr, server_pk_hex).raise_for_status() + print("[smoke] alice + bob created and pinned server") + + rid = cli.registry_create("alice", server_addr, ["viewer", "editor"]) + print(f"[smoke] registry created: {rid}") + + # --- direct P2P grant: bob listens, alice grants ---------------------- + listener_port = _free_port() + listener = cli.P2PListener("bob", host="127.0.0.1", port=listener_port, timeout_s=30.0) + listener.start() + if not _wait_until(lambda: listener.is_running()): + raise RuntimeError("bob listener did not start") + bob_contact = cli.show_user_contact("bob", peer=f"127.0.0.1:{listener_port}") + print(f"[smoke] bob listening on 127.0.0.1:{listener_port}") + # --- alice uploads bucket + sets masks -------------------------------- + files_in_order = fsc.flatten_folder(src) + rel_paths = [str(p.relative_to(src.resolve())) for p in files_in_order] + print(f"[smoke] flattened: {rel_paths}") + # mask: viewer sees only first file (alpha.txt), editor sees everything. + viewer_mask = "1" + "0" * (len(files_in_order) - 1) + editor_mask = "1" * len(files_in_order) + bob_secrets = cli.load_identity_secrets("bob") + + with _open_admin_session("alice", server_addr, server_pk_hex, rid) as sess: + manifest = fsc.upload_bucket( + sess, src, server=server_addr, registry_id_hex=rid, + ) + manifest.role_masks = {"viewer": viewer_mask, "editor": editor_mask} + fsc.apply_role_masks_to_server(sess, manifest) + fsc.push_role_grant(sess, manifest, "viewer", bob_secrets["issuance_pk_hex"]) + fsc.save_manifest("alice", manifest) + print(f"[smoke] uploaded bucket {manifest.bucket_id} with role ACLs + anonymous grant envelope") + + try: + fscred.grant_role_p2p( + "alice", + server_addr, + rid, + "viewer", + bob_contact, + ) + except RuntimeError as exc: + listener.stop() + print(f"[smoke] grant failed: {exc}\n[smoke] bob listener output:\n{listener.output()}") + raise + if not _wait_until(lambda: not listener.is_running(), timeout_s=10.0): + listener.stop() + raise RuntimeError(f"listener never exited after grant; output:\n{listener.output()}") + received = listener.parse_received() + print(f"[smoke] bob received: {received}") + assert received and received["role"] == "viewer" + assert received["registry_id"] == rid + + # --- bob downloads using his viewer credential ------------------------ + with _open_role_session("bob", server_addr, server_pk_hex, rid, "viewer") as sess: + accessible = sess.fs_buckets() + assert accessible == [manifest.bucket_id], accessible + result = fsc.download_bucket( + sess, manifest.bucket_id, + issuance_secret_hex=bob_secrets["issuance_secret_hex"], + output_dir=out / manifest.bucket_id, + ) + print(f"[smoke] bob downloaded files: {result['files_written']}") + + # --- assertions ------------------------------------------------------- + expected_visible = {rel_paths[0]} # only first file + actual = {str(Path(p).relative_to(out / manifest.bucket_id)) for p in result["files_written"]} + assert actual == expected_visible, f"visibility mismatch: {actual} vs {expected_visible}" + decrypted = (out / manifest.bucket_id / rel_paths[0]).read_bytes() + assert decrypted == contents[rel_paths[0]], "decrypted content does not match plaintext" + + # opacity: no plaintext file content lives on disk under fs_data + plaintext_needles = list(contents.values()) + leaks = _scan_for_plaintext(fs_data / "buckets", plaintext_needles) + assert not leaks, f"plaintext leaked into server storage: {leaks}" + + print("[smoke] OK: visibility enforced, content matches, server storage opaque") + print(f"[smoke] cleanup {tmp}") + shutil.rmtree(tmp, ignore_errors=True) + return 0 + + +if __name__ == "__main__": + try: + sys.exit(main()) + except Exception: + traceback.print_exc() + sys.exit(1) diff --git a/demo/file_share_tui.py b/demo/file_share_tui.py new file mode 100644 index 0000000..557459b --- /dev/null +++ b/demo/file_share_tui.py @@ -0,0 +1,1003 @@ +#!/usr/bin/env python3 +""" +ZKAC file-share demo Textual TUI. + +Run the headless server in another terminal:: + + uv run python demo/file_share_server.py --port 9879 + +Then start the Textual app:: + + uv run python demo/file_share_tui.py +""" + +from __future__ import annotations + +import base64 +import json +import os +import shutil +import subprocess +import sys +import time +import traceback +from pathlib import Path + +import zkac +from textual import events +from textual.app import App, ComposeResult +from textual.containers import Horizontal, Vertical +from textual.screen import ModalScreen +from textual.widgets import Button, Footer, Header, Input, Label, Static, TextArea + +import file_share_client as fsc +import file_share_credentials as fscred +import zkac_cli_adapter as cli + + +class PromptScreen(ModalScreen[str | None]): + AUTO_FOCUS = "#prompt_input" + BINDINGS = [ + ("escape", "cancel", "Cancel"), + ("ctrl+q", "quit_app", "Quit"), + ("q", "quit_app", "Quit"), + ("ctrl+c", "quit_app", "Quit"), + ] + + def __init__(self, label: str, default: str = "") -> None: + super().__init__() + self._label = label + self._default = default + + def compose(self) -> ComposeResult: + with Vertical(id="prompt_dialog"): + yield Label(self._label, id="prompt_label") + yield Input(value=self._default, id="prompt_input") + with Horizontal(id="prompt_buttons"): + yield Button("OK", id="ok", variant="primary") + yield Button("Cancel", id="cancel") + + def on_mount(self) -> None: + # Defer focus until after layout settles so typing works immediately. + self.call_after_refresh(self._focus_prompt_input) + self.set_timer(0.05, self._focus_prompt_input) + + def _focus_prompt_input(self) -> None: + self.query_one("#prompt_input", Input).focus() + + def _move_button_focus(self, step: int) -> bool: + focused = self.focused + if not isinstance(focused, Button): + return False + buttons = list(self.query("#prompt_buttons Button")) + if len(buttons) < 2 or focused not in buttons: + return False + idx = buttons.index(focused) + buttons[(idx + step) % len(buttons)].focus() + return True + + def on_key(self, event) -> None: # noqa: ANN001 + if event.key == "left" and self._move_button_focus(-1): + event.stop() + return + if event.key == "right" and self._move_button_focus(1): + event.stop() + return + if event.key == "down" and isinstance(self.focused, Input): + buttons = list(self.query("#prompt_buttons Button")) + if buttons: + buttons[0].focus() + event.stop() + return + if event.key == "up" and isinstance(self.focused, Button): + self.query_one("#prompt_input", Input).focus() + event.stop() + + def action_cancel(self) -> None: + self.dismiss(None) + + def action_quit_app(self) -> None: + self.app.exit() + + def on_input_submitted(self, event: Input.Submitted) -> None: + self.dismiss(event.value.strip()) + + def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "ok": + value = self.query_one("#prompt_input", Input).value.strip() + self.dismiss(value) + return + self.dismiss(None) + + +class ConfirmScreen(ModalScreen[bool]): + AUTO_FOCUS = "#yes" + BINDINGS = [ + ("escape", "cancel", "Cancel"), + ("ctrl+q", "quit_app", "Quit"), + ("q", "quit_app", "Quit"), + ("ctrl+c", "quit_app", "Quit"), + ] + + def __init__(self, label: str) -> None: + super().__init__() + self._label = label + + def compose(self) -> ComposeResult: + with Vertical(id="prompt_dialog"): + yield Label(self._label, id="prompt_label") + with Horizontal(id="prompt_buttons"): + yield Button("Yes", id="yes", variant="primary") + yield Button("No", id="no") + + def on_mount(self) -> None: + self.query_one("#yes", Button).focus() + + def _move_button_focus(self, step: int) -> bool: + focused = self.focused + if not isinstance(focused, Button): + return False + buttons = list(self.query("#prompt_buttons Button")) + if len(buttons) < 2 or focused not in buttons: + return False + idx = buttons.index(focused) + buttons[(idx + step) % len(buttons)].focus() + return True + + def on_key(self, event) -> None: # noqa: ANN001 + if event.key == "left" and self._move_button_focus(-1): + event.stop() + return + if event.key == "right" and self._move_button_focus(1): + event.stop() + + def on_button_pressed(self, event: Button.Pressed) -> None: + self.dismiss(event.button.id == "yes") + + def action_cancel(self) -> None: + self.dismiss(False) + + def action_quit_app(self) -> None: + self.app.exit() + + +class FileShareApp(App[None]): + TITLE = "ZKAC File Share" + BINDINGS = [ + ("ctrl+q", "quit_app", "Quit"), + ("q", "quit_app", "Quit"), + ("ctrl+c", "quit_app", "Quit"), + ("c", "copy_contact", "Copy Contact"), + ] + CSS = """ + #root { + padding: 1 2; + } + #status { + border: round $accent; + padding: 0 1; + height: 4; + margin-bottom: 1; + } + #actions { + layout: grid; + grid-size: 8 1; + grid-gutter: 1 0; + height: auto; + margin-bottom: 1; + } + #actions Button { + width: 1fr; + height: 3; + text-align: center; + content-align: center middle; + } + #log { + border: round $panel; + } + #prompt_dialog { + width: 70; + height: auto; + padding: 1 2; + border: round $accent; + background: $surface; + } + #prompt_label { + margin-bottom: 1; + } + #prompt_buttons { + margin-top: 1; + height: auto; + } + """ + + def __init__(self) -> None: + super().__init__() + self.userid: str | None = None + self.identity: cli.Identity | None = None + self.server: str | None = None + self.server_pk_hex: str | None = None + self.current_bucket_id: str | None = None + self.listener: cli.P2PListener | None = None + self._action_running = False + self._log_lines: list[str] = [] + self._last_clipboard_error = "" + self._last_clipboard_backend = "" + self._last_contact_bundle = "" + self._server_key_maybe_stale_for: str | None = None + + def compose(self) -> ComposeResult: + yield Header(show_clock=True) + with Vertical(id="root"): + yield Static(id="status") + with Vertical(id="actions"): + yield Button("Login", id="login", variant="primary") + yield Button("Connect", id="connect") + yield Button("Select Bucket", id="bucket") + yield Button("Permissions", id="permissions") + yield Button("Share Permissions", id="share") + yield Button("Listen", id="listen") + yield Button("Inbox", id="inbox") + yield Button("Quit", id="quit", variant="error") + yield TextArea("", id="log", read_only=True) + yield Footer() + + def on_mount(self) -> None: + self._refresh_status() + self.write_log("Textual file-share UI ready.") + self.write_log(f"Demo ZKAC_HOME: {cli.zkac_home()}") + self.write_log("Use the action buttons to run flows.") + self.set_interval(0.8, self._poll_listener) + self._update_actions_layout() + self.query_one("#login", Button).focus() + + def on_resize(self, _event: events.Resize) -> None: + self._update_actions_layout() + + def on_unmount(self) -> None: + if self.listener: + self.listener.stop() + + def write_log(self, msg: str) -> None: + self._log_lines.extend(msg.splitlines() or [""]) + log_view = self.query_one("#log", TextArea) + log_view.load_text("\n".join(self._log_lines)) + if not isinstance(self.focused, TextArea): + log_view.scroll_end(animate=False) + + def _refresh_status(self) -> None: + listener_state = "stopped" + if self.listener and self.listener.is_running(): + listener_state = f"listening on {self.listener.address}" + elif self.listener: + listener_state = "stopped (last)" + status = ( + f"user: {self.userid or '-'}\n" + f"server: {self.server or '-'}\n" + f"bucket: {self.current_bucket_id or '-'}\n" + f"p2p-listen: {listener_state}" + ) + self.query_one("#status", Static).update(status) + + def _update_actions_layout(self) -> None: + actions = self.query_one("#actions") + if self.size.width >= 110: + actions.styles.grid_size_columns = 8 + actions.styles.grid_size_rows = 1 + else: + actions.styles.grid_size_columns = 4 + actions.styles.grid_size_rows = 2 + + def _move_action_focus(self, step: int) -> bool: + focused = self.focused + if not isinstance(focused, Button): + return False + buttons = list(self.query("#actions Button")) + if not buttons or focused not in buttons: + return False + idx = buttons.index(focused) + buttons[(idx + step) % len(buttons)].focus() + return True + + def on_key(self, event) -> None: # noqa: ANN001 + # Never intercept keys while a modal prompt/confirm screen is active. + if self.screen is not self: + return + if event.key == "left" and self._move_action_focus(-1): + event.stop() + return + if event.key == "right" and self._move_action_focus(1): + event.stop() + return + if event.key == "down" and isinstance(self.focused, Button): + self.query_one("#log", TextArea).focus() + event.stop() + return + if event.key == "up" and isinstance(self.focused, TextArea): + self.query_one("#login", Button).focus() + event.stop() + + def action_quit_app(self) -> None: + self.exit() + + async def ask(self, label: str, default: str = "") -> str | None: + return await self.push_screen_wait(PromptScreen(label, default)) + + async def confirm(self, label: str) -> bool: + return await self.push_screen_wait(ConfirmScreen(label)) + + def _require_user(self) -> cli.Identity: + if not self.userid or not self.identity: + raise RuntimeError("no current user; click Login first") + return self.identity + + def _require_server(self) -> tuple[str, str]: + if not self.server or not self.server_pk_hex: + raise RuntimeError("no current server; click Connect first") + return self.server, self.server_pk_hex + + def _open_session(self, registry_id_hex: str, role_name: str) -> fsc.FileShareSession: + ident = self._require_user() + server, server_pk_hex = self._require_server() + if role_name == "__admin__": + credential = cli.load_admin_credential(self.userid or "", registry_id_hex) + role_id = zkac.admin_role_id() + else: + credential = cli.load_credential(self.userid or "", registry_id_hex, role_name) + role_id = zkac.role_id(role_name) + return fsc.open_session( + server, + server_pk_hex=server_pk_hex, + user_transport_secret=bytes.fromhex(ident.transport_secret_hex), + registry_id_hex=registry_id_hex, + role_id=role_id, + credential=credential, + ) + + async def _choose_index(self, title: str, options: list[str], default: int = 1) -> int: + if not options: + raise RuntimeError(f"no options for {title}") + self.write_log(f"[{title}]") + for i, opt in enumerate(options, 1): + self.write_log(f" {i}) {opt}") + raw = await self.ask(f"{title}: pick number", str(default)) + if raw is None: + raise RuntimeError("cancelled") + if not raw.isdigit(): + raise RuntimeError("selection must be a number") + idx = int(raw) - 1 + if not 0 <= idx < len(options): + raise RuntimeError("invalid selection") + return idx + + async def _pick_owned_registry(self) -> str: + regs = cli.registry_list(self.userid or "") + if not regs: + raise RuntimeError("you do not own any registries; create one in Registry") + if self.server: + regs = [r for r in regs if r["server"] == self.server] or regs + opts = [f"{r['registry_id'][:16]}... @ {r['server']} roles={r['roles']}" for r in regs] + idx = await self._choose_index("Owned registries", opts) + return regs[idx]["registry_id"] + + def _poll_listener(self) -> None: + if not self.listener or self.listener.is_running(): + return + result = self.listener.parse_received() + if result is not None: + self.write_log( + f"[listener] received credential: registry={result['registry_id'][:16]}... " + f"role={result['role']}" + ) + self.listener = None + self._refresh_status() + + def _log_exception(self, exc: BaseException) -> None: + self.write_log(f"error: {exc}") + msg = str(exc).lower() + if self.server and any(s in msg for s in ("auth failed", "handshake", "public key", "server key")): + self._server_key_maybe_stale_for = self.server + self.write_log( + "[hint] server key may be stale. Use Connect to update the pinned transport key." + ) + if "--debug" in sys.argv: + self.write_log(traceback.format_exc()) + + def on_button_pressed(self, event: Button.Pressed) -> None: + action = event.button.id or "" + if action == "quit": + self.exit() + return + if self._action_running: + self.write_log("Another action is still running; wait or press q to quit.") + return + self._action_running = True + self.run_worker(self._run_action(action), exclusive=True, thread=False) + + async def _run_action(self, action: str) -> None: + try: + if action == "login": + await self.menu_login() + elif action == "connect": + await self.menu_connect() + elif action == "bucket": + await self.menu_buckets() + elif action == "permissions": + await self._bucket_set_masks() + elif action == "share": + await self.menu_share() + elif action == "listen": + await self.menu_listen() + elif action == "inbox": + await self.menu_inbox() + self._refresh_status() + except Exception as exc: # noqa: BLE001 + self._log_exception(exc) + finally: + self._action_running = False + + async def menu_login(self) -> None: + users = cli.list_local_users() + userid = "" + if users: + self.write_log("[login] local users:") + for i, u in enumerate(users, 1): + self.write_log(f" {i}) {u}") + pick = await self.ask("Select number or type new userid") + if pick is None or not pick: + return + if pick.isdigit() and 1 <= int(pick) <= len(users): + userid = users[int(pick) - 1] + else: + userid = pick + else: + typed = await self.ask("No local users found. New userid") + if not typed: + return + userid = typed + if not cli.user_exists(userid): + ok = await self.confirm(f"Create user '{userid}' via zkac-node user create?") + if not ok: + return + cli.create_user(userid).raise_for_status() + self.write_log(f"[login] created user {userid}") + self.userid = userid + self.identity = cli.get_identity(userid, peer=None) + self.write_log(f"[login] logged in as {userid}") + self.write_log(f" issuance pk: {self.identity.issuance_pk_hex}") + self.write_log(f" transport pk: {self.identity.transport_pk_hex}") + + async def menu_identity(self) -> None: + ident = self._require_user() + peer = await self.ask("Optional peer host:port for contact bundle") + contact = cli.show_user_contact(ident.userid, peer=peer or None) + self.write_log("[identity]") + self.write_log(f" userid: {ident.userid}") + self.write_log(f" issuance pk: {ident.issuance_pk_hex}") + self.write_log(f" transport pk: {ident.transport_pk_hex}") + self.write_log(" contact bundle:") + self.write_log(f" {contact}") + self._last_contact_bundle = contact + if self._copy_to_clipboard(contact): + self.write_log( + f" copied contact bundle to clipboard via {self._last_clipboard_backend}" + ) + else: + self.write_log( + " warning: could not copy contact bundle to clipboard" + f" ({self._last_clipboard_error})" + ) + + async def menu_connect(self) -> None: + ident = self._require_user() + server = await self.ask("File-share server host:port", self.server or "127.0.0.1:9879") + if server is None or not server: + return + pinned = cli.load_pinned_server_key(ident.userid, server) + server_pk_hex: str + if pinned and self._server_key_maybe_stale_for != server: + server_pk_hex = pinned + self.write_log(f"[connect] reused pinned key for {server} ({server_pk_hex[:16]}...)") + else: + prompt = "Server transport public key hex" + if pinned: + prompt = "Server key may be stale; enter updated transport public key hex" + server_pk_hex = await self.ask(prompt, pinned or self.server_pk_hex or "") + if server_pk_hex is None or not server_pk_hex: + return + cli.server_pin(ident.userid, server, server_pk_hex).raise_for_status() + self.write_log(f"[connect] pinned {server} = {server_pk_hex[:16]}...") + self._server_key_maybe_stale_for = None + + self.server = server + self.server_pk_hex = server_pk_hex + + async def menu_buckets(self) -> None: + op = await self.ask("Select bucket: l=list/select, c=create new", "l") + if op is None: + return + if op == "c": + await self._bucket_create() + return + await self._bucket_select_from_all_access() + + async def _bucket_select_from_all_access(self) -> None: + ident = self._require_user() + server, _ = self._require_server() + inventory: dict[str, set[str]] = {} + + # Owned/admin buckets. + for reg in cli.registry_list(ident.userid): + if reg["server"] != server: + continue + rid = reg["registry_id"] + if not cli.is_registry_admin(ident.userid, rid): + continue + try: + with self._open_session(rid, "__admin__") as sess: + for bid in sess.bucket_list_owned(): + inventory.setdefault(bid, set()).add("owner") + except Exception as exc: # noqa: BLE001 + self.write_log(f"[bucket] skip admin bucket list for {rid[:16]}... ({exc})") + + # Buckets visible via role credentials. + for cred in cli.credentials_list(ident.userid): + rid = cred["registry_id"] + role = cred["role"] + try: + with self._open_session(rid, role) as sess: + for bid in sess.fs_buckets(): + inventory.setdefault(bid, set()).add(f"role:{role}") + except Exception as exc: # noqa: BLE001 + self.write_log( + f"[bucket] skip credential registry={rid[:16]}... role={role} ({exc})" + ) + + if not inventory: + self.write_log("[bucket] no owned or permitted buckets yet") + return + + bucket_ids = sorted(inventory.keys()) + options = [ + f"{bid} access={sorted(inventory[bid])}" + for bid in bucket_ids + ] + default = 1 + if self.current_bucket_id and self.current_bucket_id in bucket_ids: + default = bucket_ids.index(self.current_bucket_id) + 1 + idx = await self._choose_index("Buckets", options, default=default) + selected = bucket_ids[idx] + self.current_bucket_id = selected + if selected in fsc.list_manifests(ident.userid): + self.write_log(f"[bucket] selected {selected} (locally managed)") + else: + self.write_log(f"[bucket] selected {selected} (remote-only access)") + + async def _bucket_create(self) -> None: + ident = self._require_user() + rid = await self._pick_or_create_registry_for_bucket() + folder_str = await self.ask("Path to folder to share") + if folder_str is None or not folder_str: + return + folder = Path(folder_str).expanduser().resolve() + if not folder.is_dir(): + raise RuntimeError(f"not a directory: {folder}") + files = fsc.flatten_folder(folder) + if not files: + raise RuntimeError("no shareable files found (empty or hidden only)") + self.write_log(f"[bucket-create] flattened {len(files)} files") + for i, p in enumerate(files): + self.write_log(f" [{i:>3}] {p.relative_to(folder)}") + + roles_meta = next( + (r for r in cli.registry_list(ident.userid) if r["registry_id"] == rid), + None, + ) + if roles_meta is None: + raise RuntimeError("registry metadata not found locally") + masks: dict[str, str] = {} + for role in roles_meta["roles"]: + raw = await self.ask( + f"Mask for role '{role}' ({len(files)} bits)", + "1" * len(files), + ) + if raw is None: + return + masks[role] = fsc.normalize_mask(raw, len(files)) + + with self._open_session(rid, "__admin__") as sess: + self.write_log("[bucket-create] uploading encrypted blobs...") + manifest = fsc.upload_bucket( + sess, + folder, + server=self.server or "", + registry_id_hex=rid, + ) + manifest.role_masks = masks + with self._open_session(rid, "__admin__") as sess: + fsc.apply_role_masks_to_server(sess, manifest) + self._log_server_acl_summary(sess, manifest) + fsc.save_manifest(ident.userid, manifest) + self.current_bucket_id = manifest.bucket_id + self.write_log(f"[bucket-create] uploaded {manifest.bucket_id} ({len(manifest.files)} files)") + + async def _select_local_manifest(self) -> fsc.BucketManifest: + ident = self._require_user() + bids = fsc.list_manifests(ident.userid) + if not bids: + raise RuntimeError("no local bucket manifests") + options: list[str] = [] + for bid in bids: + man = fsc.load_manifest(ident.userid, bid) + options.append( + f"{bid[:16]}... files={len(man.files)} roles={sorted(man.role_masks.keys())}" + ) + default = 1 + if self.current_bucket_id and self.current_bucket_id in bids: + default = bids.index(self.current_bucket_id) + 1 + idx = await self._choose_index("Select bucket", options, default=default) + return fsc.load_manifest(ident.userid, bids[idx]) + + async def _bucket_set_masks(self) -> None: + ident = self._require_user() + manifest = await self._select_local_manifest() + self.write_log(f"[bucket-masks] {manifest.bucket_id} has {len(manifest.files)} files") + for i, f in enumerate(manifest.files): + self.write_log(f" [{i:>3}] {f.rel_path}") + roles = sorted(manifest.role_masks.keys() or []) + if not roles: + roles_raw = await self.ask("Comma-separated role names", "viewer,editor") + if not roles_raw: + return + roles = [r.strip() for r in roles_raw.split(",") if r.strip()] + for role in roles: + current = manifest.role_masks.get(role, "1" * len(manifest.files)) + new = await self.ask(f"Mask for role '{role}'", current) + if new is None: + return + manifest.role_masks[role] = fsc.normalize_mask(new, len(manifest.files)) + with self._open_session(manifest.registry_id_hex, "__admin__") as sess: + fsc.apply_role_masks_to_server(sess, manifest) + self._log_server_acl_summary(sess, manifest) + fsc.save_manifest(ident.userid, manifest) + self.current_bucket_id = manifest.bucket_id + self.write_log("[permissions] bucket permissions updated") + + async def _pick_or_create_registry_for_bucket(self) -> str: + ident = self._require_user() + server, _ = self._require_server() + regs = [r for r in cli.registry_list(ident.userid) if r["server"] == server] + if regs: + options = [ + f"use {r['registry_id'][:16]}... roles={r['roles']}" + for r in regs + ] + options.append("create new registry for this server") + idx = await self._choose_index("Bucket auth profile", options) + if idx < len(regs): + return regs[idx]["registry_id"] + roles_raw = await self.ask("No auth profile yet. Create roles", "viewer,editor") + if not roles_raw: + raise RuntimeError("roles required to create bucket auth profile") + roles = [r.strip() for r in roles_raw.split(",") if r.strip()] + if not roles: + raise RuntimeError("no roles provided") + rid = cli.registry_create(ident.userid, server, roles) + self.write_log(f"[bucket] created auth profile {rid} roles={roles}") + return rid + + async def menu_share(self) -> None: + ident = self._require_user() + manifest = await self._select_local_manifest() + if not manifest.role_masks: + raise RuntimeError("set role masks first from Buckets") + roles = sorted(manifest.role_masks.keys()) + idx = await self._choose_index("Share role", roles) + role = roles[idx] + contact = await self.ask("Recipient contact bundle") + if contact is None or not contact: + return + result = fscred.grant_role_p2p( + ident.userid, + manifest.server, + manifest.registry_id_hex, + role, + contact, + ) + self.write_log(f"[share] issued role '{role}' to peer {result['peer']}") + recipient_pk_hex = _parse_contact_issuance_pk(contact) + with self._open_session(manifest.registry_id_hex, "__admin__") as sess: + fsc.apply_role_masks_to_server(sess, manifest) + self._log_server_acl_summary(sess, manifest) + fsc.push_role_grant(sess, manifest, role, recipient_pk_hex) + fsc.save_manifest(ident.userid, manifest) + self.current_bucket_id = manifest.bucket_id + self.write_log("[share] uploaded anonymous role-grant envelope (no recipient identifier on server)") + + async def menu_listen(self) -> None: + ident = self._require_user() + if self.listener and self.listener.is_running(): + keep = await self.confirm( + f"Listener already running on {self.listener.address}. Stop it?" + ) + if keep: + self.listener.stop() + self.listener = None + else: + return + host = await self.ask("Bind host", "127.0.0.1") + if host is None or not host: + return + raw_port = await self.ask("Optional bind port (blank = random)", "") + if raw_port is None: + return + port = 0 + if raw_port.strip(): + if not raw_port.isdigit(): + raise RuntimeError("port must be a number or blank") + port = int(raw_port) + if not 1 <= port <= 65535: + raise RuntimeError("port out of range") + listener = cli.P2PListener(ident.userid, host=host, port=port) + listener.start() + time.sleep(0.3) + if not listener.is_running(): + self.write_log(f"[listen] listener exited:\n{listener.output()}") + return + contact = cli.show_user_contact(ident.userid, peer=listener.address) + self.listener = listener + self.write_log(f"[listen] listening on {listener.address} timeout={listener.timeout_s:.0f}s") + self.write_log("Share this contact bundle out-of-band:") + self.write_log(contact) + self._last_contact_bundle = contact + if self._copy_to_clipboard(contact): + self.write_log( + f"[listen] copied contact bundle to clipboard via {self._last_clipboard_backend}" + ) + else: + self.write_log( + "[listen] warning: could not copy contact bundle to clipboard" + f" ({self._last_clipboard_error})" + ) + + async def menu_inbox(self) -> None: + ident = self._require_user() + creds: list[dict[str, str]] = list(cli.credentials_list(ident.userid)) + for reg in cli.registry_list(ident.userid): + rid = reg["registry_id"] + if cli.is_registry_admin(ident.userid, rid): + creds.append({"registry_id": rid, "role": "__admin__"}) + if not creds: + self.write_log("[inbox] no local credentials; ask an admin to grant one") + return + unique: dict[tuple[str, str], dict[str, str]] = {} + for c in creds: + unique[(c["registry_id"], c["role"])] = c + creds = list(unique.values()) + opts = [f"registry={c['registry_id'][:16]}... role={c['role']}" for c in creds] + idx = await self._choose_index("Credential for inbox", opts) + cred = creds[idx] + with self._open_session(cred["registry_id"], cred["role"]) as sess: + who = sess.whoami() + self.write_log( + "[inbox] authenticated " + f"scope={who.get('auth_scope', 'unknown')} " + f"admin={bool(who.get('is_admin', False))}" + ) + if cred["role"] == "__admin__": + bids = sess.bucket_list_owned() + else: + bids = sess.fs_buckets() + if not bids: + self.write_log("[inbox] no accessible buckets") + return + self.write_log("[inbox] accessible buckets:") + for i, bid in enumerate(bids, 1): + self.write_log(f" {i}) {bid}") + pick = await self.ask("Download bucket number, 'all', or blank to skip", "1") + if pick is None or not pick: + return + if pick == "all": + targets = list(range(len(bids))) + elif pick.isdigit(): + targets = [int(pick) - 1] + else: + raise RuntimeError("invalid selection") + for ti in targets: + if not 0 <= ti < len(bids): + raise RuntimeError("bucket index out of range") + out_default = str(Path.cwd() / "fs_inbox") + out = await self.ask("Download root folder", out_default) + if out is None or not out: + return + out_root = Path(out).expanduser().resolve() + for ti in targets: + bid = bids[ti] + bucket_dir = out_root / bid + if cred["role"] == "__admin__": + result = self._download_owned_bucket_as_admin( + sess=sess, + userid=ident.userid, + bucket_id=bid, + output_dir=bucket_dir, + ) + else: + result = fsc.download_bucket( + sess, + bid, + issuance_secret_hex=ident.issuance_secret_hex, + output_dir=bucket_dir, + ) + self.write_log( + f"[inbox] downloaded {bid} -> {bucket_dir} " + f"({len(result['files_written'])} files)" + ) + + def _download_owned_bucket_as_admin( + self, + *, + sess: fsc.FileShareSession, + userid: str, + bucket_id: str, + output_dir: Path, + ) -> dict: + manifest = fsc.load_manifest(userid, bucket_id) + output_dir.mkdir(parents=True, exist_ok=True) + written: list[str] = [] + for entry in manifest.files: + ciphertexts = [sess.fs_get_blob(bucket_id, c.blob_id) for c in entry.chunks] + plaintext = fsc.decrypt_file_from_blobs(entry, ciphertexts) + out_path = output_dir / entry.rel_path + out_path.parent.mkdir(parents=True, exist_ok=True) + out_path.write_bytes(plaintext) + written.append(str(out_path)) + return {"files_written": written} + + def _log_server_acl_summary(self, sess: fsc.FileShareSession, manifest: fsc.BucketManifest) -> None: + for role_name, raw_mask in sorted(manifest.role_masks.items()): + expected = sum(1 for bit in fsc.normalize_mask(raw_mask, len(manifest.files)) if bit == "1") + role_id_hex = zkac.role_id(role_name).hex() + acl_meta = sess.bucket_get_role_acl(manifest.bucket_id, role_id_hex) + allowed_blob_ids = list(acl_meta.get("allowed_blob_ids", [])) + acl_version = int(acl_meta.get("version", 0)) + self.write_log( + f"[permissions] server ACL role={role_name} " + f"version={acl_version} files(expected)={expected} blobs(server)={len(allowed_blob_ids)}" + ) + + def _copy_to_clipboard(self, text: str) -> bool: + text = text.strip() + if not text: + self._last_clipboard_error = "empty text" + self._last_clipboard_backend = "" + return False + is_st = os.environ.get("TERM", "").startswith("st") + + # In st, prefer explicit system clipboard tools; Textual clipboard often reports + # success but doesn't reach the desktop clipboard. + if not is_st: + try: + self.copy_to_clipboard(text) + self._last_clipboard_error = "" + self._last_clipboard_backend = "textual" + return True + except Exception: + self._last_clipboard_error = "Textual clipboard unavailable" + + def _run_clip(cmd: list[str], payload: str) -> subprocess.CompletedProcess[str] | None: + try: + return subprocess.run( + cmd, + input=payload, + text=True, + capture_output=True, + timeout=0.8, + ) + except subprocess.TimeoutExpired: + self._last_clipboard_error = f"{cmd[0]} timed out" + return None + except Exception as exc: + self._last_clipboard_error = f"{cmd[0]} failed: {exc}" + return None + + # Wayland + if shutil.which("wl-copy") and os.environ.get("WAYLAND_DISPLAY"): + p = _run_clip(["wl-copy"], text) + if p and p.returncode == 0: + self._last_clipboard_error = "" + self._last_clipboard_backend = "wl-copy" + return True + if p: + self._last_clipboard_error = (p.stderr or p.stdout or "wl-copy failed").strip() + + # X11: prefer xsel first (often exits faster), then xclip. + if shutil.which("xsel") and os.environ.get("DISPLAY"): + p1 = _run_clip(["xsel", "--clipboard", "--input"], text) + p2 = _run_clip(["xsel", "--primary", "--input"], text) + if (p1 and p1.returncode == 0) or (p2 and p2.returncode == 0): + self._last_clipboard_error = "" + self._last_clipboard_backend = "xsel" + return True + if p1 or p2: + self._last_clipboard_error = ( + (p1.stderr if p1 else "") + or (p2.stderr if p2 else "") + or (p1.stdout if p1 else "") + or (p2.stdout if p2 else "") + or "xsel failed" + ).strip() + + if shutil.which("xclip") and os.environ.get("DISPLAY"): + p1 = _run_clip(["xclip", "-selection", "clipboard", "-in"], text) + p2 = _run_clip(["xclip", "-selection", "primary", "-in"], text) + if (p1 and p1.returncode == 0) or (p2 and p2.returncode == 0): + self._last_clipboard_error = "" + self._last_clipboard_backend = "xclip" + return True + if p1 or p2: + self._last_clipboard_error = ( + (p1.stderr if p1 else "") + or (p2.stderr if p2 else "") + or (p1.stdout if p1 else "") + or (p2.stdout if p2 else "") + or "xclip failed" + ).strip() + + if shutil.which("pbcopy"): + p = _run_clip(["pbcopy"], text) + if p and p.returncode == 0: + self._last_clipboard_error = "" + self._last_clipboard_backend = "pbcopy" + return True + if p: + self._last_clipboard_error = (p.stderr or p.stdout or "pbcopy failed").strip() + + # Final fallback for terminals that support OSC52 clipboard control. + try: + encoded = base64.b64encode(text.encode("utf-8")).decode("ascii") + sys.__stdout__.write(f"\033]52;c;{encoded}\a") + sys.__stdout__.flush() + self._last_clipboard_error = "" + self._last_clipboard_backend = "osc52" + return True + except Exception as exc: + self._last_clipboard_error = f"OSC52 failed: {exc}" + self._last_clipboard_backend = "" + + if is_st: + self._last_clipboard_error = ( + "st detected; install xsel/xclip and ensure DISPLAY is set" + ) + return False + + def action_copy_contact(self) -> None: + if not self._last_contact_bundle: + self.write_log("[copy-contact] no contact bundle generated yet") + return + if self._copy_to_clipboard(self._last_contact_bundle): + self.write_log( + f"[copy-contact] copied latest contact bundle via {self._last_clipboard_backend}" + ) + return + self.write_log( + "[copy-contact] failed to copy latest contact bundle" + f" ({self._last_clipboard_error})" + ) + + +def _parse_contact_issuance_pk(bundle: str) -> str: + s = bundle.strip() + raw = base64.urlsafe_b64decode((s + "=" * (-len(s) % 4)).encode()) + data = json.loads(raw.decode("utf-8")) + pk = data.get("issuance_pk_hex") + if not isinstance(pk, str) or len(pk) != 64: + raise RuntimeError("contact bundle missing issuance_pk_hex") + return pk + + +def main() -> None: + FileShareApp().run() + + +if __name__ == "__main__": + main() diff --git a/demo/test_demo_privacy_guardrails.py b/demo/test_demo_privacy_guardrails.py new file mode 100644 index 0000000..20f65ce --- /dev/null +++ b/demo/test_demo_privacy_guardrails.py @@ -0,0 +1,192 @@ +import json +import sys +import threading +from pathlib import Path + +import zkac + +DEMO_DIR = Path(__file__).resolve().parent +if str(DEMO_DIR) not in sys.path: + sys.path.insert(0, str(DEMO_DIR)) + +import file_share_client as fsc # noqa: E402 +import file_share_server as fss # noqa: E402 + + +def _make_credential() -> tuple[bytes, zkac.Credential]: + issuer = zkac.BbsIssuer() + pk = issuer.public_key() + role_id = zkac.role_id("viewer") + req = zkac.prepare_blind_request() + blind_sig = issuer.issue_blind(req.commitment_with_proof(), role_id, 1) + cred = zkac.Credential.finalize( + blind_sig, + req.member_secret(), + req.prover_blind(), + role_id, + 1, + pk, + ) + return role_id, cred + + +def test_open_session_fs_hello_contains_only_proof(monkeypatch): + role_id, credential = _make_credential() + sent_payloads: list[dict] = [] + + class _FakeSocket: + def settimeout(self, _value): + return None + + def close(self): + return None + + class _FakeHandshakeSession: + def transcript_hash(self) -> bytes: + return b"\x11" * 32 + + class _FakeFramedSession: + def __init__(self, _sock, _session): + pass + + def send(self, payload: bytes) -> None: + sent_payloads.append(json.loads(payload.decode("utf-8"))) + + def recv(self) -> bytes: + return b'{"ok": true, "status": "authenticated"}' + + monkeypatch.setattr(fsc.socket, "create_connection", lambda *_args, **_kwargs: _FakeSocket()) + monkeypatch.setattr(fsc, "client_handshake_anon", lambda *_args, **_kwargs: _FakeHandshakeSession()) + monkeypatch.setattr(fsc, "FramedSession", _FakeFramedSession) + + sess = fsc.open_session( + "127.0.0.1:9879", + server_pk_hex=zkac.Keypair().public_key().to_bytes().hex(), + user_transport_secret=bytes(zkac.Keypair().secret_key_bytes()), + registry_id_hex="00" * 32, + role_id=role_id, + credential=credential, + ) + sess.close() + + assert len(sent_payloads) == 1 + hello = sent_payloads[0] + assert set(hello.keys()) == {"op", "bbs_auth_b64"} + assert hello["op"] == "fs" + assert isinstance(hello["bbs_auth_b64"], str) and hello["bbs_auth_b64"] + + +def test_handle_conn_error_log_never_includes_peer_endpoint(monkeypatch, tmp_path, capsys): + class _DummyConn: + def settimeout(self, _value): + return None + + def close(self): + return None + + def _raise_handshake(_conn, _node): + raise RuntimeError("boom") + + monkeypatch.setattr(fss, "server_handshake_anon", _raise_handshake) + + slots = threading.BoundedSemaphore(1) + assert slots.acquire(blocking=False) + + fss._handle_conn( + _DummyConn(), + ("203.0.113.44", 4242), + zkac.Node(zkac.Keypair()), + zkac.RegistryManager(), + fss._FileShareStore(tmp_path), + "", + 5.0, + slots, + ) + + out = capsys.readouterr().out + assert "[fs-server] connection error (RuntimeError)" in out + assert "203.0.113.44" not in out + assert ":4242" not in out + + +def test_bucket_metadata_uses_opaque_tags(tmp_path): + store = fss._FileShareStore(tmp_path) + store.set_privacy_key(b"privacy-key-32-bytes-minimum-seed") + registry_id = "aa" * 32 + role_id = "bb" * 32 + bucket_id = "cc" * 16 + blob_id = "dd" * 16 + + store.bucket_create(bucket_id, registry_id) + meta = store.bucket_meta(bucket_id) + assert "owner_registry_id" not in meta + assert isinstance(meta.get("owner_registry_tag"), str) + assert len(meta["owner_registry_tag"]) == 64 + + store.bucket_set_role_acl(bucket_id, registry_id, role_id, [blob_id]) + meta2 = store.bucket_meta(bucket_id) + acl_keys = list(meta2.get("role_acl", {}).keys()) + assert acl_keys and acl_keys[0] != role_id + assert all(len(k) == 64 for k in acl_keys) + + store.bucket_put_role_grant(bucket_id, registry_id, role_id, 1, "eph", "ct") + grants_root = tmp_path / "buckets" / bucket_id / "role_grants" + assert (grants_root / role_id).exists() is False + role_dirs = [p.name for p in grants_root.iterdir() if p.is_dir()] + assert role_dirs and all(len(d) == 64 for d in role_dirs) + + +def test_auth_scan_does_not_return_early(): + class _Mgr: + def __init__(self): + self.admin_checks: list[str] = [] + self.role_checks: list[tuple[str, str]] = [] + + def verify_admin(self, rid: bytes, _proof: bytes, _th: bytes) -> bool: + self.admin_checks.append(rid.hex()) + return rid.hex() == ("11" * 32) + + def verify_presentation(self, rid: bytes, role_id: bytes, _proof: bytes, _th: bytes) -> bool: + self.role_checks.append((rid.hex(), role_id.hex())) + return False + + class _Store: + def list_registry_ids(self): + return ["11" * 32, "22" * 32] + + def role_ids_for_registry(self, rid_hex: str): + if rid_hex == "11" * 32: + return ["33" * 32] + return ["44" * 32] + + def _registry_tag(self, rid_hex: str): + return "r-" + rid_hex[:8] + + def _role_tag(self, role_hex: str): + return "k-" + role_hex[:8] + + mgr = _Mgr() + auth = fss._authenticate_fs_identity(mgr, _Store(), b"proof", b"nonce") + assert auth is not None and auth["is_admin"] is True + # Both registries must be checked even though first one matched admin. + assert mgr.admin_checks == ["11" * 32, "22" * 32] + # Role checks also run for all known role candidates. + assert mgr.role_checks == [("11" * 32, "33" * 32), ("22" * 32, "44" * 32)] + + +def test_dispatch_whoami_does_not_expose_registry_or_role(): + resp = fss._dispatch_fs( + {"cmd": "whoami"}, + store=None, # not used by whoami branch + ctx={ + "registry_id_hex": "11" * 32, + "registry_tag": "r-tag", + "role_id_hex": "22" * 32, + "role_tag": "k-tag", + "is_admin": False, + }, + ) + assert resp["ok"] is True + assert "registry_id" not in resp + assert "role_id" not in resp + assert resp["auth_scope"] == "credential" diff --git a/demo/zkac_admin_serve.py b/demo/zkac_admin_serve.py deleted file mode 100644 index 6a232a5..0000000 --- a/demo/zkac_admin_serve.py +++ /dev/null @@ -1,322 +0,0 @@ -#!/usr/bin/env python3 -""" -HTTP admin debug dashboard for ``zkac-node serve``. - -Runs the ZKAC TCP node in a background thread and serves a read-only web UI on -another port. Intended for loopback + I2P server-tunnel forwarding. - -**Security:** This page is fully transparent (registry metadata, live sessions, -public keys). Do not expose it to untrusted networks without tunnel ACLs. - -Usage:: - - uv sync --extra demo - uv run python demo/zkac_admin_serve.py alice \\ - --node-host 127.0.0.1 --node-port 9800 \\ - --web-host 127.0.0.1 --web-port 8766 -""" - -from __future__ import annotations - -import argparse -import json -import os -import sys -import threading -import time -from pathlib import Path - -from flask import Flask, Response, jsonify, render_template_string, request - -from zkac_cli.paths import user_dir -from zkac_cli.server import serve -from zkac_cli.server_debug import ( - ServerDebugState, - collect_registry_debug, - data_dir_tree, - server_key_meta, -) - -_PAGE = """ - - - - - {% if refresh_s %} - - {% endif %} - ZKAC node — admin debug - - - -

ZKAC node — admin debug

-

- User {{ snap.userid }} · TCP - {{ snap.listen.host }}:{{ snap.listen.port }} - · uptime {{ snap.uptime_s }}s - {% if refresh_s %} - · auto-refresh {{ refresh_s }}s - {% endif %} -

- -
-
-

Status

- - - - - - - -
Data directory{{ snap.data_dir }}
Started (wall){{ snap.started_wall }}
Server public key{{ snap.server_public_key_hex or "—" }}
Registries (boot){{ snap.registries_loaded_boot }}
Active TCP sessions - {{ snap.active_connection_count }} live -
Processpid={{ proc.pid }} threads={{ proc.threads }}
-
- -
-

Server key file

-
{{ sk_meta | tojson(indent=2) }}
-
- -
-

Registries (on disk + parsed)

- {% if reg %} - - - - - - - - - {% for r in reg %} - - - - - - - - {% endfor %} -
ID (file)versionstate hashbytes (state / cert)parse
{{ r.file_registry_id_hex[:16] }}…{{ r.get("version", "—") }}{% if r.get("state_hash_hex") %}{{ r.state_hash_hex[:24] }}…{% else %}—{% endif %}{{ r.state_bytes }} / {{ r.cert_bytes }}{% if r.parsed_ok %}ok{% else %}fail{% endif %}
- {% else %} -

No registry state files yet.

- {% endif %} -
- -
-

Session connections (live)

- {% if snap.active_connections %} - - - - - - {% for c in snap.active_connections %} - - - - - - - - - - - - {% endfor %} -
idpeerphaseoptranscriptauth registryrolemgmt #echo bytes
{{ c.id }}{{ c.peer }}{{ c.phase }}{{ c.hello_op or "—" }}{% if c.transcript_hash_hex %}{{ c.transcript_hash_hex[:20] }}…{% else %}—{% endif %}{% if c.auth_registry_hex %}{{ c.auth_registry_hex[:16] }}…{% else %}—{% endif %}{% if c.auth_role_hex %}{{ c.auth_role_hex[:16] }}…{% else %}—{% endif %}{{ c.mgmt_commands or 0 }}{{ c.bytes_echoed or 0 }}
- {% else %} -

No active connections.

- {% endif %} -
- -
-

Recent connections

- {% if snap.recent_connections %} - - - - - {% for c in snap.recent_connections %} - - - - - - - - - {% endfor %} -
idpeerphasemgmt #echo byteserror
{{ c.id }}{{ c.peer }}{{ c.phase }}{{ c.mgmt_commands or 0 }}{{ c.bytes_echoed or 0 }}{{ c.error or "—" }}
- {% else %} -

No recent disconnects recorded.

- {% endif %} -
- -
-

Data directory tree (debug)

-
{{ files | tojson(indent=2) }}
-
- -
-

Full debug JSON

-

Machine-readable snapshot (same as /api/debug.json).

-
{{ full_json }}
-
-
- - - - -""" - - -def _full_payload(debug: ServerDebugState, data_dir: Path) -> dict: - snap = debug.snapshot() - reg = collect_registry_debug(data_dir) - sk = server_key_meta(data_dir) - files = data_dir_tree(data_dir) - return { - "snapshot": snap, - "server_key_file": sk, - "registries": reg, - "data_dir_files": files, - "process": {"pid": os.getpid(), "threads": threading.active_count()}, - "generated_wall": time.time(), - } - - -def create_app(debug: ServerDebugState, data_dir: Path) -> Flask: - app = Flask(__name__) - - @app.get("/") - def index(): - refresh = request.args.get("refresh", "").strip() - refresh_s = int(refresh) if refresh.isdigit() and 1 <= int(refresh) <= 60 else None - data = _full_payload(debug, data_dir) - snap = data["snapshot"] - proc = data["process"] - full_json = json.dumps(data, indent=2, sort_keys=True) - return render_template_string( - _PAGE, - snap=snap, - reg=data["registries"], - sk_meta=data["server_key_file"], - files=data["data_dir_files"], - full_json=full_json, - proc=proc, - refresh_s=refresh_s, - ) - - @app.get("/api/debug.json") - def api_debug(): - return jsonify(_full_payload(debug, data_dir)) - - @app.get("/healthz") - def healthz(): - return Response("ok", mimetype="text/plain") - - return app - - -def main() -> None: - p = argparse.ArgumentParser(description="ZKAC node TCP + HTTP admin debug dashboard") - p.add_argument("userid", help="user whose ~/.zkac//server/ holds node state") - p.add_argument("--data-dir", default=None, help="override server data directory") - p.add_argument("--node-host", default="127.0.0.1") - p.add_argument("--node-port", type=int, default=9800) - p.add_argument("--web-host", default="127.0.0.1") - p.add_argument("--web-port", type=int, default=8766) - args = p.parse_args() - - data_dir = Path(args.data_dir) if args.data_dir else user_dir(args.userid) / "server" - data_dir = data_dir.resolve() - debug = ServerDebugState(userid=args.userid, data_dir=str(data_dir)) - - t = threading.Thread( - target=serve, - kwargs={ - "data_dir": str(data_dir), - "host": args.node_host, - "port": args.node_port, - "debug": debug, - }, - name="zkac-serve", - daemon=True, - ) - t.start() - time.sleep(0.15) - if not t.is_alive(): - print("ZKAC node thread died on startup; check stderr above.", file=sys.stderr) - sys.exit(1) - - app = create_app(debug, data_dir) - print(f"ZKAC TCP node: {args.node_host}:{args.node_port} (data {data_dir})") - print(f"Admin debug UI: http://{args.web_host}:{args.web_port}/") - print("Warning: admin UI exposes live sessions and registry metadata.", file=sys.stderr) - app.run(host=args.web_host, port=args.web_port, debug=False, threaded=True) - - -if __name__ == "__main__": - main() diff --git a/demo/zkac_cli_adapter.py b/demo/zkac_cli_adapter.py new file mode 100644 index 0000000..ee9043f --- /dev/null +++ b/demo/zkac_cli_adapter.py @@ -0,0 +1,420 @@ +""" +Subprocess adapter around the existing ``zkac-node`` CLI. + +The demo deliberately does not import ``cli/zkac_cli/*``: every interaction +with identity, registry, grant or p2p-listen flows runs through the CLI as a +black box, exactly the way a third-party application would integrate. + +The only direct filesystem touch is reading the CLI-managed ``identity.json`` +under ``ZKAC_HOME`` (default ``~/.ZKAC-FS//`` for this demo). That file is the CLI's +public on-disk format and is needed locally to obtain the user's transport and +issuance secrets for the file-share session and role-grant decryption. +""" + +from __future__ import annotations + +import base64 +import hashlib +import json +import os +import re +import shutil +import subprocess +import sys +import threading +from dataclasses import dataclass +from pathlib import Path + +import zkac + +ROOT = Path(__file__).resolve().parents[1] +CLI_DIR = ROOT / "cli" + +DEFAULT_TIMEOUT_S = 60.0 +DEFAULT_LISTEN_TIMEOUT_S = 600.0 +DEFAULT_DEMO_ZKAC_HOME = Path.home() / ".ZKAC-FS" + + +# ── helpers ────────────────────────────────────────────────────────── + +def _b64_decode(s: str) -> bytes: + return base64.b64decode(s) + + +def zkac_home() -> Path: + return Path(os.environ.get("ZKAC_HOME", DEFAULT_DEMO_ZKAC_HOME)) + + +def user_identity_path(userid: str) -> Path: + return zkac_home() / userid / "identity.json" + + +def user_exists(userid: str) -> bool: + return user_identity_path(userid).is_file() + + +def list_local_users() -> list[str]: + home = zkac_home() + if not home.is_dir(): + return [] + return sorted(d.name for d in home.iterdir() if (d / "identity.json").is_file()) + + +def _zkac_invocation() -> tuple[list[str], dict[str, str]]: + """Resolve a runnable ``zkac-node`` command, falling back to the source tree.""" + exe = shutil.which("zkac-node") + if exe: + return [exe], {} + return [sys.executable, "-m", "zkac_cli.main"], {"PYTHONPATH": str(CLI_DIR)} + + +@dataclass +class CliResult: + rc: int + stdout: str + stderr: str + + def ok(self) -> bool: + return self.rc == 0 + + def raise_for_status(self) -> "CliResult": + if self.rc != 0: + raise RuntimeError( + f"zkac-node failed (rc={self.rc}): {self.stderr.strip() or self.stdout.strip()}" + ) + return self + + +def run_cli( + args: list[str], + *, + timeout_s: float = DEFAULT_TIMEOUT_S, + extra_env: dict[str, str] | None = None, +) -> CliResult: + prefix, env_overrides = _zkac_invocation() + env = { + **os.environ, + **env_overrides, + "ZKAC_HOME": str(zkac_home()), + **(extra_env or {}), + } + try: + p = subprocess.run( + prefix + args, + capture_output=True, + text=True, + timeout=timeout_s, + cwd=str(ROOT), + env=env, + ) + except subprocess.TimeoutExpired as exc: + return CliResult(rc=124, stdout=exc.stdout or "", stderr=f"timed out after {timeout_s}s") + return CliResult(rc=p.returncode, stdout=p.stdout, stderr=p.stderr) + + +# ── identity ───────────────────────────────────────────────────────── + +@dataclass +class Identity: + userid: str + issuance_pk_hex: str + issuance_secret_hex: str + transport_pk_hex: str + transport_secret_hex: str + grant_token_b64: str + contact_bundle: str # value of ``zkac-node user show --peer ...`` + + +def load_identity_secrets(userid: str) -> dict[str, str]: + """Read the CLI-managed ``identity.json`` (creator: ``zkac-node user create``).""" + path = user_identity_path(userid) + if not path.is_file(): + raise FileNotFoundError(f"unknown user {userid!r} (run: zkac-node user create {userid})") + data = json.loads(path.read_text()) + + def hexed(b64_field: str) -> str: + return _b64_decode(data[b64_field]).hex() + + return { + "issuance_pk_hex": hexed("issuance_public_b64"), + "issuance_secret_hex": hexed("issuance_secret_b64"), + "transport_pk_hex": hexed("transport_public_b64"), + "transport_secret_hex": hexed("transport_secret_b64"), + "grant_token_b64": data["grant_token_b64"], + } + + +def create_user(userid: str) -> CliResult: + return run_cli(["user", "create", userid]) + + +def show_user_contact(userid: str, peer: str | None = None) -> str: + args = ["user", "show", userid] + if peer: + args += ["--peer", peer] + res = run_cli(args).raise_for_status() + for line in res.stdout.splitlines(): + line = line.strip() + # contact bundle is the indented blob after "share contact:" + if line and not line.startswith(("user:", "share contact:", "issuance pk:", "p2p transport pk:", "(", "registries owned:", "credentials:", "contact peer endpoint:", "owner admin")): + if re.fullmatch(r"[A-Za-z0-9_\-]+", line): + return line + raise RuntimeError("could not parse contact bundle from `zkac-node user show`") + + +def get_identity(userid: str, peer: str | None = None) -> Identity: + secrets = load_identity_secrets(userid) + contact = show_user_contact(userid, peer=peer) + return Identity( + userid=userid, + issuance_pk_hex=secrets["issuance_pk_hex"], + issuance_secret_hex=secrets["issuance_secret_hex"], + transport_pk_hex=secrets["transport_pk_hex"], + transport_secret_hex=secrets["transport_secret_hex"], + grant_token_b64=secrets["grant_token_b64"], + contact_bundle=contact, + ) + + +# ── registry / pinning / grants ────────────────────────────────────── + +def server_pin(userid: str, server: str, server_pk_hex: str) -> CliResult: + return run_cli(["server", "pin", userid, server, "--key", server_pk_hex]) + + +def load_pinned_server_key(userid: str, server: str) -> str | None: + digest = hashlib.sha256(server.encode("utf-8")).hexdigest() + pin_file = zkac_home() / userid / "servers" / f"sha256_{digest}.json" + if not pin_file.is_file(): + return None + data = json.loads(pin_file.read_text()) + server_pk_b64 = data.get("server_public_key_b64") + if not isinstance(server_pk_b64, str): + return None + return _b64_decode(server_pk_b64).hex() + + +def registry_create(userid: str, server: str, roles: list[str]) -> str: + res = run_cli(["registry", "create", userid, server, "--roles", ",".join(roles)]).raise_for_status() + for line in res.stdout.splitlines(): + line = line.strip() + if line.startswith("registry created:"): + return line.split(":", 1)[1].strip() + raise RuntimeError(f"could not parse registry id from output:\n{res.stdout}") + + +def registry_list(userid: str) -> list[dict]: + res = run_cli(["registry", "list", userid]).raise_for_status() + out: list[dict] = [] + for line in res.stdout.splitlines(): + s = line.strip() + m = re.match( + r"^([0-9a-fA-F]+)\s+@\s+(\S+)\s+roles=\[(.*)\]\s*$", + s, + ) + if m: + roles_raw = m.group(3) + roles = [r.strip().strip("'\"") for r in roles_raw.split(",") if r.strip()] + out.append({"registry_id": m.group(1), "server": m.group(2), "roles": roles}) + return out + + +def registry_add_roles(userid: str, server: str, registry_id: str, add_roles: list[str]) -> CliResult: + return run_cli([ + "registry", "update", userid, server, + "--registry", registry_id, + "--add-roles", ",".join(add_roles), + ]).raise_for_status() + + +def credentials_list(userid: str) -> list[dict]: + """Return locally held BBS credentials as [{registry_id, role}].""" + res = run_cli(["credentials", "list", userid]).raise_for_status() + creds: list[dict] = [] + in_local = False + for line in res.stdout.splitlines(): + if line.startswith("local credentials:"): + in_local = True + continue + if not line.strip(): + continue + if line.startswith(("owner admin capability:", "registries owned:")): + in_local = False + continue + if in_local: + s = line.strip() + if s == "(none)": + continue + if ":" in s: + rid, role = s.rsplit(":", 1) + creds.append({"registry_id": rid.strip(), "role": role.strip()}) + return creds + + +def grant( + userid: str, + server: str, + registry_id: str, + role_name: str, + recipient_contact: str, +) -> CliResult: + return run_cli([ + "grant", userid, + "--server", server, + "--registry", registry_id, + "--role", role_name, + "--to", recipient_contact, + ]).raise_for_status() + + +# ── background p2p-listen ──────────────────────────────────────────── + +class P2PListener: + """Background ``zkac-node p2p-listen`` process the TUI can stop and watch.""" + + def __init__( + self, + userid: str, + host: str = "127.0.0.1", + port: int = 0, + timeout_s: float = DEFAULT_LISTEN_TIMEOUT_S, + ) -> None: + self.userid = userid + self.host = host + self.port = port if port else _pick_random_port(host) + self.timeout_s = timeout_s + self._proc: subprocess.Popen[str] | None = None + self._stdout_buf = "" + self._lock = threading.Lock() + + @property + def address(self) -> str: + return f"{self.host}:{self.port}" + + def start(self) -> None: + prefix, env_overrides = _zkac_invocation() + env = { + **os.environ, + **env_overrides, + "PYTHONUNBUFFERED": "1", + "ZKAC_HOME": str(zkac_home()), + } + args = prefix + [ + "p2p-listen", self.userid, + "--host", self.host, + "--port", str(self.port), + "--timeout", str(self.timeout_s), + ] + self._proc = subprocess.Popen( + args, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + cwd=str(ROOT), + env=env, + bufsize=1, + ) + threading.Thread(target=self._drain, daemon=True).start() + + def _drain(self) -> None: + if self._proc is None or self._proc.stdout is None: + return + for line in self._proc.stdout: + with self._lock: + self._stdout_buf += line + + def is_running(self) -> bool: + return self._proc is not None and self._proc.poll() is None + + def stop(self) -> None: + if self._proc is None: + return + if self._proc.poll() is None: + self._proc.terminate() + try: + self._proc.wait(timeout=2.0) + except subprocess.TimeoutExpired: + self._proc.kill() + self._proc.wait() + + def output(self) -> str: + with self._lock: + return self._stdout_buf + + def parse_received(self) -> dict | None: + """Extract ``{registry_id, role}`` from CLI output once a grant has been received.""" + out = self.output() + rid = role = None + for line in out.splitlines(): + s = line.strip() + if s.startswith("registry:"): + rid = s.split(":", 1)[1].strip() + elif s.startswith("role:"): + role = s.split(":", 1)[1].strip() + if rid and role: + return {"registry_id": rid, "role": role} + return None + + +def _pick_random_port(host: str) -> int: + import socket + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + s.bind((host, 0)) + return s.getsockname()[1] + finally: + s.close() + + +# ── credentials and admin material (CLI on-disk format, read-only) ──── + +def _user_dir(userid: str) -> Path: + return zkac_home() / userid + + +def load_credential(userid: str, registry_id_hex: str, role_name: str) -> zkac.Credential: + """Reconstruct a finalized BBS credential previously stored by ``zkac-node grant``.""" + p = _user_dir(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json" + if not p.is_file(): + raise FileNotFoundError( + f"no credential for {role_name!r} on {registry_id_hex[:16]}…; " + "ask the registry admin to grant via `zkac-node grant`." + ) + data = json.loads(p.read_text()) + pk = zkac.BbsPublicKey.from_bytes(_b64_decode(data["issuer_pk_b64"])) + return zkac.Credential.finalize( + _b64_decode(data["blind_sig_b64"]), + _b64_decode(data["member_secret_b64"]), + _b64_decode(data["prover_blind_b64"]), + zkac.role_id(data["role_name"]), + int(data["epoch"]), + pk, + ) + + +def load_admin_material(userid: str, registry_id_hex: str) -> dict: + p = _user_dir(userid) / "admin" / f"{registry_id_hex}.json" + if not p.is_file(): + raise FileNotFoundError( + f"no admin material for {registry_id_hex[:16]}… under {userid!r}; " + "you are not the owner of this registry" + ) + return json.loads(p.read_text()) + + +def load_admin_credential(userid: str, registry_id_hex: str) -> zkac.Credential: + """Reconstruct the registry-admin BBS credential (role = ``admin_role_id``).""" + data = load_admin_material(userid, registry_id_hex) + pk = zkac.BbsPublicKey.from_bytes(_b64_decode(data["bbs_issuer_public_b64"])) + return zkac.Credential.finalize( + _b64_decode(data["admin_blind_sig_b64"]), + _b64_decode(data["admin_member_secret_b64"]), + _b64_decode(data["admin_prover_blind_b64"]), + zkac.admin_role_id(), + 0, + pk, + ) + + +def is_registry_admin(userid: str, registry_id_hex: str) -> bool: + return (_user_dir(userid) / "admin" / f"{registry_id_hex}.json").is_file() diff --git a/pyproject.toml b/pyproject.toml index 814343c..f0a6a64 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -19,6 +19,7 @@ cli = ["zkac-node"] demo = [ "flask>=3.0", "flask-sock>=0.7", + "textual>=0.70", "zkac-node", ] dev = [ diff --git a/uv.lock b/uv.lock index 1b7645f..e1bd437 100644 --- a/uv.lock +++ b/uv.lock @@ -857,6 +857,35 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0a/dd/8050c947d435c8d4bc94e3252f4d8bb8a76cfb424f043a8680be637a57f1/kiwisolver-1.5.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:59cd8683f575d96df5bb48f6add94afc055012c29e28124fcae2b63661b9efb1", size = 73558, upload-time = "2026-03-09T13:15:52.112Z" }, ] +[[package]] +name = "linkify-it-py" +version = "2.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "uc-micro-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/2e/c9/06ea13676ef354f0af6169587ae292d3e2406e212876a413bf9eece4eb23/linkify_it_py-2.1.0.tar.gz", hash = "sha256:43360231720999c10e9328dc3691160e27a718e280673d444c38d7d3aaa3b98b", size = 29158, upload-time = "2026-03-01T07:48:47.683Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b4/de/88b3be5c31b22333b3ca2f6ff1de4e863d8fe45aaea7485f591970ec1d3e/linkify_it_py-2.1.0-py3-none-any.whl", hash = "sha256:0d252c1594ecba2ecedc444053db5d3a9b7ec1b0dd929c8f1d74dce89f86c05e", size = 19878, upload-time = "2026-03-01T07:48:46.098Z" }, +] + +[[package]] +name = "markdown-it-py" +version = "4.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "mdurl" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5c/5c/f3aedc83549aae71cd52b9e9687fe896e3dc6e966ba20eba04718605d198/markdown_it_py-4.1.0.tar.gz", hash = "sha256:760e3f87b2787c044c5138a5ba107b7c2be26c03b13cc7f8fe42756b65b1df6c", size = 81613, upload-time = "2026-05-06T16:32:13.649Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a8/88/802c82060c54bc7dde21eb0033e337838b8181a1323254aa9ec41cbfc3d1/markdown_it_py-4.1.0-py3-none-any.whl", hash = "sha256:d4939a62a2dd0cd9cb80a191a711ba1d39bac8ed5ef9e9966895b0171c01c46d", size = 90955, upload-time = "2026-05-06T16:32:12.184Z" }, +] + +[package.optional-dependencies] +linkify = [ + { name = "linkify-it-py" }, +] + [[package]] name = "markupsafe" version = "3.0.3" @@ -1053,6 +1082,27 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c5/2a/afe0193b673a79ffd2e01ad999511b7e9e6b49af02bb3759d82a78c3043d/maturin-1.13.1-py3-none-win_arm64.whl", hash = "sha256:2839024dcd65776abb4759e5bca29941971e095574162a4d335191da4be9ff24", size = 8905575, upload-time = "2026-04-09T15:14:03.891Z" }, ] +[[package]] +name = "mdit-py-plugins" +version = "0.5.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" }, +] + +[[package]] +name = "mdurl" +version = "0.1.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" }, +] + [[package]] name = "mistune" version = "3.2.1" @@ -1607,6 +1657,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/2c/58/ca301544e1fa93ed4f80d724bf5b194f6e4b945841c5bfd555878eea9fcb/referencing-0.37.0-py3-none-any.whl", hash = "sha256:381329a9f99628c9069361716891d34ad94af76e461dcb0335825aecc7692231", size = 26766, upload-time = "2025-10-13T15:30:47.625Z" }, ] +[[package]] +name = "rich" +version = "15.0.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c0/8f/0722ca900cc807c13a6a0c696dacf35430f72e0ec571c4275d2371fca3e9/rich-15.0.0.tar.gz", hash = "sha256:edd07a4824c6b40189fb7ac9bc4c52536e9780fbbfbddf6f1e2502c31b068c36", size = 230680, upload-time = "2026-04-12T08:24:00.75Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/82/3b/64d4899d73f91ba49a8c18a8ff3f0ea8f1c1d75481760df8c68ef5235bf5/rich-15.0.0-py3-none-any.whl", hash = "sha256:33bd4ef74232fb73fe9279a257718407f169c09b78a87ad3d296f548e27de0bb", size = 310654, upload-time = "2026-04-12T08:24:02.83Z" }, +] + [[package]] name = "rpds-py" version = "0.30.0" @@ -1773,6 +1836,23 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f1/7b/ce1eafaf1a76852e2ec9b22edecf1daa58175c090266e9f6c64afcd81d91/stack_data-0.6.3-py3-none-any.whl", hash = "sha256:d5558e0c25a4cb0853cddad3d77da9891a08cb85dd9f9f91b9f8cd66e511e695", size = 24521, upload-time = "2023-09-30T13:58:03.53Z" }, ] +[[package]] +name = "textual" +version = "8.2.5" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markdown-it-py", extra = ["linkify"] }, + { name = "mdit-py-plugins" }, + { name = "platformdirs" }, + { name = "pygments" }, + { name = "rich" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/62/1e/1eedc5bac184d00aaa5f9a99095f7e266af3ec46fa926c1051be5d358da1/textual-8.2.5.tar.gz", hash = "sha256:6c894e65a879dadb4f6cf46ddcfedb0173ff7e0cb1fe605ff7b357a597bdbc90", size = 1851596, upload-time = "2026-04-30T08:02:58.956Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cd/01/c4555f9c8a692ff83d84930150540f743ce94c89234f9e9a15ff4baba3a8/textual-8.2.5-py3-none-any.whl", hash = "sha256:247d2aa2faf222749c321f88a736247f37ee2c023604079c7490bfacddfcd4b2", size = 727050, upload-time = "2026-04-30T08:03:01.421Z" }, +] + [[package]] name = "tinycss2" version = "1.4.0" @@ -1874,6 +1954,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, ] +[[package]] +name = "uc-micro-py" +version = "2.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/78/67/9a363818028526e2d4579334460df777115bdec1bb77c08f9db88f6389f2/uc_micro_py-2.0.0.tar.gz", hash = "sha256:c53691e495c8db60e16ffc4861a35469b0ba0821fe409a8a7a0a71864d33a811", size = 6611, upload-time = "2026-03-01T06:31:27.526Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/61/73/d21edf5b204d1467e06500080a50f79d49ef2b997c79123a536d4a17d97c/uc_micro_py-2.0.0-py3-none-any.whl", hash = "sha256:3603a3859af53e5a39bc7677713c78ea6589ff188d70f4fee165db88e22b242c", size = 6383, upload-time = "2026-03-01T06:31:26.257Z" }, +] + [[package]] name = "wcwidth" version = "0.6.0" @@ -1931,6 +2020,7 @@ cli = [ demo = [ { name = "flask" }, { name = "flask-sock" }, + { name = "textual" }, { name = "zkac-node" }, ] dev = [ @@ -1950,6 +2040,7 @@ requires-dist = [ { name = "flask-sock", marker = "extra == 'demo'", specifier = ">=0.7" }, { name = "ipykernel", specifier = ">=6.31.0" }, { name = "maturin", marker = "extra == 'dev'", specifier = ">=1.0,<2.0" }, + { name = "textual", marker = "extra == 'demo'", specifier = ">=0.70" }, { name = "zkac-node", marker = "extra == 'cli'", editable = "cli" }, { name = "zkac-node", marker = "extra == 'demo'", editable = "cli" }, { name = "zkac-node", marker = "extra == 'dev'", editable = "cli" },