ZKAC/demo/file_share_client.py
everbarry fe68752cc7 demo: privacy-harden file share and add guardrail tests
Harden fs auth and storage for a trustless-server model: proof-only hello,
opaque tagged bucket metadata, safer connection logging, and inbox UI without
raw ids. Add demo/test_demo_privacy_guardrails.py and README notes. Stop
tracking demo __pycache__ and fs_data artifacts.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-07 22:31:37 +02:00

599 lines
21 KiB
Python

"""
ZKAC file-share client library (demo).
Reusable, UI-agnostic helpers used by ``file_share_tui.py`` and the smoke test
harness. The library performs:
* deterministic folder flattening and per-file content-key generation,
* per-role visibility bitmasks over the flattened index,
* opaque blob uploads to ``file_share_server.py``,
* role-credential authenticated download + decrypt against the same server.
Authenticated sessions piggy-back on the ZKAC TCP framing/handshake from
``zkac.tcp``; the ``zkac-node`` CLI is only used (separately) for identity,
registry and direct P2P credential grants.
"""
from __future__ import annotations
import base64
import hashlib
import json
import os
import socket
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Iterator
import zkac
from zkac.tcp import FramedSession, client_handshake_anon
# Number of plaintext bytes read from a file per chunk; each chunk is encrypted
# separately and stored as one blob.
CHUNK_SIZE = 128 * 1024 # plaintext bytes per stored blob (well under MAX_TCP_FRAME_BYTES)
# Timeout, in seconds, used as the default for the initial TCP connect in
# ``open_session``.
DEFAULT_CONNECT_TIMEOUT_S = 8.0
# ── small helpers ────────────────────────────────────────────────────
def _b64(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
def _unb64(s: str) -> bytes:
return base64.b64decode(s)
def _parse_server(server: str) -> tuple[str, int]:
host, sep, port_s = server.rpartition(":")
if not sep:
raise ValueError(f"invalid server {server!r}, expected host:port")
port = int(port_s, 10)
if not 1 <= port <= 65535:
raise ValueError(f"port out of range: {port}")
return (host or "127.0.0.1"), port
# ── data classes ─────────────────────────────────────────────────────
@dataclass
class FileChunk:
    """Reference to one encrypted chunk of a file."""

    # Identifier under which the chunk's ciphertext is uploaded and fetched.
    blob_id: str
    # Base64 ephemeral pubkey used by the encrypt step (HPKE-style).
    eph_pk_b64: str
@dataclass
class FileEntry:
    """Metadata needed to locate, decrypt and verify one shared file."""

    # Path of the file relative to the shared folder root.
    rel_path: str
    # Plaintext size in bytes.
    size: int
    # Base64 SHA-256 digest of the plaintext, checked after decryption.
    sha256_b64: str
    # 32-byte X25519 secret -> derives the file's recipient kp
    content_secret_b64: str
    # Ordered chunk references; plaintext is the concatenation of the chunks.
    chunks: list[FileChunk] = field(default_factory=list)

    def to_grant_dict(self) -> dict:
        """Return a JSON-serialisable dict describing this file and its chunks."""
        chunk_dicts = []
        for chunk in self.chunks:
            chunk_dicts.append(
                {"blob_id": chunk.blob_id, "eph_pk_b64": chunk.eph_pk_b64}
            )
        return {
            "rel_path": self.rel_path,
            "size": self.size,
            "sha256_b64": self.sha256_b64,
            "content_secret_b64": self.content_secret_b64,
            "chunks": chunk_dicts,
        }

    @classmethod
    def from_grant_dict(cls, data: dict) -> "FileEntry":
        """Rebuild an entry from a dict produced by ``to_grant_dict``.

        ``size`` is coerced with ``int``; a missing key raises ``KeyError``.
        """
        rel_path = data["rel_path"]
        size = int(data["size"])
        digest_b64 = data["sha256_b64"]
        secret_b64 = data["content_secret_b64"]
        chunk_list = []
        for raw in data["chunks"]:
            chunk_list.append(FileChunk(raw["blob_id"], raw["eph_pk_b64"]))
        return cls(
            rel_path=rel_path,
            size=size,
            sha256_b64=digest_b64,
            content_secret_b64=secret_b64,
            chunks=chunk_list,
        )
@dataclass
class BucketManifest:
    """Admin-side bucket manifest, persisted locally on the bucket creator."""

    bucket_id: str
    server: str
    registry_id_hex: str
    files: list[FileEntry]
    # role -> bitmask string ('0'/'1')
    role_masks: dict[str, str] = field(default_factory=dict)
    # local audit log only (no server identity binding)
    recipients: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Return the manifest as a JSON-serialisable dict.

        ``role_masks`` and ``recipients`` are placed in the result by
        reference, not copied.
        """
        file_dicts = []
        for entry in self.files:
            chunk_dicts = [
                {"blob_id": chunk.blob_id, "eph_pk_b64": chunk.eph_pk_b64}
                for chunk in entry.chunks
            ]
            file_dicts.append({
                "rel_path": entry.rel_path,
                "size": entry.size,
                "sha256_b64": entry.sha256_b64,
                "content_secret_b64": entry.content_secret_b64,
                "chunks": chunk_dicts,
            })
        return {
            "bucket_id": self.bucket_id,
            "server": self.server,
            "registry_id_hex": self.registry_id_hex,
            "files": file_dicts,
            "role_masks": self.role_masks,
            "recipients": self.recipients,
        }

    @classmethod
    def from_dict(cls, data: dict) -> "BucketManifest":
        """Rebuild a manifest from a dict produced by ``to_dict``.

        ``files``, ``role_masks`` and ``recipients`` are optional and default
        to empty; the two latter are shallow-copied. The remaining keys are
        required and raise ``KeyError`` when absent.
        """
        entries = []
        for raw in data.get("files", []):
            entries.append(
                FileEntry(
                    rel_path=raw["rel_path"],
                    size=int(raw["size"]),
                    sha256_b64=raw["sha256_b64"],
                    content_secret_b64=raw["content_secret_b64"],
                    chunks=[
                        FileChunk(c["blob_id"], c["eph_pk_b64"])
                        for c in raw["chunks"]
                    ],
                )
            )
        return cls(
            bucket_id=data["bucket_id"],
            server=data["server"],
            registry_id_hex=data["registry_id_hex"],
            files=entries,
            role_masks=dict(data.get("role_masks", {})),
            recipients=list(data.get("recipients", [])),
        )
# ── folder flattening + chunking ─────────────────────────────────────
def flatten_folder(folder: Path) -> list[Path]:
    """Return every visible file under ``folder`` in deterministic order.

    The folder is resolved to an absolute path and walked recursively. Only
    entries for which ``is_file()`` is true are kept, and any entry with a
    relative path component starting with ``.`` (hidden files, or files inside
    hidden directories) is skipped. The result is sorted by the string form of
    each path relative to the folder.

    Raises:
        NotADirectoryError: if ``folder`` is not a directory.
    """
    root = folder.resolve()
    if not root.is_dir():
        raise NotADirectoryError(root)
    keyed: list[tuple[str, Path]] = []
    for path in root.rglob("*"):
        if not path.is_file():
            continue
        rel = path.relative_to(root)
        if any(part.startswith(".") for part in rel.parts):
            continue
        keyed.append((str(rel), path))
    # Relative paths are unique, so one sort on that key fully fixes the order;
    # pre-sorting the raw walk would add nothing.
    keyed.sort(key=lambda item: item[0])
    return [path for _, path in keyed]
def _read_chunks(path: Path, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Yield the contents of ``path`` as consecutive byte strings.

    Each piece is whatever a single ``read(chunk_size)`` returns; iteration
    stops at the first empty read, so an empty file yields nothing.
    """
    with path.open("rb") as stream:
        piece = stream.read(chunk_size)
        while piece:
            yield piece
            piece = stream.read(chunk_size)
def _sha256_file(path: Path) -> bytes:
h = hashlib.sha256()
for buf in _read_chunks(path, 64 * 1024):
h.update(buf)
return h.digest()
def encrypt_file_to_blobs(path: Path) -> tuple[FileEntry, list[bytes]]:
    """Generate a per-file content keypair, encrypt each chunk to its public key.

    Each chunk uses ``zkac.encrypt_for_admin`` so the matching
    ``IssuanceKeypair.decrypt`` on the recipient side succeeds (both bind the
    ``user->admin`` HKDF label).

    The file is read exactly once: the SHA-256 digest and the size recorded in
    the entry are accumulated from the very bytes that get encrypted, so they
    always describe the stored ciphertext even if the file changes on disk
    while this runs.

    Returns the file entry (with ``rel_path`` left empty for the caller to
    fill in) plus the parallel list of ciphertext blobs, one per chunk.
    """
    content_kp = zkac.IssuanceKeypair()
    content_pk = bytes(content_kp.public_key_bytes())
    content_sk = bytes(content_kp.secret_bytes())
    digest = hashlib.sha256()
    size = 0
    chunks: list[FileChunk] = []
    blobs: list[bytes] = []
    for chunk in _read_chunks(path):
        digest.update(chunk)
        size += len(chunk)
        eph_pk, ciphertext = zkac.encrypt_for_admin(content_pk, chunk)
        # Every chunk gets a fresh random blob id.
        chunks.append(
            FileChunk(blob_id=uuid.uuid4().hex, eph_pk_b64=_b64(bytes(eph_pk)))
        )
        blobs.append(bytes(ciphertext))
    entry = FileEntry(
        rel_path="",
        size=size,
        sha256_b64=_b64(digest.digest()),
        content_secret_b64=_b64(content_sk),
        chunks=chunks,
    )
    return entry, blobs
def decrypt_file_from_blobs(entry: FileEntry, blobs: Iterable[bytes]) -> bytes:
    """Reassemble + verify a file from its (already fetched) ciphertext blobs.

    ``blobs`` must be in the same order as ``entry.chunks``; pairing stops at
    the shorter of the two. The concatenated plaintext is checked against the
    entry's SHA-256 digest.

    Raises:
        RuntimeError: if the decrypted content does not match the digest.
    """
    keypair = zkac.IssuanceKeypair.from_secret(_unb64(entry.content_secret_b64))
    plaintext = bytearray()
    for meta, blob in zip(entry.chunks, blobs):
        eph_pk = _unb64(meta.eph_pk_b64)
        plaintext += bytes(keypair.decrypt(eph_pk, blob))
    result = bytes(plaintext)
    actual = hashlib.sha256(result).digest()
    if actual != _unb64(entry.sha256_b64):
        raise RuntimeError(f"hash mismatch for {entry.rel_path!r}")
    return result
# ── role bitmasks ────────────────────────────────────────────────────
def normalize_mask(mask: str, file_count: int) -> str:
    """Return ``mask`` as exactly ``file_count`` characters of '0'/'1'.

    Characters other than '0' and '1' are dropped. A mask that is too short is
    right-padded with '0'; a mask that is too long is rejected, not truncated.

    Raises:
        ValueError: if more than ``file_count`` bits remain after cleaning.
    """
    bits = [ch for ch in mask if ch in "01"]
    if len(bits) > file_count:
        raise ValueError(f"mask {mask!r} longer than {file_count} files")
    return "".join(bits).ljust(file_count, "0")
def files_visible_to_mask(files: list[FileEntry], mask: str) -> list[FileEntry]:
    """Return the files whose position in ``mask`` holds a '1'.

    Files and mask characters are paired positionally; pairing stops at the
    shorter of the two.
    """
    visible = []
    for entry, bit in zip(files, mask):
        if bit == "1":
            visible.append(entry)
    return visible
def encode_grant_payload(
    bucket_id: str,
    role_name: str,
    server: str,
    registry_id_hex: str,
    visible_files: list[FileEntry],
) -> bytes:
    """Serialise a role grant as compact UTF-8 JSON.

    The document carries the bucket/role/server/registry identifiers and, under
    ``files``, the ``to_grant_dict()`` form of every visible file.
    """
    file_dicts = []
    for entry in visible_files:
        file_dicts.append(entry.to_grant_dict())
    document = {
        "bucket_id": bucket_id,
        "role_name": role_name,
        "server": server,
        "registry_id_hex": registry_id_hex,
        "files": file_dicts,
    }
    # Compact separators: no whitespace after ',' or ':'.
    text = json.dumps(document, separators=(",", ":"))
    return text.encode("utf-8")
def encrypt_grant_to_recipient(payload: bytes, recipient_issuance_pk_hex: str) -> tuple[str, str]:
    """User->admin direction sealed box: ``zkac.encrypt_for_admin`` -> (eph_pk_b64, ciphertext_b64).

    Raises:
        ValueError: if the hex string is malformed or does not decode to
            exactly 32 bytes.
    """
    recipient_pk = bytes.fromhex(recipient_issuance_pk_hex)
    if len(recipient_pk) != 32:
        raise ValueError("recipient issuance public key must be 32 bytes")
    sealed = zkac.encrypt_for_admin(recipient_pk, payload)
    eph_pk, ciphertext = sealed
    eph_pk_b64 = _b64(bytes(eph_pk))
    ciphertext_b64 = _b64(bytes(ciphertext))
    return eph_pk_b64, ciphertext_b64
def decrypt_grant_for_recipient(
    eph_pk_b64: str,
    ciphertext_b64: str,
    issuance_secret_hex: str,
) -> dict:
    """Open a grant envelope with the recipient's issuance secret.

    The secret is turned into a ``zkac.IssuanceKeypair``, the Base64 envelope
    fields are decoded and decrypted, and the plaintext is parsed as UTF-8
    JSON.

    Raises:
        ValueError: if the hex string is malformed or does not decode to
            exactly 32 bytes.
    """
    secret_bytes = bytes.fromhex(issuance_secret_hex)
    if len(secret_bytes) != 32:
        raise ValueError("issuance secret must be 32 bytes")
    keypair = zkac.IssuanceKeypair.from_secret(secret_bytes)
    eph_pk = _unb64(eph_pk_b64)
    ciphertext = _unb64(ciphertext_b64)
    decrypted = bytes(keypair.decrypt(eph_pk, ciphertext))
    return json.loads(decrypted.decode("utf-8"))
# ── encrypted authenticated session to file-share server ─────────────
class FileShareSession:
    """Anonymous handshake + ``op:'fs'`` BBS+ presentation -> framed JSON RPC.

    Wraps an already-established framed channel. Every public method sends one
    JSON command and returns (part of) the JSON reply; a reply carrying a
    truthy ``error`` field is raised as ``RuntimeError``. Usable as a context
    manager, in which case the socket is closed on exit.
    """

    def __init__(self, sock: socket.socket, framed: FramedSession) -> None:
        self._sock = sock
        self._framed = framed

    def _call(self, cmd: dict) -> dict:
        """Send ``cmd`` as UTF-8 JSON and return the decoded reply."""
        request = json.dumps(cmd).encode("utf-8")
        self._framed.send(request)
        reply = json.loads(self._framed.recv())
        error = reply.get("error")
        if error:
            raise RuntimeError(error)
        return reply

    def close(self) -> None:
        """Close the underlying socket, ignoring ``OSError``."""
        try:
            self._sock.close()
        except OSError:
            pass

    def __enter__(self) -> "FileShareSession":
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()

    # admin commands

    def bucket_create(self, bucket_id: str | None = None) -> str:
        """Create a bucket and return the ``bucket_id`` from the reply.

        ``bucket_id`` is only included in the request when it is not ``None``.
        """
        request = {"cmd": "bucket_create"}
        if bucket_id is not None:
            request["bucket_id"] = bucket_id
        reply = self._call(request)
        return reply["bucket_id"]

    def bucket_put_blob(self, bucket_id: str, blob_id: str, ciphertext: bytes) -> None:
        """Upload one ciphertext blob, Base64-encoded, under ``blob_id``."""
        request = {
            "cmd": "bucket_put_blob",
            "bucket_id": bucket_id,
            "blob_id": blob_id,
            "ciphertext_b64": _b64(ciphertext),
        }
        self._call(request)

    def bucket_set_role_acl(self, bucket_id: str, role_id_hex: str, allowed_blob_ids: list[str]) -> None:
        """Send the list of blob ids allowed for a role in a bucket."""
        request = {
            "cmd": "bucket_set_role_acl",
            "bucket_id": bucket_id,
            "role_id_hex": role_id_hex,
            "allowed_blob_ids": allowed_blob_ids,
        }
        self._call(request)

    def bucket_put_role_grant(
        self,
        bucket_id: str,
        role_id_hex: str,
        acl_version: int,
        eph_pk_b64: str,
        ciphertext_b64: str,
    ) -> None:
        """Upload an encrypted role-grant envelope for the given ACL version."""
        request = {
            "cmd": "bucket_put_role_grant",
            "bucket_id": bucket_id,
            "role_id_hex": role_id_hex,
            "acl_version": int(acl_version),
            "eph_pk_b64": eph_pk_b64,
            "ciphertext_b64": ciphertext_b64,
        }
        self._call(request)

    def bucket_get_role_acl(self, bucket_id: str, role_id_hex: str) -> dict:
        """Return the server's full reply describing a role's ACL."""
        request = {
            "cmd": "bucket_get_role_acl",
            "bucket_id": bucket_id,
            "role_id_hex": role_id_hex,
        }
        return self._call(request)

    def bucket_finalize(self, bucket_id: str) -> None:
        """Send ``bucket_finalize`` for the bucket."""
        self._call({"cmd": "bucket_finalize", "bucket_id": bucket_id})

    def bucket_delete(self, bucket_id: str) -> None:
        """Send ``bucket_delete`` for the bucket."""
        self._call({"cmd": "bucket_delete", "bucket_id": bucket_id})

    def bucket_list_owned(self) -> list[str]:
        """Return ``bucket_ids`` from the ``bucket_list_owned`` reply."""
        reply = self._call({"cmd": "bucket_list_owned"})
        return reply["bucket_ids"]

    # any-role commands

    def whoami(self) -> dict:
        """Return the server's full reply to ``whoami``."""
        return self._call({"cmd": "whoami"})

    def fs_buckets(self) -> list[str]:
        """Return ``bucket_ids`` from the ``fs_buckets`` reply."""
        reply = self._call({"cmd": "fs_buckets"})
        return reply["bucket_ids"]

    def fs_get_role_grant(self, bucket_id: str) -> dict:
        """Return the server's full reply to ``fs_get_role_grant``."""
        return self._call({"cmd": "fs_get_role_grant", "bucket_id": bucket_id})

    def fs_get_blob(self, bucket_id: str, blob_id: str) -> bytes:
        """Fetch one blob and return its Base64-decoded ciphertext."""
        request = {"cmd": "fs_get_blob", "bucket_id": bucket_id, "blob_id": blob_id}
        reply = self._call(request)
        return _unb64(reply["ciphertext_b64"])
def open_session(
    server: str,
    *,
    server_pk_hex: str,
    user_transport_secret: bytes,
    registry_id_hex: str,
    role_id: bytes,
    credential: "zkac.Credential",
    connect_timeout_s: float = DEFAULT_CONNECT_TIMEOUT_S,
) -> FileShareSession:
    """Connect, anonymous-handshake, then present a BBS+ proof bound to the transcript.

    ``connect_timeout_s`` bounds the TCP connect only; afterwards the socket is
    switched to blocking mode. The hello message sent to the server contains
    just ``op`` and the Base64 proof. ``registry_id_hex`` and ``role_id`` are
    accepted but not used by this function.

    Raises:
        RuntimeError: if the server's hello reply carries an ``error``.
        Any exception raised during setup closes the socket before propagating.
    """
    address = _parse_server(server)
    sock = socket.create_connection(address, timeout=connect_timeout_s)
    # The timeout above applies to connecting only; run the session blocking.
    sock.settimeout(None)
    try:
        keypair = zkac.Keypair.from_secret_key(user_transport_secret)
        node = zkac.Node(keypair)
        server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_pk_hex))
        session = client_handshake_anon(sock, node, server_pk)
        framed = FramedSession(sock, session)
        # The proof is computed over this session's transcript hash.
        transcript = bytes(session.transcript_hash())
        proof = bytes(credential.present(transcript))
        hello_request = {
            "op": "fs",
            "bbs_auth_b64": _b64(proof),
        }
        framed.send(json.dumps(hello_request).encode("utf-8"))
        hello_reply = json.loads(framed.recv())
        if hello_reply.get("error"):
            raise RuntimeError(hello_reply["error"])
        return FileShareSession(sock, framed)
    except Exception:
        sock.close()
        raise
