#!/usr/bin/env python3 """ ZKAC opaque file-share server (demo). Headless TCP service that combines: * The same management-channel wire protocol as ``zkac-node serve`` so existing ``zkac-node registry create/get/update`` commands work unchanged. * A new file-share channel that, after BBS+ role authentication, exposes bucket primitives. The server only ever sees opaque ciphertext blobs and encrypted role grants; file names, contents and per-role visibility masks are never visible server-side. Run with:: uv run python demo/file_share_server.py --host 127.0.0.1 --port 9879 The server transport public key is printed at start-up; pin it on each client with ``zkac-node server pin 127.0.0.1:9879 --key ``. """ from __future__ import annotations import argparse import base64 import hashlib import json import os import socket import sys import threading import uuid from pathlib import Path from typing import Any import zkac from zkac.tcp import FramedSession, server_handshake_anon # ── small helpers ──────────────────────────────────────────────────── def _b64(data: bytes) -> str: return base64.b64encode(data).decode("ascii") def _unb64(s: str) -> bytes: return base64.b64decode(s) def _chmod(path: Path, mode: int) -> None: try: os.chmod(path, mode) except OSError: pass def _write_private_json(path: Path, payload: dict) -> None: path.write_text(json.dumps(payload, indent=2)) _chmod(path, 0o600) def _is_loopback(host: str) -> bool: return host.strip().lower() in {"127.0.0.1", "::1", "localhost"} def _safe_id(value: str) -> str: """Strict allowlist on identifiers used as filesystem names.""" if not isinstance(value, str) or not value: raise ValueError("missing identifier") if len(value) > 128 or any(c not in "0123456789abcdefABCDEF-" for c in value): raise ValueError(f"invalid identifier {value!r}") return value def _privacy_safe_error_tag(exc: BaseException) -> str: """Return a coarse error label suitable for public service logs.""" return type(exc).__name__ # ── opaque on-disk store ───────────────────────────────────────────── class _FileShareStore: """Persists registry snapshots and opaque bucket state under one data dir.""" def __init__(self, data_dir: Path) -> None: self._dir = data_dir self._reg_dir = data_dir / "registries" self._buckets_dir = data_dir / "buckets" self._privacy_key: bytes = b"" for d in (self._dir, self._reg_dir, self._buckets_dir): d.mkdir(parents=True, exist_ok=True) _chmod(d, 0o700) self._lock = threading.Lock() def set_privacy_key(self, key: bytes) -> None: if not key: raise RuntimeError("privacy key must not be empty") self._privacy_key = bytes(key) def _require_privacy_key(self) -> bytes: if not self._privacy_key: raise RuntimeError("privacy key not initialized") return self._privacy_key def _tag(self, kind: str, raw_id: str) -> str: key = self._require_privacy_key() raw = _safe_id(raw_id).encode("utf-8") h = hashlib.sha256() h.update(kind.encode("utf-8")) h.update(b"\x00") h.update(key) h.update(b"\x00") h.update(raw) return h.hexdigest() def _registry_tag(self, registry_id_hex: str) -> str: return self._tag("registry", registry_id_hex) def _role_tag(self, role_id_hex: str) -> str: return self._tag("role", role_id_hex) # transport key def load_or_create_keypair(self) -> zkac.Keypair: kf = self._dir / "server_key.json" if kf.exists(): data = json.loads(kf.read_text()) return zkac.Keypair.from_secret_key(_unb64(data["secret_b64"])) kp = zkac.Keypair() _write_private_json(kf, { "secret_b64": _b64(kp.secret_key_bytes()), "public_b64": _b64(kp.public_key().to_bytes()), }) return kp # registries (mirrors zkac-node serve so the CLI works unchanged) def save_registry(self, rid_hex: str, state_bytes: bytes, cert_bytes: bytes) -> None: with self._lock: (self._reg_dir / f"{rid_hex}.state").write_bytes(state_bytes) (self._reg_dir / f"{rid_hex}.cert").write_bytes(cert_bytes) def load_all_registries(self, mgr: zkac.RegistryManager) -> int: n = 0 for p in sorted(self._reg_dir.glob("*.state")): cert = self._reg_dir / f"{p.stem}.cert" if not cert.exists(): continue try: mgr.restore(p.read_bytes(), cert.read_bytes()) n += 1 except Exception as exc: print(f"[fs-server] skip registry {p.stem}: {exc}") return n def list_registry_ids(self) -> list[str]: out: list[str] = [] for p in sorted(self._reg_dir.glob("*.state")): cert = self._reg_dir / f"{p.stem}.cert" if cert.exists(): out.append(p.stem) return out # buckets def _bucket_dir(self, bucket_id: str) -> Path: return self._buckets_dir / _safe_id(bucket_id) def _bucket_meta_path(self, bucket_id: str) -> Path: return self._bucket_dir(bucket_id) / "meta.json" def _roles_index_path(self, registry_id_hex: str) -> Path: return self._reg_dir / f"{_safe_id(registry_id_hex)}.roles.json" def _remember_role_id(self, registry_id_hex: str, role_id_hex: str) -> None: p = self._roles_index_path(registry_id_hex) roles: set[str] = set() if p.is_file(): try: data = json.loads(p.read_text()) if isinstance(data, list): roles = {_safe_id(str(v)) for v in data} except Exception: roles = set() roles.add(_safe_id(role_id_hex)) _write_private_json(p, sorted(roles)) def bucket_create(self, bucket_id: str, owner_registry_id_hex: str) -> None: bd = self._bucket_dir(bucket_id) with self._lock: if bd.exists(): raise RuntimeError("bucket already exists") (bd / "blobs").mkdir(parents=True) (bd / "role_grants").mkdir() _chmod(bd, 0o700) _write_private_json(self._bucket_meta_path(bucket_id), { "bucket_id": bucket_id, "owner_registry_tag": self._registry_tag(owner_registry_id_hex), "finalized": False, "role_acl": {}, # role_tag -> {"version": int, "allowed_blob_ids": [blob_id...]} }) def bucket_meta(self, bucket_id: str) -> dict: return json.loads(self._bucket_meta_path(bucket_id).read_text()) def bucket_is_finalized(self, bucket_id: str) -> bool: return bool(self.bucket_meta(bucket_id).get("finalized", False)) def _bucket_require_owner(self, bucket_id: str, registry_id_hex: str) -> dict: meta = self.bucket_meta(bucket_id) expected_owner_tag = self._registry_tag(registry_id_hex) if meta.get("owner_registry_tag") != expected_owner_tag: raise RuntimeError("not bucket owner") return meta def bucket_set_finalized(self, bucket_id: str, registry_id_hex: str, finalized: bool) -> None: with self._lock: meta = self._bucket_require_owner(bucket_id, registry_id_hex) meta["finalized"] = bool(finalized) _write_private_json(self._bucket_meta_path(bucket_id), meta) def bucket_delete(self, bucket_id: str, registry_id_hex: str) -> None: with self._lock: self._bucket_require_owner(bucket_id, registry_id_hex) bd = self._bucket_dir(bucket_id) for sub in ("blobs", "role_grants"): d = bd / sub if d.is_dir(): for p in d.rglob("*"): if p.is_file(): try: p.unlink() except OSError: pass for p in sorted(d.rglob("*"), reverse=True): if p.is_dir(): try: p.rmdir() except OSError: pass try: d.rmdir() except OSError: pass try: self._bucket_meta_path(bucket_id).unlink() except OSError: pass try: bd.rmdir() except OSError: pass def bucket_put_blob( self, bucket_id: str, registry_id_hex: str, blob_id: str, ciphertext: bytes, ) -> None: with self._lock: self._bucket_require_owner(bucket_id, registry_id_hex) (self._bucket_dir(bucket_id) / "blobs" / _safe_id(blob_id)).write_bytes(ciphertext) def bucket_set_role_acl( self, bucket_id: str, registry_id_hex: str, role_id_hex: str, allowed_blob_ids: list[str], ) -> None: with self._lock: meta = self._bucket_require_owner(bucket_id, registry_id_hex) role_acl = dict(meta.get("role_acl", {})) key = self._role_tag(role_id_hex) prev = role_acl.get(key, {}) prev_version = int(prev.get("version", 0)) if isinstance(prev, dict) else 0 role_acl[key] = { "version": prev_version + 1, "allowed_blob_ids": [_safe_id(b) for b in allowed_blob_ids], } meta["role_acl"] = role_acl _write_private_json(self._bucket_meta_path(bucket_id), meta) self._remember_role_id(registry_id_hex, role_id_hex) def bucket_get_blob(self, bucket_id: str, blob_id: str) -> bytes: return (self._bucket_dir(bucket_id) / "blobs" / _safe_id(blob_id)).read_bytes() def bucket_blob_allowed_for_role(self, bucket_id: str, role_id_hex: str, blob_id: str) -> bool: meta = self.bucket_meta(bucket_id) role_acl = meta.get("role_acl", {}) role_meta = role_acl.get(self._role_tag(role_id_hex)) if not isinstance(role_meta, dict): return False allowed = role_meta.get("allowed_blob_ids") if not isinstance(allowed, list): return False return _safe_id(blob_id) in {str(v) for v in allowed} def bucket_role_acl(self, bucket_id: str, role_id_hex: str) -> dict: meta = self.bucket_meta(bucket_id) role_acl = meta.get("role_acl", {}) role_meta = role_acl.get(self._role_tag(role_id_hex), {}) if not isinstance(role_meta, dict): return {"version": 0, "allowed_blob_ids": []} version = int(role_meta.get("version", 0)) allowed = role_meta.get("allowed_blob_ids", []) if not isinstance(allowed, list): allowed = [] return { "version": version, "allowed_blob_ids": [_safe_id(str(v)) for v in allowed], } def buckets_for_role_in_registry(self, role_id_hex: str, registry_id_hex: str) -> list[str]: """Bucket ids where this authenticated role currently has non-empty ACL.""" role_id_hex = _safe_id(role_id_hex) out: list[str] = [] for bd in sorted(self._buckets_dir.iterdir()): bid = bd.name try: if not self.bucket_is_finalized(bid): continue if self.bucket_owner_registry_tag(bid) != self._registry_tag(registry_id_hex): continue acl = self.bucket_role_acl(bid, role_id_hex) allowed = acl.get("allowed_blob_ids", []) grants = self.bucket_get_role_grants(bid, role_id_hex) current_acl_version = int(acl.get("version", -1)) has_fresh_grant = any( isinstance(g.get("acl_version"), int) and g["acl_version"] == current_acl_version for g in grants ) if isinstance(allowed, list) and len(allowed) > 0 and has_fresh_grant: out.append(bid) except Exception: continue return out def bucket_put_role_grant( self, bucket_id: str, registry_id_hex: str, role_id_hex: str, acl_version: int, eph_pk_b64: str, ciphertext_b64: str, ) -> None: with self._lock: self._bucket_require_owner(bucket_id, registry_id_hex) role_id_hex = _safe_id(role_id_hex) role_tag = self._role_tag(role_id_hex) payload = { "bucket_id": bucket_id, "role_tag": role_tag, "acl_version": int(acl_version), "eph_pk_b64": eph_pk_b64, "ciphertext_b64": ciphertext_b64, } role_dir = self._bucket_dir(bucket_id) / "role_grants" / role_tag role_dir.mkdir(parents=True, exist_ok=True) target = role_dir / f"{uuid.uuid4().hex}.json" _write_private_json(target, payload) self._remember_role_id(registry_id_hex, role_id_hex) def bucket_get_role_grants(self, bucket_id: str, role_id_hex: str) -> list[dict]: role_dir = self._bucket_dir(bucket_id) / "role_grants" / self._role_tag(role_id_hex) if not role_dir.is_dir(): return [] out: list[dict] = [] for p in sorted(role_dir.glob("*.json")): try: out.append(json.loads(p.read_text())) except Exception: continue return out def bucket_owner_registry_tag(self, bucket_id: str) -> str: owner = self.bucket_meta(bucket_id).get("owner_registry_tag") if not isinstance(owner, str) or not owner: raise RuntimeError("bucket metadata missing owner_registry_tag") return owner def buckets_owned_by(self, registry_id_hex: str) -> list[str]: out: list[str] = [] for bd in sorted(self._buckets_dir.iterdir()): meta = bd / "meta.json" if not meta.is_file(): continue try: owner_tag = self._registry_tag(registry_id_hex) if json.loads(meta.read_text()).get("owner_registry_tag") == owner_tag: out.append(bd.name) except (OSError, json.JSONDecodeError): continue return out def role_ids_for_registry(self, registry_id_hex: str) -> list[str]: p = self._roles_index_path(registry_id_hex) if not p.is_file(): return [] try: data = json.loads(p.read_text()) if not isinstance(data, list): return [] return sorted({_safe_id(str(v)) for v in data}) except Exception: return [] # ── command dispatch (inside encrypted session) ────────────────────── def _dispatch_mgmt( cmd: dict, mgr: zkac.RegistryManager, store: _FileShareStore, server_pk_b64: str, transcript_hash: bytes, ) -> dict: """Registry management commands, wire-compatible with ``zkac-node`` CLI.""" try: action = cmd.get("cmd") rid_hex = cmd.get("auth_registry_id") admin_proof_b64 = cmd.get("admin_proof_b64") def _require_admin_for(target_rid_hex: str) -> None: if rid_hex != target_rid_hex: raise RuntimeError("auth_registry_id must match command registry_id") if not isinstance(admin_proof_b64, str) or not admin_proof_b64: raise RuntimeError("missing admin_proof_b64") if not mgr.verify_admin( bytes.fromhex(target_rid_hex), _unb64(admin_proof_b64), transcript_hash, ): raise RuntimeError("admin authorization failed") if action == "server_info": return {"ok": True, "server_public_key_b64": server_pk_b64} if action == "create_registry": state_bytes = _unb64(cmd["state_bytes_b64"]) state_cert = _unb64(cmd["state_cert_b64"]) auth_rid = cmd.get("auth_registry_id") if not isinstance(auth_rid, str): raise RuntimeError("missing auth_registry_id") if not isinstance(admin_proof_b64, str) or not admin_proof_b64: raise RuntimeError("missing admin_proof_b64") tmp_mgr = zkac.RegistryManager() expected_rid = tmp_mgr.create(state_bytes, state_cert).hex() if expected_rid != auth_rid: raise RuntimeError("auth_registry_id does not match certified state") if not tmp_mgr.verify_admin( bytes.fromhex(expected_rid), _unb64(admin_proof_b64), transcript_hash, ): raise RuntimeError("admin authorization failed for create_registry") rid = mgr.create(state_bytes, state_cert) store.save_registry(rid.hex(), state_bytes, state_cert) return {"ok": True, "registry_id": rid.hex()} if action == "get_registry": rid_hex_cmd = cmd["registry_id"] _require_admin_for(rid_hex_cmd) rid = bytes.fromhex(rid_hex_cmd) state_bytes, state_cert = mgr.get(rid) return { "ok": True, "state_bytes_b64": _b64(state_bytes), "state_cert_b64": _b64(state_cert), } if action == "update_registry": rid_hex_cmd = cmd["registry_id"] _require_admin_for(rid_hex_cmd) rid = bytes.fromhex(rid_hex_cmd) state_bytes = _unb64(cmd["state_bytes_b64"]) state_cert = _unb64(cmd["state_cert_b64"]) mgr.update(rid, state_bytes, state_cert) store.save_registry(rid_hex_cmd, state_bytes, state_cert) return {"ok": True} return {"error": f"unknown command: {action}"} except Exception as exc: return {"error": str(exc)} def _dispatch_fs( cmd: dict, store: _FileShareStore, ctx: dict, ) -> dict: """File-share commands authenticated via opaque per-session authorization context.""" try: action = cmd.get("cmd") registry_id_hex: str = ctx["registry_id_hex"] registry_tag: str = ctx["registry_tag"] role_id_hex: str = ctx["role_id_hex"] is_admin: bool = bool(ctx["is_admin"]) def _require_admin() -> None: if not is_admin: raise RuntimeError("admin role required for this command") if action == "whoami": return { "ok": True, "is_admin": is_admin, "auth_scope": "admin" if is_admin else "credential", } if action == "bucket_create": _require_admin() bid = cmd.get("bucket_id") or uuid.uuid4().hex store.bucket_create(bid, registry_id_hex) return {"ok": True, "bucket_id": bid} if action == "bucket_put_blob": _require_admin() bid = cmd["bucket_id"] blob_id = cmd["blob_id"] ciphertext = _unb64(cmd["ciphertext_b64"]) store.bucket_put_blob(bid, registry_id_hex, blob_id, ciphertext) return {"ok": True} if action == "bucket_set_role_acl": _require_admin() store.bucket_set_role_acl( cmd["bucket_id"], registry_id_hex, cmd["role_id_hex"], list(cmd.get("allowed_blob_ids", [])), ) return {"ok": True} if action == "bucket_put_role_grant": _require_admin() store.bucket_put_role_grant( cmd["bucket_id"], registry_id_hex, cmd["role_id_hex"], int(cmd["acl_version"]), cmd["eph_pk_b64"], cmd["ciphertext_b64"], ) return {"ok": True} if action == "bucket_get_role_acl": _require_admin() role_acl = store.bucket_role_acl(cmd["bucket_id"], cmd["role_id_hex"]) return {"ok": True, **role_acl} if action == "bucket_finalize": _require_admin() store.bucket_set_finalized(cmd["bucket_id"], registry_id_hex, True) return {"ok": True} if action == "bucket_delete": _require_admin() store.bucket_delete(cmd["bucket_id"], registry_id_hex) return {"ok": True} if action == "bucket_list_owned": _require_admin() return {"ok": True, "bucket_ids": store.buckets_owned_by(registry_id_hex)} if action == "fs_buckets": return { "ok": True, "bucket_ids": store.buckets_for_role_in_registry( role_id_hex, registry_id_hex, ), } if action == "fs_get_role_grant": bid = cmd["bucket_id"] if store.bucket_owner_registry_tag(bid) != registry_tag: raise RuntimeError("bucket does not belong to authenticated registry") if not store.bucket_is_finalized(bid): raise RuntimeError("bucket is not finalized") current_acl = store.bucket_role_acl(bid, role_id_hex) acl_version = int(current_acl.get("version", -1)) grants = [ g for g in store.bucket_get_role_grants(bid, role_id_hex) if isinstance(g.get("acl_version"), int) and g["acl_version"] == acl_version ] if not grants: raise RuntimeError("permissions updated; request a fresh role grant") return {"ok": True, "grants": grants} if action == "fs_get_blob": bid = cmd["bucket_id"] blob_id = cmd["blob_id"] if store.bucket_owner_registry_tag(bid) != registry_tag: raise RuntimeError("bucket does not belong to authenticated registry") if not store.bucket_is_finalized(bid): raise RuntimeError("bucket is not finalized") if not is_admin and not store.bucket_blob_allowed_for_role(bid, role_id_hex, blob_id): raise RuntimeError("blob access denied by role mask") data = store.bucket_get_blob(bid, blob_id) return {"ok": True, "ciphertext_b64": _b64(data)} return {"error": f"unknown command: {action}"} except Exception as exc: return {"error": str(exc)} def _authenticate_fs_identity( mgr: zkac.RegistryManager, store: _FileShareStore, proof: bytes, transcript_hash: bytes, ) -> dict[str, object] | None: """Resolve opaque auth context from proof without client-supplied identifiers.""" registry_ids = store.list_registry_ids() admin_matches: list[str] = [] role_matches: list[tuple[str, str]] = [] for rid_hex in registry_ids: try: rid = bytes.fromhex(rid_hex) except ValueError: continue try: if mgr.verify_admin(rid, proof, transcript_hash): admin_matches.append(rid_hex) except Exception: pass for role_id_hex in store.role_ids_for_registry(rid_hex): try: role_id = bytes.fromhex(role_id_hex) except ValueError: continue if role_id == zkac.admin_role_id(): continue try: if mgr.verify_presentation(rid, role_id, proof, transcript_hash): role_matches.append((rid_hex, role_id_hex)) except Exception: continue if len(admin_matches) == 1 and not role_matches: rid_hex = admin_matches[0] role_hex = zkac.admin_role_id().hex() return { "registry_id_hex": rid_hex, "registry_tag": store._registry_tag(rid_hex), "role_id_hex": role_hex, "role_tag": store._role_tag(role_hex), "is_admin": True, } if not admin_matches and len(role_matches) == 1: rid_hex, role_hex = role_matches[0] return { "registry_id_hex": rid_hex, "registry_tag": store._registry_tag(rid_hex), "role_id_hex": role_hex, "role_tag": store._role_tag(role_hex), "is_admin": False, } # Ambiguous or invalid proof. return None # ── per-connection handler ──────────────────────────────────────────── def _handle_conn( conn: socket.socket, addr: tuple, node: zkac.Node, mgr: zkac.RegistryManager, store: _FileShareStore, server_pk_b64: str, idle_timeout_s: float, slots: threading.BoundedSemaphore, ) -> None: try: conn.settimeout(idle_timeout_s) session = server_handshake_anon(conn, node) framed = FramedSession(conn, session) transcript_hash = bytes(session.transcript_hash()) hello = json.loads(framed.recv()) op = hello.get("op") if op == "mgmt": while True: try: cmd = json.loads(framed.recv()) except (ConnectionError, OSError): break resp = _dispatch_mgmt(cmd, mgr, store, server_pk_b64, transcript_hash) framed.send(json.dumps(resp).encode()) return if op == "fs": try: proof = _unb64(hello["bbs_auth_b64"]) except (KeyError, ValueError) as exc: framed.send(json.dumps({"error": f"invalid fs hello: {exc}"}).encode()) return auth = _authenticate_fs_identity(mgr, store, proof, transcript_hash) if auth is None: framed.send(json.dumps({"error": "auth failed"}).encode()) return framed.send(json.dumps({"ok": True, "status": "authenticated"}).encode()) ctx = auth while True: try: cmd = json.loads(framed.recv()) except (ConnectionError, OSError): break resp = _dispatch_fs(cmd, store, ctx) framed.send(json.dumps(resp).encode()) return framed.send(json.dumps({"error": f"unknown op: {op}"}).encode()) except (ConnectionError, BrokenPipeError, OSError): pass except Exception as exc: # Privacy model: never emit client endpoint or request-linked payloads. print(f"[fs-server] connection error ({_privacy_safe_error_tag(exc)})") finally: conn.close() slots.release() # ── entry point ────────────────────────────────────────────────────── def serve( data_dir: Path, host: str = "127.0.0.1", port: int = 9879, *, max_connections: int = 64, idle_timeout_s: float = 60.0, listen_backlog: int = 64, allow_non_loopback: bool = False, ) -> None: data_dir.mkdir(parents=True, exist_ok=True) store = _FileShareStore(data_dir) kp = store.load_or_create_keypair() store.set_privacy_key(bytes(kp.secret_key_bytes())) server_pk_b64 = _b64(kp.public_key().to_bytes()) pk_hex = kp.public_key().to_bytes().hex() node = zkac.Node(kp) mgr = zkac.RegistryManager() n = store.load_all_registries(mgr) print(f"[fs-server] data dir: {data_dir}") print(f"[fs-server] server transport public key (pin OOB): {pk_hex}") print(f"[fs-server] loaded {n} registries") print(f"[fs-server] listening on {host}:{port}") if not _is_loopback(host) and not allow_non_loopback: raise RuntimeError( "refusing to bind outside loopback. " "Use --allow-non-loopback only when exposure is intentional." ) if not _is_loopback(host): print(f"[fs-server] warning: binding outside loopback: {host}:{port}", file=sys.stderr) sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sock.bind((host, port)) slots = threading.BoundedSemaphore(max_connections) sock.listen(listen_backlog) try: while True: conn, addr = sock.accept() if not slots.acquire(blocking=False): conn.close() continue threading.Thread( target=_handle_conn, args=(conn, addr, node, mgr, store, server_pk_b64, idle_timeout_s, slots), daemon=True, ).start() except KeyboardInterrupt: print("\n[fs-server] shutdown") finally: sock.close() def main() -> None: p = argparse.ArgumentParser(description="ZKAC opaque file-share server (demo)") p.add_argument("--host", default="127.0.0.1") p.add_argument("--port", type=int, default=9879) p.add_argument( "--data-dir", type=Path, default=Path(__file__).resolve().parent / "fs_data", help="server state (transport key, registries, opaque buckets)", ) p.add_argument("--idle-timeout", type=float, default=60.0) p.add_argument("--max-connections", type=int, default=64) p.add_argument("--allow-non-loopback", action="store_true") args = p.parse_args() serve( args.data_dir, host=args.host, port=args.port, max_connections=args.max_connections, idle_timeout_s=args.idle_timeout, allow_non_loopback=args.allow_non_loopback, ) if __name__ == "__main__": main()