ZKAC/demo/file_share_smoke.py
everbarry d5ae07973a polish and self-contain file-share demo UI
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-07 18:39:39 +02:00

253 lines
9.3 KiB
Python

#!/usr/bin/env python3
"""
End-to-end smoke test for the ZKAC file-share demo.
Exercises:
* admin (Alice) creates a registry on the file-share server,
* admin uploads a folder as an encrypted bucket with per-role visibility masks,
* Alice issues a ZKAC role credential to Bob via direct P2P grant,
* Alice pushes a per-recipient bucket role grant to Bob,
* Bob authenticates to the file-share server with his role credential,
downloads, and decrypts the files his role can see,
* server opacity: every byte at rest in the bucket directory is ciphertext
(no plaintext file content survives on disk).
All ZKAC operations go through ``zkac-node`` (subprocess) and a temporary
``ZKAC_HOME`` so the test is hermetic.
Run::
uv run python demo/file_share_smoke.py
"""
from __future__ import annotations
import os
import shutil
import socket
import sys
import tempfile
import threading
import time
import traceback
from pathlib import Path
DEMO_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(DEMO_DIR))
import zkac # noqa: E402
import file_share_client as fsc # noqa: E402
import file_share_credentials as fscred # noqa: E402
import file_share_server as fss # noqa: E402
import zkac_cli_adapter as cli # noqa: E402
def _free_port() -> int:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
finally:
s.close()
def _wait_until(predicate, timeout_s: float = 5.0, interval_s: float = 0.05) -> bool:
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
if predicate():
return True
time.sleep(interval_s)
return False
def _make_files(folder: Path) -> dict[str, bytes]:
folder.mkdir(parents=True, exist_ok=True)
contents = {
"alpha.txt": b"public-summary: " + os.urandom(2048),
"nested/beta.bin": os.urandom(200 * 1024), # > one chunk
"secret.md": b"# top-secret\n" + os.urandom(8192),
}
paths: dict[str, bytes] = {}
for rel, data in contents.items():
target = folder / rel
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(data)
paths[rel] = data
return paths
def _start_fs_server(host: str, port: int, data_dir: Path) -> threading.Thread:
t = threading.Thread(
target=fss.serve,
args=(data_dir,),
kwargs={"host": host, "port": port, "allow_non_loopback": False},
daemon=True,
)
t.start()
return t
def _server_transport_pubkey_hex(data_dir: Path) -> str:
import json
return zkac.PublicKey.from_bytes(
__import__("base64").b64decode(
json.loads((data_dir / "server_key.json").read_text())["public_b64"]
)
).to_bytes().hex()
def _open_admin_session(userid: str, server: str, server_pk_hex: str, registry_id: str) -> fsc.FileShareSession:
secrets = cli.load_identity_secrets(userid)
cred = cli.load_admin_credential(userid, registry_id)
return fsc.open_session(
server,
server_pk_hex=server_pk_hex,
user_transport_secret=bytes.fromhex(secrets["transport_secret_hex"]),
registry_id_hex=registry_id,
role_id=zkac.admin_role_id(),
credential=cred,
user_issuance_pk_hex=secrets["issuance_pk_hex"],
)
def _open_role_session(userid: str, server: str, server_pk_hex: str, registry_id: str, role_name: str) -> fsc.FileShareSession:
secrets = cli.load_identity_secrets(userid)
cred = cli.load_credential(userid, registry_id, role_name)
return fsc.open_session(
server,
server_pk_hex=server_pk_hex,
user_transport_secret=bytes.fromhex(secrets["transport_secret_hex"]),
registry_id_hex=registry_id,
role_id=zkac.role_id(role_name),
credential=cred,
user_issuance_pk_hex=secrets["issuance_pk_hex"],
)
def _scan_for_plaintext(haystack_root: Path, needles: list[bytes]) -> list[tuple[Path, int]]:
"""Brute-force grep for any ``needle`` bytes appearing in any file under ``haystack_root``."""
hits: list[tuple[Path, int]] = []
for p in haystack_root.rglob("*"):
if not p.is_file():
continue
data = p.read_bytes()
for n in needles:
if len(n) >= 64 and n in data:
hits.append((p, len(n)))
break
return hits
def main() -> int:
tmp = Path(tempfile.mkdtemp(prefix="zkac-fs-smoke-"))
print(f"[smoke] temp dir: {tmp}")
home = tmp / "zkac_home"
fs_data = tmp / "fs_data"
src = tmp / "src_folder"
out = tmp / "downloads"
home.mkdir()
fs_data.mkdir()
os.environ["ZKAC_HOME"] = str(home)
contents = _make_files(src)
print(f"[smoke] source folder: {src} ({len(contents)} files)")
port = _free_port()
server_addr = f"127.0.0.1:{port}"
_start_fs_server("127.0.0.1", port, fs_data)
if not _wait_until(lambda: (fs_data / "server_key.json").is_file()):
raise RuntimeError("server did not start")
if not _wait_until(lambda: socket.create_connection(("127.0.0.1", port), timeout=0.2)):
raise RuntimeError("server port not ready")
server_pk_hex = _server_transport_pubkey_hex(fs_data)
print(f"[smoke] file-share server ready @ {server_addr}, pk={server_pk_hex[:16]}")
cli.run_cli(["user", "create", "alice"]).raise_for_status()
cli.run_cli(["user", "create", "bob"]).raise_for_status()
cli.server_pin("alice", server_addr, server_pk_hex).raise_for_status()
cli.server_pin("bob", server_addr, server_pk_hex).raise_for_status()
print("[smoke] alice + bob created and pinned server")
rid = cli.registry_create("alice", server_addr, ["viewer", "editor"])
print(f"[smoke] registry created: {rid}")
# --- direct P2P grant: bob listens, alice grants ----------------------
listener_port = _free_port()
listener = cli.P2PListener("bob", host="127.0.0.1", port=listener_port, timeout_s=30.0)
listener.start()
if not _wait_until(lambda: listener.is_running()):
raise RuntimeError("bob listener did not start")
bob_contact = cli.show_user_contact("bob", peer=f"127.0.0.1:{listener_port}")
print(f"[smoke] bob listening on 127.0.0.1:{listener_port}")
try:
fscred.grant_role_p2p("alice", server_addr, rid, "viewer", bob_contact)
except RuntimeError as exc:
listener.stop()
print(f"[smoke] grant failed: {exc}\n[smoke] bob listener output:\n{listener.output()}")
raise
if not _wait_until(lambda: not listener.is_running(), timeout_s=10.0):
listener.stop()
raise RuntimeError(f"listener never exited after grant; output:\n{listener.output()}")
received = listener.parse_received()
print(f"[smoke] bob received: {received}")
assert received and received["role"] == "viewer"
assert received["registry_id"] == rid
# --- alice uploads bucket + sets masks --------------------------------
files_in_order = fsc.flatten_folder(src)
rel_paths = [str(p.relative_to(src.resolve())) for p in files_in_order]
print(f"[smoke] flattened: {rel_paths}")
# mask: viewer sees only first file (alpha.txt), editor sees everything.
viewer_mask = "1" + "0" * (len(files_in_order) - 1)
editor_mask = "1" * len(files_in_order)
with _open_admin_session("alice", server_addr, server_pk_hex, rid) as sess:
manifest = fsc.upload_bucket(
sess, src, server=server_addr, registry_id_hex=rid,
)
manifest.role_masks = {"viewer": viewer_mask, "editor": editor_mask}
# bob's issuance pk -> push viewer role grant
bob_secrets = cli.load_identity_secrets("bob")
fsc.push_role_grant(sess, manifest, "viewer", bob_secrets["issuance_pk_hex"])
fsc.save_manifest("alice", manifest)
print(f"[smoke] uploaded bucket {manifest.bucket_id} with role grants")
# --- bob downloads using his viewer credential ------------------------
with _open_role_session("bob", server_addr, server_pk_hex, rid, "viewer") as sess:
accessible = sess.fs_buckets()
assert accessible == [manifest.bucket_id], accessible
result = fsc.download_bucket(
sess, manifest.bucket_id,
issuance_secret_hex=bob_secrets["issuance_secret_hex"],
output_dir=out / manifest.bucket_id,
)
print(f"[smoke] bob downloaded files: {result['files_written']}")
# --- assertions -------------------------------------------------------
expected_visible = {rel_paths[0]} # only first file
actual = {str(Path(p).relative_to(out / manifest.bucket_id)) for p in result["files_written"]}
assert actual == expected_visible, f"visibility mismatch: {actual} vs {expected_visible}"
decrypted = (out / manifest.bucket_id / rel_paths[0]).read_bytes()
assert decrypted == contents[rel_paths[0]], "decrypted content does not match plaintext"
# opacity: no plaintext file content lives on disk under fs_data
plaintext_needles = list(contents.values())
leaks = _scan_for_plaintext(fs_data / "buckets", plaintext_needles)
assert not leaks, f"plaintext leaked into server storage: {leaks}"
print("[smoke] OK: visibility enforced, content matches, server storage opaque")
print(f"[smoke] cleanup {tmp}")
shutil.rmtree(tmp, ignore_errors=True)
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except Exception:
traceback.print_exc()
sys.exit(1)