# ── higher-level admin helpers ───────────────────────────────────────
def upload_bucket(
    sess: FileShareSession,
    folder: Path,
    *,
    server: str,
    registry_id_hex: str,
    bucket_id: str | None = None,
) -> BucketManifest:
    """Encrypt every file under ``folder`` and stream blobs to the server.

    The folder is enumerated before anything is sent, so an invalid folder
    fails without leaving an empty bucket behind on the server. Each file's
    ``rel_path`` is stored with forward slashes regardless of platform so a
    manifest or grant produced on one OS can be consumed on another.

    Returns a manifest describing the uploaded files; ``role_masks`` and
    ``recipients`` start empty.

    Raises:
        NotADirectoryError: if ``folder`` is not a directory.
        RuntimeError: if the server rejects a command.
    """
    root = folder.resolve()
    paths = flatten_folder(folder)
    bid = sess.bucket_create(bucket_id)
    files: list[FileEntry] = []
    for p in paths:
        entry, blobs = encrypt_file_to_blobs(p)
        entry.rel_path = p.relative_to(root).as_posix()
        for chunk_meta, ciphertext in zip(entry.chunks, blobs):
            sess.bucket_put_blob(bid, chunk_meta.blob_id, ciphertext)
        files.append(entry)
    sess.bucket_finalize(bid)
    return BucketManifest(
        bucket_id=bid,
        server=server,
        registry_id_hex=registry_id_hex,
        files=files,
    )
