""" ZKAC file-share client library (demo). Reusable, UI-agnostic helpers used by ``file_share_tui.py`` and the smoke test harness. The library performs: * deterministic folder flattening and per-file content-key generation, * per-role visibility bitmasks over the flattened index, * opaque blob uploads to ``file_share_server.py``, * role-credential authenticated download + decrypt against the same server. Authenticated sessions piggy-back on the ZKAC TCP framing/handshake from ``zkac.tcp``; the ``zkac-node`` CLI is only used (separately) for identity, registry and direct P2P credential grants. """ from __future__ import annotations import base64 import hashlib import json import os import socket import uuid from dataclasses import dataclass, field from pathlib import Path from typing import Iterable, Iterator import zkac from zkac.tcp import FramedSession, client_handshake_anon CHUNK_SIZE = 128 * 1024 # plaintext bytes per stored blob (well under MAX_TCP_FRAME_BYTES) DEFAULT_CONNECT_TIMEOUT_S = 8.0 # ── small helpers ──────────────────────────────────────────────────── def _b64(data: bytes) -> str: return base64.b64encode(data).decode("ascii") def _unb64(s: str) -> bytes: return base64.b64decode(s) def _parse_server(server: str) -> tuple[str, int]: host, sep, port_s = server.rpartition(":") if not sep: raise ValueError(f"invalid server {server!r}, expected host:port") port = int(port_s, 10) if not 1 <= port <= 65535: raise ValueError(f"port out of range: {port}") return (host or "127.0.0.1"), port # ── data classes ───────────────────────────────────────────────────── @dataclass class FileChunk: blob_id: str eph_pk_b64: str # ephemeral pubkey used by the encrypt step (HPKE-style) @dataclass class FileEntry: rel_path: str size: int sha256_b64: str content_secret_b64: str # 32-byte X25519 secret -> derives the file's recipient kp chunks: list[FileChunk] = field(default_factory=list) def to_grant_dict(self) -> dict: return { "rel_path": self.rel_path, "size": self.size, "sha256_b64": self.sha256_b64, "content_secret_b64": self.content_secret_b64, "chunks": [{"blob_id": c.blob_id, "eph_pk_b64": c.eph_pk_b64} for c in self.chunks], } @classmethod def from_grant_dict(cls, data: dict) -> "FileEntry": return cls( rel_path=data["rel_path"], size=int(data["size"]), sha256_b64=data["sha256_b64"], content_secret_b64=data["content_secret_b64"], chunks=[FileChunk(c["blob_id"], c["eph_pk_b64"]) for c in data["chunks"]], ) @dataclass class BucketManifest: """Admin-side bucket manifest, persisted locally on the bucket creator.""" bucket_id: str server: str registry_id_hex: str files: list[FileEntry] role_masks: dict[str, str] = field(default_factory=dict) # role -> bitmask string ('0'/'1') recipients: list[dict] = field(default_factory=list) # local audit log only (no server identity binding) def to_dict(self) -> dict: return { "bucket_id": self.bucket_id, "server": self.server, "registry_id_hex": self.registry_id_hex, "files": [ { "rel_path": f.rel_path, "size": f.size, "sha256_b64": f.sha256_b64, "content_secret_b64": f.content_secret_b64, "chunks": [{"blob_id": c.blob_id, "eph_pk_b64": c.eph_pk_b64} for c in f.chunks], } for f in self.files ], "role_masks": self.role_masks, "recipients": self.recipients, } @classmethod def from_dict(cls, data: dict) -> "BucketManifest": files = [ FileEntry( rel_path=fd["rel_path"], size=int(fd["size"]), sha256_b64=fd["sha256_b64"], content_secret_b64=fd["content_secret_b64"], chunks=[FileChunk(c["blob_id"], c["eph_pk_b64"]) for c in fd["chunks"]], ) for fd in data.get("files", []) ] return cls( bucket_id=data["bucket_id"], server=data["server"], registry_id_hex=data["registry_id_hex"], files=files, role_masks=dict(data.get("role_masks", {})), recipients=list(data.get("recipients", [])), ) # ── folder flattening + chunking ───────────────────────────────────── def flatten_folder(folder: Path) -> list[Path]: """Deterministic relative-path order: sorted, files only, skipping hidden entries.""" folder = folder.resolve() if not folder.is_dir(): raise NotADirectoryError(folder) out: list[Path] = [] for path in sorted(folder.rglob("*")): if not path.is_file(): continue if any(part.startswith(".") for part in path.relative_to(folder).parts): continue out.append(path) return sorted(out, key=lambda p: str(p.relative_to(folder))) def _read_chunks(path: Path, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]: with path.open("rb") as fh: while True: buf = fh.read(chunk_size) if not buf: return yield buf def _sha256_file(path: Path) -> bytes: h = hashlib.sha256() for buf in _read_chunks(path, 64 * 1024): h.update(buf) return h.digest() def encrypt_file_to_blobs(path: Path) -> tuple[FileEntry, list[bytes]]: """Generate a per-file content keypair, encrypt each chunk to its public key. Each chunk uses ``zkac.encrypt_for_admin`` so the matching ``IssuanceKeypair.decrypt`` on the recipient side succeeds (both bind the ``user->admin`` HKDF label). Returns the file entry plus the parallel list of ciphertext blobs. """ content_kp = zkac.IssuanceKeypair() content_pk = bytes(content_kp.public_key_bytes()) content_sk = bytes(content_kp.secret_bytes()) sha = _sha256_file(path) size = path.stat().st_size entry = FileEntry( rel_path="", size=size, sha256_b64=_b64(sha), content_secret_b64=_b64(content_sk), ) blobs: list[bytes] = [] for chunk in _read_chunks(path): eph_pk, ciphertext = zkac.encrypt_for_admin(content_pk, chunk) entry.chunks.append(FileChunk(blob_id=uuid.uuid4().hex, eph_pk_b64=_b64(bytes(eph_pk)))) blobs.append(bytes(ciphertext)) return entry, blobs def decrypt_file_from_blobs(entry: FileEntry, blobs: Iterable[bytes]) -> bytes: """Reassemble + verify a file from its (already fetched) ciphertext blobs.""" content_kp = zkac.IssuanceKeypair.from_secret(_unb64(entry.content_secret_b64)) out = bytearray() for chunk_meta, ciphertext in zip(entry.chunks, blobs): out.extend(bytes(content_kp.decrypt(_unb64(chunk_meta.eph_pk_b64), ciphertext))) if hashlib.sha256(bytes(out)).digest() != _unb64(entry.sha256_b64): raise RuntimeError(f"hash mismatch for {entry.rel_path!r}") return bytes(out) # ── role bitmasks ──────────────────────────────────────────────────── def normalize_mask(mask: str, file_count: int) -> str: """Pad/truncate a bitmask string of '0'/'1' to ``file_count`` and validate.""" cleaned = "".join(c for c in mask if c in "01") if len(cleaned) > file_count: raise ValueError(f"mask {mask!r} longer than {file_count} files") cleaned = cleaned.ljust(file_count, "0") return cleaned def files_visible_to_mask(files: list[FileEntry], mask: str) -> list[FileEntry]: return [f for f, bit in zip(files, mask) if bit == "1"] def encode_grant_payload( bucket_id: str, role_name: str, server: str, registry_id_hex: str, visible_files: list[FileEntry], ) -> bytes: payload = { "bucket_id": bucket_id, "role_name": role_name, "server": server, "registry_id_hex": registry_id_hex, "files": [f.to_grant_dict() for f in visible_files], } return json.dumps(payload, separators=(",", ":")).encode("utf-8") def encrypt_grant_to_recipient(payload: bytes, recipient_issuance_pk_hex: str) -> tuple[str, str]: """User->admin direction sealed box: ``zkac.encrypt_for_admin`` -> (eph_pk_b64, ciphertext_b64).""" rec_pk = bytes.fromhex(recipient_issuance_pk_hex) if len(rec_pk) != 32: raise ValueError("recipient issuance public key must be 32 bytes") eph_pk, ciphertext = zkac.encrypt_for_admin(rec_pk, payload) return _b64(bytes(eph_pk)), _b64(bytes(ciphertext)) def decrypt_grant_for_recipient( eph_pk_b64: str, ciphertext_b64: str, issuance_secret_hex: str, ) -> dict: secret = bytes.fromhex(issuance_secret_hex) if len(secret) != 32: raise ValueError("issuance secret must be 32 bytes") receiver = zkac.IssuanceKeypair.from_secret(secret) plaintext = bytes(receiver.decrypt(_unb64(eph_pk_b64), _unb64(ciphertext_b64))) return json.loads(plaintext.decode("utf-8")) # ── encrypted authenticated session to file-share server ───────────── class FileShareSession: """Anonymous handshake + ``op:'fs'`` BBS+ presentation -> framed JSON RPC.""" def __init__(self, sock: socket.socket, framed: FramedSession) -> None: self._sock = sock self._framed = framed def _call(self, cmd: dict) -> dict: self._framed.send(json.dumps(cmd).encode("utf-8")) resp = json.loads(self._framed.recv()) if resp.get("error"): raise RuntimeError(resp["error"]) return resp def close(self) -> None: try: self._sock.close() except OSError: pass def __enter__(self) -> "FileShareSession": return self def __exit__(self, *exc: object) -> None: self.close() # admin commands def bucket_create(self, bucket_id: str | None = None) -> str: cmd = {"cmd": "bucket_create"} if bucket_id is not None: cmd["bucket_id"] = bucket_id return self._call(cmd)["bucket_id"] def bucket_put_blob(self, bucket_id: str, blob_id: str, ciphertext: bytes) -> None: self._call({ "cmd": "bucket_put_blob", "bucket_id": bucket_id, "blob_id": blob_id, "ciphertext_b64": _b64(ciphertext), }) def bucket_set_role_acl(self, bucket_id: str, role_id_hex: str, allowed_blob_ids: list[str]) -> None: self._call({ "cmd": "bucket_set_role_acl", "bucket_id": bucket_id, "role_id_hex": role_id_hex, "allowed_blob_ids": allowed_blob_ids, }) def bucket_put_role_grant( self, bucket_id: str, role_id_hex: str, acl_version: int, eph_pk_b64: str, ciphertext_b64: str, ) -> None: self._call({ "cmd": "bucket_put_role_grant", "bucket_id": bucket_id, "role_id_hex": role_id_hex, "acl_version": int(acl_version), "eph_pk_b64": eph_pk_b64, "ciphertext_b64": ciphertext_b64, }) def bucket_get_role_acl(self, bucket_id: str, role_id_hex: str) -> dict: return self._call({ "cmd": "bucket_get_role_acl", "bucket_id": bucket_id, "role_id_hex": role_id_hex, }) def bucket_finalize(self, bucket_id: str) -> None: self._call({"cmd": "bucket_finalize", "bucket_id": bucket_id}) def bucket_delete(self, bucket_id: str) -> None: self._call({"cmd": "bucket_delete", "bucket_id": bucket_id}) def bucket_list_owned(self) -> list[str]: return self._call({"cmd": "bucket_list_owned"})["bucket_ids"] # any-role commands def whoami(self) -> dict: return self._call({"cmd": "whoami"}) def fs_buckets(self) -> list[str]: return self._call({"cmd": "fs_buckets"})["bucket_ids"] def fs_get_role_grant(self, bucket_id: str) -> dict: return self._call({"cmd": "fs_get_role_grant", "bucket_id": bucket_id}) def fs_get_blob(self, bucket_id: str, blob_id: str) -> bytes: return _unb64(self._call({"cmd": "fs_get_blob", "bucket_id": bucket_id, "blob_id": blob_id})["ciphertext_b64"]) def open_session( server: str, *, server_pk_hex: str, user_transport_secret: bytes, registry_id_hex: str, role_id: bytes, credential: "zkac.Credential", connect_timeout_s: float = DEFAULT_CONNECT_TIMEOUT_S, ) -> FileShareSession: """Connect, anonymous-handshake, then present a BBS+ proof bound to the transcript.""" host, port = _parse_server(server) sock = socket.create_connection((host, port), timeout=connect_timeout_s) sock.settimeout(None) try: node = zkac.Node(zkac.Keypair.from_secret_key(user_transport_secret)) server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_pk_hex)) session = client_handshake_anon(sock, node, server_pk) framed = FramedSession(sock, session) transcript_hash = bytes(session.transcript_hash()) bbs_auth_proof = bytes(credential.present(transcript_hash)) framed.send(json.dumps({ "op": "fs", "bbs_auth_b64": _b64(bbs_auth_proof), }).encode("utf-8")) hello = json.loads(framed.recv()) if hello.get("error"): raise RuntimeError(hello["error"]) return FileShareSession(sock, framed) except Exception: sock.close() raise # ── higher-level admin helpers ─────────────────────────────────────── def upload_bucket( sess: FileShareSession, folder: Path, *, server: str, registry_id_hex: str, bucket_id: str | None = None, ) -> BucketManifest: """Encrypt every file under ``folder`` and stream blobs to the server.""" bid = sess.bucket_create(bucket_id) paths = flatten_folder(folder) files: list[FileEntry] = [] for p in paths: entry, blobs = encrypt_file_to_blobs(p) entry.rel_path = str(p.relative_to(folder.resolve())) for chunk_meta, ciphertext in zip(entry.chunks, blobs): sess.bucket_put_blob(bid, chunk_meta.blob_id, ciphertext) files.append(entry) sess.bucket_finalize(bid) return BucketManifest( bucket_id=bid, server=server, registry_id_hex=registry_id_hex, files=files, ) def apply_role_masks_to_server(sess: FileShareSession, manifest: BucketManifest) -> None: """Push per-role blob ACLs so server enforces mask on fs_get_blob.""" if not manifest.role_masks: return for role_name, raw_mask in manifest.role_masks.items(): mask = normalize_mask(raw_mask, len(manifest.files)) visible = files_visible_to_mask(manifest.files, mask) allowed_blob_ids = [c.blob_id for f in visible for c in f.chunks] sess.bucket_set_role_acl( manifest.bucket_id, zkac.role_id(role_name).hex(), allowed_blob_ids, ) def push_role_grant( sess: FileShareSession, manifest: BucketManifest, role_name: str, recipient_issuance_pk_hex: str, ) -> None: """Upload anonymous encrypted role grant envelope (no recipient identifier stored).""" if role_name not in manifest.role_masks: raise RuntimeError(f"role {role_name!r} has no visibility mask in this bucket") mask = normalize_mask(manifest.role_masks[role_name], len(manifest.files)) visible = files_visible_to_mask(manifest.files, mask) role_id_hex = zkac.role_id(role_name).hex() acl_meta = sess.bucket_get_role_acl(manifest.bucket_id, role_id_hex) acl_version = int(acl_meta.get("version", 0)) if acl_version <= 0: raise RuntimeError("server ACL missing for role; apply permissions before sharing") payload = encode_grant_payload( manifest.bucket_id, role_name, manifest.server, manifest.registry_id_hex, visible, ) eph_pk_b64, ct_b64 = encrypt_grant_to_recipient(payload, recipient_issuance_pk_hex) sess.bucket_put_role_grant( manifest.bucket_id, role_id_hex, acl_version, eph_pk_b64, ct_b64, ) manifest.recipients.append({ "role_name": role_name, "recipient_hint": recipient_issuance_pk_hex[:16], }) def download_bucket( sess: FileShareSession, bucket_id: str, *, issuance_secret_hex: str, role_grant_payload: dict | None = None, output_dir: Path, ) -> dict: """Fetch + decrypt files from a role grant, optionally fetched from server.""" payload = role_grant_payload if payload is None: grants = sess.fs_get_role_grant(bucket_id).get("grants", []) for grant in grants: try: payload = decrypt_grant_for_recipient( grant["eph_pk_b64"], grant["ciphertext_b64"], issuance_secret_hex, ) break except Exception: continue if payload is None: raise RuntimeError("no decryptable role grant for this credential") if payload.get("bucket_id") != bucket_id: raise RuntimeError("role grant bucket_id mismatch") output_dir.mkdir(parents=True, exist_ok=True) written: list[str] = [] for fd in payload.get("files", []): entry = FileEntry.from_grant_dict(fd) ciphertexts = [sess.fs_get_blob(bucket_id, c.blob_id) for c in entry.chunks] plaintext = decrypt_file_from_blobs(entry, ciphertexts) out_path = output_dir / entry.rel_path out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_bytes(plaintext) written.append(str(out_path)) return {"role_name": payload.get("role_name"), "files_written": written} def _received_grant_path(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> Path: return state_dir(userid) / "received_grants" / registry_id_hex / role_name / f"{bucket_id}.json" def save_received_role_grant( userid: str, registry_id_hex: str, role_name: str, bucket_id: str, *, eph_pk_b64: str, ciphertext_b64: str, ) -> Path: path = _received_grant_path(userid, registry_id_hex, role_name, bucket_id) path.parent.mkdir(parents=True, exist_ok=True) path.write_text(json.dumps({ "registry_id_hex": registry_id_hex, "role_name": role_name, "bucket_id": bucket_id, "eph_pk_b64": eph_pk_b64, "ciphertext_b64": ciphertext_b64, }, indent=2)) try: os.chmod(path, 0o600) except OSError: pass return path def list_received_role_grants(userid: str, registry_id_hex: str, role_name: str) -> list[str]: base = state_dir(userid) / "received_grants" / registry_id_hex / role_name if not base.is_dir(): return [] return sorted(p.stem for p in base.glob("*.json")) def load_received_role_grant(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> dict: return json.loads(_received_grant_path(userid, registry_id_hex, role_name, bucket_id).read_text()) # ── local persistence (admin manifests) ────────────────────────────── def state_dir(userid: str) -> Path: # Keep demo state fully self-contained with demo ZKAC_HOME, rather than # mixing with repository-local legacy state directories. base_home = Path(os.environ.get("ZKAC_HOME", Path.home() / ".ZKAC-FS")) base = base_home / userid / "file_share" base.mkdir(parents=True, exist_ok=True) try: os.chmod(base, 0o700) except OSError: pass return base def manifest_path(userid: str, bucket_id: str) -> Path: return state_dir(userid) / "buckets" / f"{bucket_id}.json" def save_manifest(userid: str, manifest: BucketManifest) -> Path: p = manifest_path(userid, manifest.bucket_id) p.parent.mkdir(parents=True, exist_ok=True) p.write_text(json.dumps(manifest.to_dict(), indent=2)) try: os.chmod(p, 0o600) except OSError: pass return p def load_manifest(userid: str, bucket_id: str) -> BucketManifest: return BucketManifest.from_dict(json.loads(manifest_path(userid, bucket_id).read_text())) def list_manifests(userid: str) -> list[str]: base = state_dir(userid) / "buckets" if not base.is_dir(): return [] return sorted(p.stem for p in base.glob("*.json"))