def apply_role_masks_to_server(sess: FileShareSession, manifest: BucketManifest) -> None:
    """Push per-role blob ACLs so server enforces mask on fs_get_blob.

    For every role in ``manifest.role_masks`` the mask is normalised against
    the manifest's file list and the blob ids of all visible files are sent as
    that role's ACL. Does nothing when there are no role masks.
    """
    file_count = len(manifest.files)
    for role_name, raw_mask in (manifest.role_masks or {}).items():
        normalized = normalize_mask(raw_mask, file_count)
        blob_ids = []
        for entry in files_visible_to_mask(manifest.files, normalized):
            for chunk in entry.chunks:
                blob_ids.append(chunk.blob_id)
        role_hex = zkac.role_id(role_name).hex()
        sess.bucket_set_role_acl(manifest.bucket_id, role_hex, blob_ids)
def push_role_grant(
    sess: FileShareSession,
    manifest: BucketManifest,
    role_name: str,
    recipient_issuance_pk_hex: str,
) -> None:
    """Upload anonymous encrypted role grant envelope (no recipient identifier stored).

    The grant lists the files visible to ``role_name`` under the manifest's
    mask, is encrypted to the recipient's issuance public key, and is uploaded
    tagged with the role's current ACL version as reported by the server. On
    success a short hint (first 16 hex characters of the recipient key) is
    appended to ``manifest.recipients``.

    Raises:
        RuntimeError: if the role has no mask in the manifest, or the server
            reports no positive ACL version for the role.
    """
    masks = manifest.role_masks
    if role_name not in masks:
        raise RuntimeError(f"role {role_name!r} has no visibility mask in this bucket")
    normalized = normalize_mask(masks[role_name], len(manifest.files))
    granted_files = files_visible_to_mask(manifest.files, normalized)
    role_hex = zkac.role_id(role_name).hex()

    # The ACL must already be on the server; its version is sent with the grant.
    acl_info = sess.bucket_get_role_acl(manifest.bucket_id, role_hex)
    version = int(acl_info.get("version", 0))
    if version < 1:
        raise RuntimeError("server ACL missing for role; apply permissions before sharing")

    grant_bytes = encode_grant_payload(
        manifest.bucket_id,
        role_name,
        manifest.server,
        manifest.registry_id_hex,
        granted_files,
    )
    envelope = encrypt_grant_to_recipient(grant_bytes, recipient_issuance_pk_hex)
    eph_pk_b64, ct_b64 = envelope
    sess.bucket_put_role_grant(manifest.bucket_id, role_hex, version, eph_pk_b64, ct_b64)

    audit_record = {
        "role_name": role_name,
        "recipient_hint": recipient_issuance_pk_hex[:16],
    }
    manifest.recipients.append(audit_record)
def _grant_output_path(output_dir: Path, rel_path: str) -> Path:
    """Return ``output_dir / rel_path`` after checking it stays inside ``output_dir``."""
    candidate = output_dir / rel_path
    root = output_dir.resolve()
    # resolve() collapses ".." and follows symlinks; an absolute ``rel_path``
    # replaces ``output_dir`` entirely when joined. Either way the resolved
    # target must be strictly below the resolved output directory.
    if root not in candidate.resolve().parents:
        raise RuntimeError(f"unsafe path in role grant: {rel_path!r}")
    return candidate


def download_bucket(
    sess: FileShareSession,
    bucket_id: str,
    *,
    issuance_secret_hex: str,
    role_grant_payload: dict | None = None,
    output_dir: Path,
) -> dict:
    """Fetch + decrypt files from a role grant, optionally fetched from server.

    When ``role_grant_payload`` is ``None`` the grant envelopes for
    ``bucket_id`` are fetched and each is tried with ``issuance_secret_hex``
    until one decrypts. Every file listed in the grant is then downloaded,
    decrypted, hash-verified and written below ``output_dir``.

    File paths come from the grant and are treated as untrusted: a path that
    would land outside ``output_dir`` is rejected before any of that file's
    blobs are fetched.

    Returns ``{"role_name": ..., "files_written": [...]}``.

    Raises:
        RuntimeError: if no envelope decrypts, the grant names a different
            bucket, a file path escapes ``output_dir``, or a file fails its
            hash check.
    """
    payload = role_grant_payload
    if payload is None:
        grants = sess.fs_get_role_grant(bucket_id).get("grants", [])
        for grant in grants:
            try:
                payload = decrypt_grant_for_recipient(
                    grant["eph_pk_b64"],
                    grant["ciphertext_b64"],
                    issuance_secret_hex,
                )
            except Exception:
                # Deliberate best-effort trial decryption: an envelope that is
                # malformed or not openable with this secret is just skipped.
                continue
            break
    if payload is None:
        raise RuntimeError("no decryptable role grant for this credential")
    if payload.get("bucket_id") != bucket_id:
        raise RuntimeError("role grant bucket_id mismatch")
    output_dir.mkdir(parents=True, exist_ok=True)
    written: list[str] = []
    for fd in payload.get("files", []):
        entry = FileEntry.from_grant_dict(fd)
        out_path = _grant_output_path(output_dir, entry.rel_path)
        ciphertexts = [sess.fs_get_blob(bucket_id, c.blob_id) for c in entry.chunks]
        plaintext = decrypt_file_from_blobs(entry, ciphertexts)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_bytes(plaintext)
        written.append(str(out_path))
    return {"role_name": payload.get("role_name"), "files_written": written}
def _received_grant_path(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> Path:
    """Return where a received grant for this registry/role/bucket is stored."""
    role_dir = state_dir(userid) / "received_grants" / registry_id_hex / role_name
    filename = f"{bucket_id}.json"
    return role_dir / filename
def save_received_role_grant(
    userid: str,
    registry_id_hex: str,
    role_name: str,
    bucket_id: str,
    *,
    eph_pk_b64: str,
    ciphertext_b64: str,
) -> Path:
    """Persist a received (still encrypted) role-grant envelope as JSON.

    Parent directories are created as needed and the file is chmod-ed to
    ``0o600`` on a best-effort basis. Returns the path written.
    """
    target = _received_grant_path(userid, registry_id_hex, role_name, bucket_id)
    target.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "registry_id_hex": registry_id_hex,
        "role_name": role_name,
        "bucket_id": bucket_id,
        "eph_pk_b64": eph_pk_b64,
        "ciphertext_b64": ciphertext_b64,
    }
    serialized = json.dumps(record, indent=2)
    target.write_text(serialized)
    try:
        os.chmod(target, 0o600)
    except OSError:
        # Tightening permissions is best-effort; a failure is ignored.
        pass
    return target
def list_received_role_grants(userid: str, registry_id_hex: str, role_name: str) -> list[str]:
    """Return the sorted bucket ids that have a stored grant for this role.

    Ids are the stems of the ``*.json`` files in the role's grant directory;
    a missing directory yields an empty list.
    """
    role_dir = state_dir(userid) / "received_grants" / registry_id_hex / role_name
    if role_dir.is_dir():
        bucket_ids = [grant_file.stem for grant_file in role_dir.glob("*.json")]
        bucket_ids.sort()
        return bucket_ids
    return []
def load_received_role_grant(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> dict:
    """Read back a stored grant envelope and return it as parsed JSON."""
    source = _received_grant_path(userid, registry_id_hex, role_name, bucket_id)
    raw = source.read_text()
    return json.loads(raw)
# ── local persistence (admin manifests) ──────────────────────────────
def state_dir(userid: str) -> Path:
    """Return (creating it if needed) the per-user file-share state directory.

    The directory is ``<home>/<userid>/file_share`` where ``<home>`` is the
    ``ZKAC_HOME`` environment variable, or ``~/.ZKAC-FS`` when that is unset.
    It is chmod-ed to ``0o700`` on a best-effort basis.
    """
    # Keep demo state fully self-contained with demo ZKAC_HOME, rather than
    # mixing with repository-local legacy state directories.
    fallback_home = Path.home() / ".ZKAC-FS"
    home = Path(os.environ.get("ZKAC_HOME", fallback_home))
    target = home / userid / "file_share"
    target.mkdir(parents=True, exist_ok=True)
    try:
        os.chmod(target, 0o700)
    except OSError:
        # Tightening permissions is best-effort; a failure is ignored.
        pass
    return target
def manifest_path(userid: str, bucket_id: str) -> Path:
    """Return the path of the locally stored manifest for ``bucket_id``."""
    buckets_dir = state_dir(userid) / "buckets"
    filename = f"{bucket_id}.json"
    return buckets_dir / filename
def save_manifest(userid: str, manifest: BucketManifest) -> Path:
    """Write ``manifest`` as indented JSON to its per-user path and return it.

    The manifest contains per-file content secrets, so the file is created
    with mode ``0o600`` from the start and is never readable by others, even
    briefly. Permissions on an already-existing file are tightened afterwards
    on a best-effort basis.
    """
    p = manifest_path(userid, manifest.bucket_id)
    p.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(manifest.to_dict(), indent=2)
    # O_BINARY (Windows only) leaves newline handling to the text wrapper.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(p, flags, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(text)
    try:
        # The mode passed to os.open only applies to newly created files.
        os.chmod(p, 0o600)
    except OSError:
        pass
    return p
def load_manifest(userid: str, bucket_id: str) -> BucketManifest:
    """Load the locally stored manifest for ``bucket_id``."""
    raw = manifest_path(userid, bucket_id).read_text()
    parsed = json.loads(raw)
    return BucketManifest.from_dict(parsed)
def list_manifests(userid: str) -> list[str]:
    """Return the sorted bucket ids that have a locally stored manifest.

    Ids are the stems of the ``*.json`` files in the user's ``buckets``
    directory; a missing directory yields an empty list.
    """
    buckets_dir = state_dir(userid) / "buckets"
    if buckets_dir.is_dir():
        bucket_ids = [manifest_file.stem for manifest_file in buckets_dir.glob("*.json")]
        bucket_ids.sort()
        return bucket_ids
    return []