Compare commits

...

3 Commits

Author SHA1 Message Date
everbarry
259e904c64 up gitignore 2026-05-07 22:34:41 +02:00
everbarry
fe68752cc7 demo: privacy-harden file share and add guardrail tests
Harden fs auth and storage for a trustless-server model: proof-only hello,
opaque tagged bucket metadata, safer connection logging, and inbox UI without
raw ids. Add demo/test_demo_privacy_guardrails.py and README notes. Stop
tracking demo __pycache__ and fs_data artifacts.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-07 22:31:37 +02:00
everbarry
d5ae07973a polish and self-contain file-share demo UI
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-07 18:39:39 +02:00
16 changed files with 3798 additions and 324 deletions

25
.github/workflows/fuzz-smoke.yml vendored Normal file
View File

@ -0,0 +1,25 @@
name: fuzz-smoke
on:
push:
branches: [master, main]
pull_request:
branches: [master, main]
jobs:
libfuzzer:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Install Rust
uses: dtolnay/rust-toolchain@stable
- name: Install cargo-fuzz
uses: taiki-e/install-action@cargo-fuzz
- name: Build fuzz targets (no sanitizer — stable)
run: cargo fuzz build -s none
working-directory: ${{ github.workspace }}
- name: Smoke all libFuzzer targets
run: bash scripts/fuzz-libfuzzer.sh
working-directory: ${{ github.workspace }}
env:
FUZZ_RUNS: "2000"

7
.gitignore vendored
View File

@ -24,3 +24,10 @@ cli/zkac_cli/__pycache__
# wasm-pack output for browser demo
demo/static/pkg/
# file-share demo runtime / local-only artifacts
demo/__pycache__/
demo/fs_data/
demo/.demo_state/
demo/creds/
demo/test_bucket/

View File

@ -2,9 +2,11 @@ README.md
pyproject.toml
zkac_cli/__init__.py
zkac_cli/client.py
zkac_cli/i2p_serve.py
zkac_cli/main.py
zkac_cli/paths.py
zkac_cli/server.py
zkac_cli/server_debug.py
zkac_cli/store.py
zkac_node.egg-info/PKG-INFO
zkac_node.egg-info/SOURCES.txt

View File

@ -1,2 +1,3 @@
[console_scripts]
zkac-node = zkac_cli.main:main
zkac-node-i2p-server = zkac_cli.i2p_serve:main

49
demo/README.md Normal file
View File

@ -0,0 +1,49 @@
# ZKAC File-Share Demo
This folder contains only the self-contained Textual file-share demo.
## Files
- `demo/file_share_server.py`: headless opaque server (registry mgmt + file-share channel).
- `demo/file_share_client.py`: upload/download + role-mask utilities.
- `demo/file_share_credentials.py`: P2P credential grant helper.
- `demo/file_share_tui.py`: Textual UI.
- `demo/zkac_cli_adapter.py`: subprocess bridge to `zkac-node`.
- `demo/file_share_smoke.py`: end-to-end smoke test.
- `demo/test_demo_privacy_guardrails.py`: pytest privacy regressions for the demo.
## Run
```bash
uv sync --extra demo
uv run python demo/file_share_server.py --port 9879
uv run python demo/file_share_tui.py
```
The demo uses `ZKAC_HOME=~/.ZKAC-FS` by default, so it stays isolated from other
local ZKAC usage.
## UI Flow
- `Login`
- `Connect` (reuses pinned server key when available)
- `Select Bucket` (list owned + permitted buckets, or create new)
- `Permissions` (edit per-role bitmask)
- `Share Permissions`
- `Listen` (optional port; blank means random)
- `Inbox`
`c` copies the latest generated contact bundle to clipboard.
## Verify
```bash
uv run python demo/file_share_smoke.py
pytest demo/test_demo_privacy_guardrails.py
```
## Future Work
- Further reduce at-rest metadata by removing persisted raw role-id indexes used
for proof candidate discovery after restart, while preserving reliable auth
recovery semantics.

View File

@ -1,2 +0,0 @@
*
!.gitignore

598
demo/file_share_client.py Normal file
View File

@ -0,0 +1,598 @@
"""
ZKAC file-share client library (demo).
Reusable, UI-agnostic helpers used by ``file_share_tui.py`` and the smoke test
harness. The library performs:
* deterministic folder flattening and per-file content-key generation,
* per-role visibility bitmasks over the flattened index,
* opaque blob uploads to ``file_share_server.py``,
* role-credential authenticated download + decrypt against the same server.
Authenticated sessions piggy-back on the ZKAC TCP framing/handshake from
``zkac.tcp``; the ``zkac-node`` CLI is only used (separately) for identity,
registry and direct P2P credential grants.
"""
from __future__ import annotations
import base64
import hashlib
import json
import os
import socket
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterable, Iterator
import zkac
from zkac.tcp import FramedSession, client_handshake_anon
CHUNK_SIZE = 128 * 1024 # plaintext bytes per stored blob (well under MAX_TCP_FRAME_BYTES)
DEFAULT_CONNECT_TIMEOUT_S = 8.0
# ── small helpers ────────────────────────────────────────────────────
def _b64(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
def _unb64(s: str) -> bytes:
return base64.b64decode(s)
def _parse_server(server: str) -> tuple[str, int]:
host, sep, port_s = server.rpartition(":")
if not sep:
raise ValueError(f"invalid server {server!r}, expected host:port")
port = int(port_s, 10)
if not 1 <= port <= 65535:
raise ValueError(f"port out of range: {port}")
return (host or "127.0.0.1"), port
# ── data classes ─────────────────────────────────────────────────────
@dataclass
class FileChunk:
blob_id: str
eph_pk_b64: str # ephemeral pubkey used by the encrypt step (HPKE-style)
@dataclass
class FileEntry:
rel_path: str
size: int
sha256_b64: str
content_secret_b64: str # 32-byte X25519 secret -> derives the file's recipient kp
chunks: list[FileChunk] = field(default_factory=list)
def to_grant_dict(self) -> dict:
return {
"rel_path": self.rel_path,
"size": self.size,
"sha256_b64": self.sha256_b64,
"content_secret_b64": self.content_secret_b64,
"chunks": [{"blob_id": c.blob_id, "eph_pk_b64": c.eph_pk_b64} for c in self.chunks],
}
@classmethod
def from_grant_dict(cls, data: dict) -> "FileEntry":
return cls(
rel_path=data["rel_path"],
size=int(data["size"]),
sha256_b64=data["sha256_b64"],
content_secret_b64=data["content_secret_b64"],
chunks=[FileChunk(c["blob_id"], c["eph_pk_b64"]) for c in data["chunks"]],
)
@dataclass
class BucketManifest:
"""Admin-side bucket manifest, persisted locally on the bucket creator."""
bucket_id: str
server: str
registry_id_hex: str
files: list[FileEntry]
role_masks: dict[str, str] = field(default_factory=dict) # role -> bitmask string ('0'/'1')
recipients: list[dict] = field(default_factory=list) # local audit log only (no server identity binding)
def to_dict(self) -> dict:
return {
"bucket_id": self.bucket_id,
"server": self.server,
"registry_id_hex": self.registry_id_hex,
"files": [
{
"rel_path": f.rel_path,
"size": f.size,
"sha256_b64": f.sha256_b64,
"content_secret_b64": f.content_secret_b64,
"chunks": [{"blob_id": c.blob_id, "eph_pk_b64": c.eph_pk_b64} for c in f.chunks],
}
for f in self.files
],
"role_masks": self.role_masks,
"recipients": self.recipients,
}
@classmethod
def from_dict(cls, data: dict) -> "BucketManifest":
files = [
FileEntry(
rel_path=fd["rel_path"],
size=int(fd["size"]),
sha256_b64=fd["sha256_b64"],
content_secret_b64=fd["content_secret_b64"],
chunks=[FileChunk(c["blob_id"], c["eph_pk_b64"]) for c in fd["chunks"]],
)
for fd in data.get("files", [])
]
return cls(
bucket_id=data["bucket_id"],
server=data["server"],
registry_id_hex=data["registry_id_hex"],
files=files,
role_masks=dict(data.get("role_masks", {})),
recipients=list(data.get("recipients", [])),
)
# ── folder flattening + chunking ─────────────────────────────────────
def flatten_folder(folder: Path) -> list[Path]:
"""Deterministic relative-path order: sorted, files only, skipping hidden entries."""
folder = folder.resolve()
if not folder.is_dir():
raise NotADirectoryError(folder)
out: list[Path] = []
for path in sorted(folder.rglob("*")):
if not path.is_file():
continue
if any(part.startswith(".") for part in path.relative_to(folder).parts):
continue
out.append(path)
return sorted(out, key=lambda p: str(p.relative_to(folder)))
def _read_chunks(path: Path, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
with path.open("rb") as fh:
while True:
buf = fh.read(chunk_size)
if not buf:
return
yield buf
def _sha256_file(path: Path) -> bytes:
h = hashlib.sha256()
for buf in _read_chunks(path, 64 * 1024):
h.update(buf)
return h.digest()
def encrypt_file_to_blobs(path: Path) -> tuple[FileEntry, list[bytes]]:
"""Generate a per-file content keypair, encrypt each chunk to its public key.
Each chunk uses ``zkac.encrypt_for_admin`` so the matching
``IssuanceKeypair.decrypt`` on the recipient side succeeds (both bind the
``user->admin`` HKDF label).
Returns the file entry plus the parallel list of ciphertext blobs.
"""
content_kp = zkac.IssuanceKeypair()
content_pk = bytes(content_kp.public_key_bytes())
content_sk = bytes(content_kp.secret_bytes())
sha = _sha256_file(path)
size = path.stat().st_size
entry = FileEntry(
rel_path="",
size=size,
sha256_b64=_b64(sha),
content_secret_b64=_b64(content_sk),
)
blobs: list[bytes] = []
for chunk in _read_chunks(path):
eph_pk, ciphertext = zkac.encrypt_for_admin(content_pk, chunk)
entry.chunks.append(FileChunk(blob_id=uuid.uuid4().hex, eph_pk_b64=_b64(bytes(eph_pk))))
blobs.append(bytes(ciphertext))
return entry, blobs
def decrypt_file_from_blobs(entry: FileEntry, blobs: Iterable[bytes]) -> bytes:
"""Reassemble + verify a file from its (already fetched) ciphertext blobs."""
content_kp = zkac.IssuanceKeypair.from_secret(_unb64(entry.content_secret_b64))
out = bytearray()
for chunk_meta, ciphertext in zip(entry.chunks, blobs):
out.extend(bytes(content_kp.decrypt(_unb64(chunk_meta.eph_pk_b64), ciphertext)))
if hashlib.sha256(bytes(out)).digest() != _unb64(entry.sha256_b64):
raise RuntimeError(f"hash mismatch for {entry.rel_path!r}")
return bytes(out)
# ── role bitmasks ────────────────────────────────────────────────────
def normalize_mask(mask: str, file_count: int) -> str:
"""Pad/truncate a bitmask string of '0'/'1' to ``file_count`` and validate."""
cleaned = "".join(c for c in mask if c in "01")
if len(cleaned) > file_count:
raise ValueError(f"mask {mask!r} longer than {file_count} files")
cleaned = cleaned.ljust(file_count, "0")
return cleaned
def files_visible_to_mask(files: list[FileEntry], mask: str) -> list[FileEntry]:
return [f for f, bit in zip(files, mask) if bit == "1"]
def encode_grant_payload(
bucket_id: str,
role_name: str,
server: str,
registry_id_hex: str,
visible_files: list[FileEntry],
) -> bytes:
payload = {
"bucket_id": bucket_id,
"role_name": role_name,
"server": server,
"registry_id_hex": registry_id_hex,
"files": [f.to_grant_dict() for f in visible_files],
}
return json.dumps(payload, separators=(",", ":")).encode("utf-8")
def encrypt_grant_to_recipient(payload: bytes, recipient_issuance_pk_hex: str) -> tuple[str, str]:
"""User->admin direction sealed box: ``zkac.encrypt_for_admin`` -> (eph_pk_b64, ciphertext_b64)."""
rec_pk = bytes.fromhex(recipient_issuance_pk_hex)
if len(rec_pk) != 32:
raise ValueError("recipient issuance public key must be 32 bytes")
eph_pk, ciphertext = zkac.encrypt_for_admin(rec_pk, payload)
return _b64(bytes(eph_pk)), _b64(bytes(ciphertext))
def decrypt_grant_for_recipient(
eph_pk_b64: str,
ciphertext_b64: str,
issuance_secret_hex: str,
) -> dict:
secret = bytes.fromhex(issuance_secret_hex)
if len(secret) != 32:
raise ValueError("issuance secret must be 32 bytes")
receiver = zkac.IssuanceKeypair.from_secret(secret)
plaintext = bytes(receiver.decrypt(_unb64(eph_pk_b64), _unb64(ciphertext_b64)))
return json.loads(plaintext.decode("utf-8"))
# ── encrypted authenticated session to file-share server ─────────────
class FileShareSession:
"""Anonymous handshake + ``op:'fs'`` BBS+ presentation -> framed JSON RPC."""
def __init__(self, sock: socket.socket, framed: FramedSession) -> None:
self._sock = sock
self._framed = framed
def _call(self, cmd: dict) -> dict:
self._framed.send(json.dumps(cmd).encode("utf-8"))
resp = json.loads(self._framed.recv())
if resp.get("error"):
raise RuntimeError(resp["error"])
return resp
def close(self) -> None:
try:
self._sock.close()
except OSError:
pass
def __enter__(self) -> "FileShareSession":
return self
def __exit__(self, *exc: object) -> None:
self.close()
# admin commands
def bucket_create(self, bucket_id: str | None = None) -> str:
cmd = {"cmd": "bucket_create"}
if bucket_id is not None:
cmd["bucket_id"] = bucket_id
return self._call(cmd)["bucket_id"]
def bucket_put_blob(self, bucket_id: str, blob_id: str, ciphertext: bytes) -> None:
self._call({
"cmd": "bucket_put_blob",
"bucket_id": bucket_id,
"blob_id": blob_id,
"ciphertext_b64": _b64(ciphertext),
})
def bucket_set_role_acl(self, bucket_id: str, role_id_hex: str, allowed_blob_ids: list[str]) -> None:
self._call({
"cmd": "bucket_set_role_acl",
"bucket_id": bucket_id,
"role_id_hex": role_id_hex,
"allowed_blob_ids": allowed_blob_ids,
})
def bucket_put_role_grant(
self,
bucket_id: str,
role_id_hex: str,
acl_version: int,
eph_pk_b64: str,
ciphertext_b64: str,
) -> None:
self._call({
"cmd": "bucket_put_role_grant",
"bucket_id": bucket_id,
"role_id_hex": role_id_hex,
"acl_version": int(acl_version),
"eph_pk_b64": eph_pk_b64,
"ciphertext_b64": ciphertext_b64,
})
def bucket_get_role_acl(self, bucket_id: str, role_id_hex: str) -> dict:
return self._call({
"cmd": "bucket_get_role_acl",
"bucket_id": bucket_id,
"role_id_hex": role_id_hex,
})
def bucket_finalize(self, bucket_id: str) -> None:
self._call({"cmd": "bucket_finalize", "bucket_id": bucket_id})
def bucket_delete(self, bucket_id: str) -> None:
self._call({"cmd": "bucket_delete", "bucket_id": bucket_id})
def bucket_list_owned(self) -> list[str]:
return self._call({"cmd": "bucket_list_owned"})["bucket_ids"]
# any-role commands
def whoami(self) -> dict:
return self._call({"cmd": "whoami"})
def fs_buckets(self) -> list[str]:
return self._call({"cmd": "fs_buckets"})["bucket_ids"]
def fs_get_role_grant(self, bucket_id: str) -> dict:
return self._call({"cmd": "fs_get_role_grant", "bucket_id": bucket_id})
def fs_get_blob(self, bucket_id: str, blob_id: str) -> bytes:
return _unb64(self._call({"cmd": "fs_get_blob", "bucket_id": bucket_id, "blob_id": blob_id})["ciphertext_b64"])
def open_session(
server: str,
*,
server_pk_hex: str,
user_transport_secret: bytes,
registry_id_hex: str,
role_id: bytes,
credential: "zkac.Credential",
connect_timeout_s: float = DEFAULT_CONNECT_TIMEOUT_S,
) -> FileShareSession:
"""Connect, anonymous-handshake, then present a BBS+ proof bound to the transcript."""
host, port = _parse_server(server)
sock = socket.create_connection((host, port), timeout=connect_timeout_s)
sock.settimeout(None)
try:
node = zkac.Node(zkac.Keypair.from_secret_key(user_transport_secret))
server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_pk_hex))
session = client_handshake_anon(sock, node, server_pk)
framed = FramedSession(sock, session)
transcript_hash = bytes(session.transcript_hash())
bbs_auth_proof = bytes(credential.present(transcript_hash))
framed.send(json.dumps({
"op": "fs",
"bbs_auth_b64": _b64(bbs_auth_proof),
}).encode("utf-8"))
hello = json.loads(framed.recv())
if hello.get("error"):
raise RuntimeError(hello["error"])
return FileShareSession(sock, framed)
except Exception:
sock.close()
raise
# ── higher-level admin helpers ───────────────────────────────────────
def upload_bucket(
sess: FileShareSession,
folder: Path,
*,
server: str,
registry_id_hex: str,
bucket_id: str | None = None,
) -> BucketManifest:
"""Encrypt every file under ``folder`` and stream blobs to the server."""
bid = sess.bucket_create(bucket_id)
paths = flatten_folder(folder)
files: list[FileEntry] = []
for p in paths:
entry, blobs = encrypt_file_to_blobs(p)
entry.rel_path = str(p.relative_to(folder.resolve()))
for chunk_meta, ciphertext in zip(entry.chunks, blobs):
sess.bucket_put_blob(bid, chunk_meta.blob_id, ciphertext)
files.append(entry)
sess.bucket_finalize(bid)
return BucketManifest(
bucket_id=bid,
server=server,
registry_id_hex=registry_id_hex,
files=files,
)
def apply_role_masks_to_server(sess: FileShareSession, manifest: BucketManifest) -> None:
"""Push per-role blob ACLs so server enforces mask on fs_get_blob."""
if not manifest.role_masks:
return
for role_name, raw_mask in manifest.role_masks.items():
mask = normalize_mask(raw_mask, len(manifest.files))
visible = files_visible_to_mask(manifest.files, mask)
allowed_blob_ids = [c.blob_id for f in visible for c in f.chunks]
sess.bucket_set_role_acl(
manifest.bucket_id,
zkac.role_id(role_name).hex(),
allowed_blob_ids,
)
def push_role_grant(
sess: FileShareSession,
manifest: BucketManifest,
role_name: str,
recipient_issuance_pk_hex: str,
) -> None:
"""Upload anonymous encrypted role grant envelope (no recipient identifier stored)."""
if role_name not in manifest.role_masks:
raise RuntimeError(f"role {role_name!r} has no visibility mask in this bucket")
mask = normalize_mask(manifest.role_masks[role_name], len(manifest.files))
visible = files_visible_to_mask(manifest.files, mask)
role_id_hex = zkac.role_id(role_name).hex()
acl_meta = sess.bucket_get_role_acl(manifest.bucket_id, role_id_hex)
acl_version = int(acl_meta.get("version", 0))
if acl_version <= 0:
raise RuntimeError("server ACL missing for role; apply permissions before sharing")
payload = encode_grant_payload(
manifest.bucket_id, role_name, manifest.server, manifest.registry_id_hex, visible,
)
eph_pk_b64, ct_b64 = encrypt_grant_to_recipient(payload, recipient_issuance_pk_hex)
sess.bucket_put_role_grant(
manifest.bucket_id,
role_id_hex,
acl_version,
eph_pk_b64,
ct_b64,
)
manifest.recipients.append({
"role_name": role_name,
"recipient_hint": recipient_issuance_pk_hex[:16],
})
def download_bucket(
sess: FileShareSession,
bucket_id: str,
*,
issuance_secret_hex: str,
role_grant_payload: dict | None = None,
output_dir: Path,
) -> dict:
"""Fetch + decrypt files from a role grant, optionally fetched from server."""
payload = role_grant_payload
if payload is None:
grants = sess.fs_get_role_grant(bucket_id).get("grants", [])
for grant in grants:
try:
payload = decrypt_grant_for_recipient(
grant["eph_pk_b64"],
grant["ciphertext_b64"],
issuance_secret_hex,
)
break
except Exception:
continue
if payload is None:
raise RuntimeError("no decryptable role grant for this credential")
if payload.get("bucket_id") != bucket_id:
raise RuntimeError("role grant bucket_id mismatch")
output_dir.mkdir(parents=True, exist_ok=True)
written: list[str] = []
for fd in payload.get("files", []):
entry = FileEntry.from_grant_dict(fd)
ciphertexts = [sess.fs_get_blob(bucket_id, c.blob_id) for c in entry.chunks]
plaintext = decrypt_file_from_blobs(entry, ciphertexts)
out_path = output_dir / entry.rel_path
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_bytes(plaintext)
written.append(str(out_path))
return {"role_name": payload.get("role_name"), "files_written": written}
def _received_grant_path(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> Path:
return state_dir(userid) / "received_grants" / registry_id_hex / role_name / f"{bucket_id}.json"
def save_received_role_grant(
userid: str,
registry_id_hex: str,
role_name: str,
bucket_id: str,
*,
eph_pk_b64: str,
ciphertext_b64: str,
) -> Path:
path = _received_grant_path(userid, registry_id_hex, role_name, bucket_id)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps({
"registry_id_hex": registry_id_hex,
"role_name": role_name,
"bucket_id": bucket_id,
"eph_pk_b64": eph_pk_b64,
"ciphertext_b64": ciphertext_b64,
}, indent=2))
try:
os.chmod(path, 0o600)
except OSError:
pass
return path
def list_received_role_grants(userid: str, registry_id_hex: str, role_name: str) -> list[str]:
base = state_dir(userid) / "received_grants" / registry_id_hex / role_name
if not base.is_dir():
return []
return sorted(p.stem for p in base.glob("*.json"))
def load_received_role_grant(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> dict:
return json.loads(_received_grant_path(userid, registry_id_hex, role_name, bucket_id).read_text())
# ── local persistence (admin manifests) ──────────────────────────────
def state_dir(userid: str) -> Path:
# Keep demo state fully self-contained with demo ZKAC_HOME, rather than
# mixing with repository-local legacy state directories.
base_home = Path(os.environ.get("ZKAC_HOME", Path.home() / ".ZKAC-FS"))
base = base_home / userid / "file_share"
base.mkdir(parents=True, exist_ok=True)
try:
os.chmod(base, 0o700)
except OSError:
pass
return base
def manifest_path(userid: str, bucket_id: str) -> Path:
return state_dir(userid) / "buckets" / f"{bucket_id}.json"
def save_manifest(userid: str, manifest: BucketManifest) -> Path:
p = manifest_path(userid, manifest.bucket_id)
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(json.dumps(manifest.to_dict(), indent=2))
try:
os.chmod(p, 0o600)
except OSError:
pass
return p
def load_manifest(userid: str, bucket_id: str) -> BucketManifest:
return BucketManifest.from_dict(json.loads(manifest_path(userid, bucket_id).read_text()))
def list_manifests(userid: str) -> list[str]:
base = state_dir(userid) / "buckets"
if not base.is_dir():
return []
return sorted(p.stem for p in base.glob("*.json"))

View File

@ -0,0 +1,338 @@
"""
Demo-local P2P credential grant helper.
The shipped ``zkac-node grant`` command pairs a sender that calls
``IssuanceKeypair.encrypt`` (HKDF info ``admin->user``) with a listener that
calls ``IssuanceKeypair.decrypt`` (HKDF info ``user->admin``). Those labels
intentionally differ in the protocol they describe the user/admin direction
of the issuance pipeline so the round-trip in the same direction must use
``encrypt_for_admin`` + ``IssuanceKeypair.decrypt``, which both bind the
``user->admin`` label.
This module performs the grant from the demo using the matched pair so the
encrypted payload decodes correctly under ``zkac-node p2p-listen``. The wire
format of the ``p2p_grant`` message is unchanged, only the encryption call.
We deliberately avoid importing ``cli/zkac_cli/*``; identity, admin and pin
material is read straight from the CLI's stable on-disk JSON formats via
``zkac_cli_adapter`` helpers.
"""
from __future__ import annotations
import base64
import json
import socket
import time
from pathlib import Path
import zkac
from zkac.tcp import FramedSession, client_handshake_anon
import file_share_client as fsc
import zkac_cli_adapter as cli
def _b64(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
def _unb64(s: str) -> bytes:
return base64.b64decode(s)
def _parse_server(server: str) -> tuple[str, int]:
host, sep, port_s = server.rpartition(":")
if not sep:
raise ValueError(f"invalid server {server!r}, expected host:port")
return (host or "127.0.0.1"), int(port_s, 10)
def _parse_contact_bundle(bundle: str) -> dict:
s = bundle.strip()
raw = base64.urlsafe_b64decode((s + "=" * (-len(s) % 4)).encode())
return json.loads(raw.decode("utf-8"))
def _resolve_server_pk_hex(userid: str, server: str) -> str:
"""Locate the user's pinned transport key for ``server`` on disk."""
server_pk_hex = cli.load_pinned_server_key(userid, server)
if not server_pk_hex:
raise FileNotFoundError(
f"no pinned server key for {server!r} under {userid!r}; "
f"run: zkac-node server pin {userid} {server} --key <hex>"
)
return server_pk_hex
def _fetch_registry_state(
userid: str,
server: str,
registry_id_hex: str,
admin_cred: zkac.Credential,
) -> tuple[bytes, bytes]:
"""Authenticated mgmt-channel ``get_registry`` (matches the CLI's mgmt protocol)."""
server_pk_hex = _resolve_server_pk_hex(userid, server)
transport_secret = bytes.fromhex(cli.load_identity_secrets(userid)["transport_secret_hex"])
host, port = _parse_server(server)
sock = socket.create_connection((host, port), timeout=8.0)
sock.settimeout(None)
try:
node = zkac.Node(zkac.Keypair.from_secret_key(transport_secret))
server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_pk_hex))
session = client_handshake_anon(sock, node, server_pk)
framed = FramedSession(sock, session)
framed.send(json.dumps({"op": "mgmt"}).encode())
transcript_hash = bytes(session.transcript_hash())
framed.send(json.dumps({
"cmd": "get_registry",
"registry_id": registry_id_hex,
"auth_registry_id": registry_id_hex,
"admin_proof_b64": _b64(bytes(admin_cred.present(transcript_hash))),
}).encode())
resp = json.loads(framed.recv())
if resp.get("error"):
raise RuntimeError(resp["error"])
return _unb64(resp["state_bytes_b64"]), _unb64(resp["state_cert_b64"])
finally:
sock.close()
# ── grant (sender side) ──────────────────────────────────────────────
def grant_role_p2p(
admin_userid: str,
server: str,
registry_id_hex: str,
role_name: str,
recipient_contact_bundle: str,
*,
bucket_role_grant: dict | None = None,
connect_timeout_s: float = 8.0,
) -> dict:
"""Issue a BBS+ role credential and ship it to the recipient over an authenticated TCP session.
Wire-compatible with ``zkac-node p2p-listen``: the ciphertext is produced via
``encrypt_for_admin`` so the listener's ``IssuanceKeypair.decrypt`` succeeds.
"""
parsed = _parse_contact_bundle(recipient_contact_bundle)
recipient_issuance_pk_hex = parsed["issuance_pk_hex"]
recipient_transport_pk_hex = parsed["transport_pk_hex"]
recipient_grant_token_b64 = parsed["grant_token_b64"]
peer = parsed.get("peer")
if not peer:
raise RuntimeError(
"contact bundle missing peer endpoint; ask recipient to regenerate "
"with `zkac-node user show <userid> --peer <host:port>`"
)
admin_data = cli.load_admin_material(admin_userid, registry_id_hex)
roles = admin_data.get("roles", [])
if role_name not in roles:
raise RuntimeError(f"role {role_name!r} not in registry (have: {roles})")
role_epochs = admin_data.get("role_epochs", {})
if role_name not in role_epochs:
raise RuntimeError(f"missing epoch metadata for role {role_name!r}")
epoch = int(role_epochs[role_name])
bbs_issuer = zkac.BbsIssuer.from_secret_key(_unb64(admin_data["bbs_issuer_secret_b64"]))
bbs_pk = bbs_issuer.public_key()
admin_cred = cli.load_admin_credential(admin_userid, registry_id_hex)
role_rid = zkac.role_id(role_name)
req = zkac.prepare_blind_request()
blind_sig = bbs_issuer.issue_blind(req.commitment_with_proof(), role_rid, epoch)
payload = json.dumps({
"registry_id": registry_id_hex,
"role_name": role_name,
"epoch": epoch,
"issuer_pk_b64": _b64(bytes(bbs_pk.to_bytes())),
"blind_sig_b64": _b64(bytes(blind_sig)),
"member_secret_b64": _b64(bytes(req.member_secret())),
"prover_blind_b64": _b64(bytes(req.prover_blind())),
"bucket_role_grant": bucket_role_grant or None,
}).encode()
rec_pk_bytes = bytes.fromhex(recipient_issuance_pk_hex)
if len(rec_pk_bytes) != 32:
raise RuntimeError("recipient issuance pubkey must decode to 32 bytes")
eph_pk, ciphertext = zkac.encrypt_for_admin(rec_pk_bytes, payload)
state_bytes, state_cert = _fetch_registry_state(
admin_userid, server, registry_id_hex, admin_cred,
)
sender_transport_secret = bytes.fromhex(
cli.load_identity_secrets(admin_userid)["transport_secret_hex"]
)
peer_host, peer_port = _parse_server(peer)
sock = socket.create_connection((peer_host, peer_port), timeout=connect_timeout_s)
sock.settimeout(None)
try:
node = zkac.Node(zkac.Keypair.from_secret_key(sender_transport_secret))
peer_pk = zkac.PublicKey.from_bytes(bytes.fromhex(recipient_transport_pk_hex))
session = client_handshake_anon(sock, node, peer_pk)
framed = FramedSession(sock, session)
ready = json.loads(framed.recv())
if ready.get("error") or ready.get("op") != "ready_for_grant":
raise RuntimeError(f"peer did not accept grant session: {ready}")
transcript_hash = bytes(session.transcript_hash())
admin_proof = bytes(admin_cred.present(transcript_hash))
framed.send(json.dumps({
"op": "p2p_grant",
"grant_token_b64": recipient_grant_token_b64,
"registry_id": registry_id_hex,
"registry_state_bytes_b64": _b64(state_bytes),
"registry_state_cert_b64": _b64(state_cert),
"role_name": role_name,
"eph_pk_b64": _b64(bytes(eph_pk)),
"ciphertext_b64": _b64(bytes(ciphertext)),
"admin_proof_b64": _b64(admin_proof),
}).encode())
ack = json.loads(framed.recv())
if ack.get("error"):
raise RuntimeError(ack["error"])
return {"status": ack.get("status", "ok"), "peer": peer}
finally:
sock.close()
# ── listen (recipient side) ──────────────────────────────────────────
def _save_credential_json(
userid: str,
registry_id_hex: str,
role_name: str,
payload: dict,
) -> Path:
"""Write the received credential to ``ZKAC_HOME/<userid>/credentials/<rid>_<role>.json``.
Mirrors the on-disk format used by ``zkac-node grant`` so subsequent
``zkac-node`` commands can pick up the credential normally.
"""
creds_dir = cli.zkac_home() / userid / "credentials"
creds_dir.mkdir(parents=True, exist_ok=True)
target = creds_dir / f"{registry_id_hex}_{role_name}.json"
target.write_text(json.dumps({
"blind_sig_b64": payload["blind_sig_b64"],
"member_secret_b64": payload["member_secret_b64"],
"prover_blind_b64": payload["prover_blind_b64"],
"role_name": payload["role_name"],
"epoch": payload["epoch"],
"issuer_pk_b64": payload["issuer_pk_b64"],
}, indent=2))
try:
target.chmod(0o600)
except OSError:
pass
return target
def listen_for_role_credential(
userid: str,
host: str,
port: int,
*,
timeout_s: float = 60.0,
) -> dict:
"""Block until one valid grant arrives, then save the credential locally.
Compatible with ``demo.file_share_credentials.grant_role_p2p`` and (because
the wire format matches) also useful for testing against the existing
``zkac-node grant`` listener API.
"""
from zkac.tcp import server_handshake_anon
secrets = cli.load_identity_secrets(userid)
transport_secret = bytes.fromhex(secrets["transport_secret_hex"])
issuance_secret = bytes.fromhex(secrets["issuance_secret_hex"])
expected_token_b64 = secrets["grant_token_b64"]
listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
listener.bind((host, port))
listener.listen(1)
deadline = time.monotonic() + timeout_s
receiver_kp = zkac.IssuanceKeypair.from_secret(issuance_secret)
try:
while True:
remaining = deadline - time.monotonic()
if remaining <= 0:
raise RuntimeError("timed out waiting for authenticated grant sender")
listener.settimeout(remaining)
conn, _addr = listener.accept()
try:
conn.settimeout(min(remaining, 30.0))
node = zkac.Node(zkac.Keypair.from_secret_key(transport_secret))
session = server_handshake_anon(conn, node)
framed = FramedSession(conn, session)
framed.send(json.dumps({"ok": True, "op": "ready_for_grant"}).encode())
msg = json.loads(framed.recv())
if msg.get("error"):
raise RuntimeError(msg["error"])
if msg.get("op") != "p2p_grant":
raise RuntimeError("unexpected p2p message")
if msg.get("grant_token_b64") != expected_token_b64:
raise RuntimeError("grant pairing token mismatch")
registry_id_hex = msg["registry_id"]
role_name = msg["role_name"]
state_bytes = _unb64(msg["registry_state_bytes_b64"])
state_cert = _unb64(msg["registry_state_cert_b64"])
mgr = zkac.RegistryManager()
restored = mgr.restore(state_bytes, state_cert).hex()
if restored != registry_id_hex:
raise RuntimeError("registry snapshot does not match announced registry_id")
if not mgr.verify_admin(
bytes.fromhex(registry_id_hex),
_unb64(msg["admin_proof_b64"]),
bytes(session.transcript_hash()),
):
raise RuntimeError("sender admin proof failed")
payload = json.loads(receiver_kp.decrypt(
_unb64(msg["eph_pk_b64"]), _unb64(msg["ciphertext_b64"]),
))
if payload.get("registry_id") != registry_id_hex:
raise RuntimeError("payload registry_id mismatch")
if payload.get("role_name") != role_name:
raise RuntimeError("payload role mismatch")
cred = zkac.Credential.finalize(
_unb64(payload["blind_sig_b64"]),
_unb64(payload["member_secret_b64"]),
_unb64(payload["prover_blind_b64"]),
zkac.role_id(role_name),
int(payload["epoch"]),
zkac.BbsPublicKey.from_bytes(_unb64(payload["issuer_pk_b64"])),
)
nonce = bytes(session.transcript_hash())
if not mgr.verify_presentation(
bytes.fromhex(registry_id_hex),
zkac.role_id(role_name),
bytes(cred.present(nonce)),
nonce,
):
raise RuntimeError("granted credential does not verify")
_save_credential_json(userid, registry_id_hex, role_name, payload)
bucket_grant = payload.get("bucket_role_grant")
if isinstance(bucket_grant, dict):
bucket_id = bucket_grant.get("bucket_id")
eph_pk_b64 = bucket_grant.get("eph_pk_b64")
ciphertext_b64 = bucket_grant.get("ciphertext_b64")
if isinstance(bucket_id, str) and isinstance(eph_pk_b64, str) and isinstance(ciphertext_b64, str):
fsc.save_received_role_grant(
userid,
registry_id_hex,
role_name,
bucket_id,
eph_pk_b64=eph_pk_b64,
ciphertext_b64=ciphertext_b64,
)
framed.send(json.dumps({"ok": True, "status": "stored"}).encode())
return {"registry_id": registry_id_hex, "role": role_name}
finally:
conn.close()
finally:
listener.close()

815
demo/file_share_server.py Normal file
View File

@ -0,0 +1,815 @@
#!/usr/bin/env python3
"""
ZKAC opaque file-share server (demo).
Headless TCP service that combines:
* The same management-channel wire protocol as ``zkac-node serve`` so existing
``zkac-node registry create/get/update`` commands work unchanged.
* A new file-share channel that, after BBS+ role authentication, exposes
bucket primitives. The server only ever sees opaque ciphertext blobs and
encrypted role grants; file names, contents and per-role
visibility masks are never visible server-side.
Run with::
uv run python demo/file_share_server.py --host 127.0.0.1 --port 9879
The server transport public key is printed at start-up; pin it on each client
with ``zkac-node server pin <userid> 127.0.0.1:9879 --key <hex>``.
"""
from __future__ import annotations
import argparse
import base64
import hashlib
import json
import os
import socket
import sys
import threading
import uuid
from pathlib import Path
from typing import Any
import zkac
from zkac.tcp import FramedSession, server_handshake_anon
# ── small helpers ────────────────────────────────────────────────────
def _b64(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
def _unb64(s: str) -> bytes:
return base64.b64decode(s)
def _chmod(path: Path, mode: int) -> None:
try:
os.chmod(path, mode)
except OSError:
pass
def _write_private_json(path: Path, payload: dict) -> None:
path.write_text(json.dumps(payload, indent=2))
_chmod(path, 0o600)
def _is_loopback(host: str) -> bool:
return host.strip().lower() in {"127.0.0.1", "::1", "localhost"}
def _safe_id(value: str) -> str:
"""Strict allowlist on identifiers used as filesystem names."""
if not isinstance(value, str) or not value:
raise ValueError("missing identifier")
if len(value) > 128 or any(c not in "0123456789abcdefABCDEF-" for c in value):
raise ValueError(f"invalid identifier {value!r}")
return value
def _privacy_safe_error_tag(exc: BaseException) -> str:
"""Return a coarse error label suitable for public service logs."""
return type(exc).__name__
# ── opaque on-disk store ─────────────────────────────────────────────
class _FileShareStore:
"""Persists registry snapshots and opaque bucket state under one data dir."""
def __init__(self, data_dir: Path) -> None:
self._dir = data_dir
self._reg_dir = data_dir / "registries"
self._buckets_dir = data_dir / "buckets"
self._privacy_key: bytes = b""
for d in (self._dir, self._reg_dir, self._buckets_dir):
d.mkdir(parents=True, exist_ok=True)
_chmod(d, 0o700)
self._lock = threading.Lock()
def set_privacy_key(self, key: bytes) -> None:
if not key:
raise RuntimeError("privacy key must not be empty")
self._privacy_key = bytes(key)
def _require_privacy_key(self) -> bytes:
if not self._privacy_key:
raise RuntimeError("privacy key not initialized")
return self._privacy_key
def _tag(self, kind: str, raw_id: str) -> str:
key = self._require_privacy_key()
raw = _safe_id(raw_id).encode("utf-8")
h = hashlib.sha256()
h.update(kind.encode("utf-8"))
h.update(b"\x00")
h.update(key)
h.update(b"\x00")
h.update(raw)
return h.hexdigest()
def _registry_tag(self, registry_id_hex: str) -> str:
return self._tag("registry", registry_id_hex)
def _role_tag(self, role_id_hex: str) -> str:
return self._tag("role", role_id_hex)
# transport key
def load_or_create_keypair(self) -> zkac.Keypair:
kf = self._dir / "server_key.json"
if kf.exists():
data = json.loads(kf.read_text())
return zkac.Keypair.from_secret_key(_unb64(data["secret_b64"]))
kp = zkac.Keypair()
_write_private_json(kf, {
"secret_b64": _b64(kp.secret_key_bytes()),
"public_b64": _b64(kp.public_key().to_bytes()),
})
return kp
# registries (mirrors zkac-node serve so the CLI works unchanged)
def save_registry(self, rid_hex: str, state_bytes: bytes, cert_bytes: bytes) -> None:
with self._lock:
(self._reg_dir / f"{rid_hex}.state").write_bytes(state_bytes)
(self._reg_dir / f"{rid_hex}.cert").write_bytes(cert_bytes)
def load_all_registries(self, mgr: zkac.RegistryManager) -> int:
n = 0
for p in sorted(self._reg_dir.glob("*.state")):
cert = self._reg_dir / f"{p.stem}.cert"
if not cert.exists():
continue
try:
mgr.restore(p.read_bytes(), cert.read_bytes())
n += 1
except Exception as exc:
print(f"[fs-server] skip registry {p.stem}: {exc}")
return n
def list_registry_ids(self) -> list[str]:
out: list[str] = []
for p in sorted(self._reg_dir.glob("*.state")):
cert = self._reg_dir / f"{p.stem}.cert"
if cert.exists():
out.append(p.stem)
return out
# buckets
def _bucket_dir(self, bucket_id: str) -> Path:
return self._buckets_dir / _safe_id(bucket_id)
def _bucket_meta_path(self, bucket_id: str) -> Path:
return self._bucket_dir(bucket_id) / "meta.json"
def _roles_index_path(self, registry_id_hex: str) -> Path:
return self._reg_dir / f"{_safe_id(registry_id_hex)}.roles.json"
def _remember_role_id(self, registry_id_hex: str, role_id_hex: str) -> None:
p = self._roles_index_path(registry_id_hex)
roles: set[str] = set()
if p.is_file():
try:
data = json.loads(p.read_text())
if isinstance(data, list):
roles = {_safe_id(str(v)) for v in data}
except Exception:
roles = set()
roles.add(_safe_id(role_id_hex))
_write_private_json(p, sorted(roles))
def bucket_create(self, bucket_id: str, owner_registry_id_hex: str) -> None:
bd = self._bucket_dir(bucket_id)
with self._lock:
if bd.exists():
raise RuntimeError("bucket already exists")
(bd / "blobs").mkdir(parents=True)
(bd / "role_grants").mkdir()
_chmod(bd, 0o700)
_write_private_json(self._bucket_meta_path(bucket_id), {
"bucket_id": bucket_id,
"owner_registry_tag": self._registry_tag(owner_registry_id_hex),
"finalized": False,
"role_acl": {}, # role_tag -> {"version": int, "allowed_blob_ids": [blob_id...]}
})
def bucket_meta(self, bucket_id: str) -> dict:
return json.loads(self._bucket_meta_path(bucket_id).read_text())
def bucket_is_finalized(self, bucket_id: str) -> bool:
return bool(self.bucket_meta(bucket_id).get("finalized", False))
def _bucket_require_owner(self, bucket_id: str, registry_id_hex: str) -> dict:
meta = self.bucket_meta(bucket_id)
expected_owner_tag = self._registry_tag(registry_id_hex)
if meta.get("owner_registry_tag") != expected_owner_tag:
raise RuntimeError("not bucket owner")
return meta
def bucket_set_finalized(self, bucket_id: str, registry_id_hex: str, finalized: bool) -> None:
with self._lock:
meta = self._bucket_require_owner(bucket_id, registry_id_hex)
meta["finalized"] = bool(finalized)
_write_private_json(self._bucket_meta_path(bucket_id), meta)
def bucket_delete(self, bucket_id: str, registry_id_hex: str) -> None:
with self._lock:
self._bucket_require_owner(bucket_id, registry_id_hex)
bd = self._bucket_dir(bucket_id)
for sub in ("blobs", "role_grants"):
d = bd / sub
if d.is_dir():
for p in d.rglob("*"):
if p.is_file():
try:
p.unlink()
except OSError:
pass
for p in sorted(d.rglob("*"), reverse=True):
if p.is_dir():
try:
p.rmdir()
except OSError:
pass
try:
d.rmdir()
except OSError:
pass
try:
self._bucket_meta_path(bucket_id).unlink()
except OSError:
pass
try:
bd.rmdir()
except OSError:
pass
def bucket_put_blob(
self,
bucket_id: str,
registry_id_hex: str,
blob_id: str,
ciphertext: bytes,
) -> None:
with self._lock:
self._bucket_require_owner(bucket_id, registry_id_hex)
(self._bucket_dir(bucket_id) / "blobs" / _safe_id(blob_id)).write_bytes(ciphertext)
def bucket_set_role_acl(
self,
bucket_id: str,
registry_id_hex: str,
role_id_hex: str,
allowed_blob_ids: list[str],
) -> None:
with self._lock:
meta = self._bucket_require_owner(bucket_id, registry_id_hex)
role_acl = dict(meta.get("role_acl", {}))
key = self._role_tag(role_id_hex)
prev = role_acl.get(key, {})
prev_version = int(prev.get("version", 0)) if isinstance(prev, dict) else 0
role_acl[key] = {
"version": prev_version + 1,
"allowed_blob_ids": [_safe_id(b) for b in allowed_blob_ids],
}
meta["role_acl"] = role_acl
_write_private_json(self._bucket_meta_path(bucket_id), meta)
self._remember_role_id(registry_id_hex, role_id_hex)
def bucket_get_blob(self, bucket_id: str, blob_id: str) -> bytes:
return (self._bucket_dir(bucket_id) / "blobs" / _safe_id(blob_id)).read_bytes()
def bucket_blob_allowed_for_role(self, bucket_id: str, role_id_hex: str, blob_id: str) -> bool:
meta = self.bucket_meta(bucket_id)
role_acl = meta.get("role_acl", {})
role_meta = role_acl.get(self._role_tag(role_id_hex))
if not isinstance(role_meta, dict):
return False
allowed = role_meta.get("allowed_blob_ids")
if not isinstance(allowed, list):
return False
return _safe_id(blob_id) in {str(v) for v in allowed}
def bucket_role_acl(self, bucket_id: str, role_id_hex: str) -> dict:
meta = self.bucket_meta(bucket_id)
role_acl = meta.get("role_acl", {})
role_meta = role_acl.get(self._role_tag(role_id_hex), {})
if not isinstance(role_meta, dict):
return {"version": 0, "allowed_blob_ids": []}
version = int(role_meta.get("version", 0))
allowed = role_meta.get("allowed_blob_ids", [])
if not isinstance(allowed, list):
allowed = []
return {
"version": version,
"allowed_blob_ids": [_safe_id(str(v)) for v in allowed],
}
def buckets_for_role_in_registry(self, role_id_hex: str, registry_id_hex: str) -> list[str]:
"""Bucket ids where this authenticated role currently has non-empty ACL."""
role_id_hex = _safe_id(role_id_hex)
out: list[str] = []
for bd in sorted(self._buckets_dir.iterdir()):
bid = bd.name
try:
if not self.bucket_is_finalized(bid):
continue
if self.bucket_owner_registry_tag(bid) != self._registry_tag(registry_id_hex):
continue
acl = self.bucket_role_acl(bid, role_id_hex)
allowed = acl.get("allowed_blob_ids", [])
grants = self.bucket_get_role_grants(bid, role_id_hex)
current_acl_version = int(acl.get("version", -1))
has_fresh_grant = any(
isinstance(g.get("acl_version"), int) and g["acl_version"] == current_acl_version
for g in grants
)
if isinstance(allowed, list) and len(allowed) > 0 and has_fresh_grant:
out.append(bid)
except Exception:
continue
return out
def bucket_put_role_grant(
self,
bucket_id: str,
registry_id_hex: str,
role_id_hex: str,
acl_version: int,
eph_pk_b64: str,
ciphertext_b64: str,
) -> None:
with self._lock:
self._bucket_require_owner(bucket_id, registry_id_hex)
role_id_hex = _safe_id(role_id_hex)
role_tag = self._role_tag(role_id_hex)
payload = {
"bucket_id": bucket_id,
"role_tag": role_tag,
"acl_version": int(acl_version),
"eph_pk_b64": eph_pk_b64,
"ciphertext_b64": ciphertext_b64,
}
role_dir = self._bucket_dir(bucket_id) / "role_grants" / role_tag
role_dir.mkdir(parents=True, exist_ok=True)
target = role_dir / f"{uuid.uuid4().hex}.json"
_write_private_json(target, payload)
self._remember_role_id(registry_id_hex, role_id_hex)
def bucket_get_role_grants(self, bucket_id: str, role_id_hex: str) -> list[dict]:
role_dir = self._bucket_dir(bucket_id) / "role_grants" / self._role_tag(role_id_hex)
if not role_dir.is_dir():
return []
out: list[dict] = []
for p in sorted(role_dir.glob("*.json")):
try:
out.append(json.loads(p.read_text()))
except Exception:
continue
return out
def bucket_owner_registry_tag(self, bucket_id: str) -> str:
owner = self.bucket_meta(bucket_id).get("owner_registry_tag")
if not isinstance(owner, str) or not owner:
raise RuntimeError("bucket metadata missing owner_registry_tag")
return owner
def buckets_owned_by(self, registry_id_hex: str) -> list[str]:
out: list[str] = []
for bd in sorted(self._buckets_dir.iterdir()):
meta = bd / "meta.json"
if not meta.is_file():
continue
try:
owner_tag = self._registry_tag(registry_id_hex)
if json.loads(meta.read_text()).get("owner_registry_tag") == owner_tag:
out.append(bd.name)
except (OSError, json.JSONDecodeError):
continue
return out
def role_ids_for_registry(self, registry_id_hex: str) -> list[str]:
p = self._roles_index_path(registry_id_hex)
if not p.is_file():
return []
try:
data = json.loads(p.read_text())
if not isinstance(data, list):
return []
return sorted({_safe_id(str(v)) for v in data})
except Exception:
return []
# ── command dispatch (inside encrypted session) ──────────────────────
def _dispatch_mgmt(
cmd: dict,
mgr: zkac.RegistryManager,
store: _FileShareStore,
server_pk_b64: str,
transcript_hash: bytes,
) -> dict:
"""Registry management commands, wire-compatible with ``zkac-node`` CLI."""
try:
action = cmd.get("cmd")
rid_hex = cmd.get("auth_registry_id")
admin_proof_b64 = cmd.get("admin_proof_b64")
def _require_admin_for(target_rid_hex: str) -> None:
if rid_hex != target_rid_hex:
raise RuntimeError("auth_registry_id must match command registry_id")
if not isinstance(admin_proof_b64, str) or not admin_proof_b64:
raise RuntimeError("missing admin_proof_b64")
if not mgr.verify_admin(
bytes.fromhex(target_rid_hex),
_unb64(admin_proof_b64),
transcript_hash,
):
raise RuntimeError("admin authorization failed")
if action == "server_info":
return {"ok": True, "server_public_key_b64": server_pk_b64}
if action == "create_registry":
state_bytes = _unb64(cmd["state_bytes_b64"])
state_cert = _unb64(cmd["state_cert_b64"])
auth_rid = cmd.get("auth_registry_id")
if not isinstance(auth_rid, str):
raise RuntimeError("missing auth_registry_id")
if not isinstance(admin_proof_b64, str) or not admin_proof_b64:
raise RuntimeError("missing admin_proof_b64")
tmp_mgr = zkac.RegistryManager()
expected_rid = tmp_mgr.create(state_bytes, state_cert).hex()
if expected_rid != auth_rid:
raise RuntimeError("auth_registry_id does not match certified state")
if not tmp_mgr.verify_admin(
bytes.fromhex(expected_rid),
_unb64(admin_proof_b64),
transcript_hash,
):
raise RuntimeError("admin authorization failed for create_registry")
rid = mgr.create(state_bytes, state_cert)
store.save_registry(rid.hex(), state_bytes, state_cert)
return {"ok": True, "registry_id": rid.hex()}
if action == "get_registry":
rid_hex_cmd = cmd["registry_id"]
_require_admin_for(rid_hex_cmd)
rid = bytes.fromhex(rid_hex_cmd)
state_bytes, state_cert = mgr.get(rid)
return {
"ok": True,
"state_bytes_b64": _b64(state_bytes),
"state_cert_b64": _b64(state_cert),
}
if action == "update_registry":
rid_hex_cmd = cmd["registry_id"]
_require_admin_for(rid_hex_cmd)
rid = bytes.fromhex(rid_hex_cmd)
state_bytes = _unb64(cmd["state_bytes_b64"])
state_cert = _unb64(cmd["state_cert_b64"])
mgr.update(rid, state_bytes, state_cert)
store.save_registry(rid_hex_cmd, state_bytes, state_cert)
return {"ok": True}
return {"error": f"unknown command: {action}"}
except Exception as exc:
return {"error": str(exc)}
def _dispatch_fs(
cmd: dict,
store: _FileShareStore,
ctx: dict,
) -> dict:
"""File-share commands authenticated via opaque per-session authorization context."""
try:
action = cmd.get("cmd")
registry_id_hex: str = ctx["registry_id_hex"]
registry_tag: str = ctx["registry_tag"]
role_id_hex: str = ctx["role_id_hex"]
is_admin: bool = bool(ctx["is_admin"])
def _require_admin() -> None:
if not is_admin:
raise RuntimeError("admin role required for this command")
if action == "whoami":
return {
"ok": True,
"is_admin": is_admin,
"auth_scope": "admin" if is_admin else "credential",
}
if action == "bucket_create":
_require_admin()
bid = cmd.get("bucket_id") or uuid.uuid4().hex
store.bucket_create(bid, registry_id_hex)
return {"ok": True, "bucket_id": bid}
if action == "bucket_put_blob":
_require_admin()
bid = cmd["bucket_id"]
blob_id = cmd["blob_id"]
ciphertext = _unb64(cmd["ciphertext_b64"])
store.bucket_put_blob(bid, registry_id_hex, blob_id, ciphertext)
return {"ok": True}
if action == "bucket_set_role_acl":
_require_admin()
store.bucket_set_role_acl(
cmd["bucket_id"],
registry_id_hex,
cmd["role_id_hex"],
list(cmd.get("allowed_blob_ids", [])),
)
return {"ok": True}
if action == "bucket_put_role_grant":
_require_admin()
store.bucket_put_role_grant(
cmd["bucket_id"],
registry_id_hex,
cmd["role_id_hex"],
int(cmd["acl_version"]),
cmd["eph_pk_b64"],
cmd["ciphertext_b64"],
)
return {"ok": True}
if action == "bucket_get_role_acl":
_require_admin()
role_acl = store.bucket_role_acl(cmd["bucket_id"], cmd["role_id_hex"])
return {"ok": True, **role_acl}
if action == "bucket_finalize":
_require_admin()
store.bucket_set_finalized(cmd["bucket_id"], registry_id_hex, True)
return {"ok": True}
if action == "bucket_delete":
_require_admin()
store.bucket_delete(cmd["bucket_id"], registry_id_hex)
return {"ok": True}
if action == "bucket_list_owned":
_require_admin()
return {"ok": True, "bucket_ids": store.buckets_owned_by(registry_id_hex)}
if action == "fs_buckets":
return {
"ok": True,
"bucket_ids": store.buckets_for_role_in_registry(
role_id_hex,
registry_id_hex,
),
}
if action == "fs_get_role_grant":
bid = cmd["bucket_id"]
if store.bucket_owner_registry_tag(bid) != registry_tag:
raise RuntimeError("bucket does not belong to authenticated registry")
if not store.bucket_is_finalized(bid):
raise RuntimeError("bucket is not finalized")
current_acl = store.bucket_role_acl(bid, role_id_hex)
acl_version = int(current_acl.get("version", -1))
grants = [
g for g in store.bucket_get_role_grants(bid, role_id_hex)
if isinstance(g.get("acl_version"), int) and g["acl_version"] == acl_version
]
if not grants:
raise RuntimeError("permissions updated; request a fresh role grant")
return {"ok": True, "grants": grants}
if action == "fs_get_blob":
bid = cmd["bucket_id"]
blob_id = cmd["blob_id"]
if store.bucket_owner_registry_tag(bid) != registry_tag:
raise RuntimeError("bucket does not belong to authenticated registry")
if not store.bucket_is_finalized(bid):
raise RuntimeError("bucket is not finalized")
if not is_admin and not store.bucket_blob_allowed_for_role(bid, role_id_hex, blob_id):
raise RuntimeError("blob access denied by role mask")
data = store.bucket_get_blob(bid, blob_id)
return {"ok": True, "ciphertext_b64": _b64(data)}
return {"error": f"unknown command: {action}"}
except Exception as exc:
return {"error": str(exc)}
def _authenticate_fs_identity(
mgr: zkac.RegistryManager,
store: _FileShareStore,
proof: bytes,
transcript_hash: bytes,
) -> dict[str, object] | None:
"""Resolve opaque auth context from proof without client-supplied identifiers."""
registry_ids = store.list_registry_ids()
admin_matches: list[str] = []
role_matches: list[tuple[str, str]] = []
for rid_hex in registry_ids:
try:
rid = bytes.fromhex(rid_hex)
except ValueError:
continue
try:
if mgr.verify_admin(rid, proof, transcript_hash):
admin_matches.append(rid_hex)
except Exception:
pass
for role_id_hex in store.role_ids_for_registry(rid_hex):
try:
role_id = bytes.fromhex(role_id_hex)
except ValueError:
continue
if role_id == zkac.admin_role_id():
continue
try:
if mgr.verify_presentation(rid, role_id, proof, transcript_hash):
role_matches.append((rid_hex, role_id_hex))
except Exception:
continue
if len(admin_matches) == 1 and not role_matches:
rid_hex = admin_matches[0]
role_hex = zkac.admin_role_id().hex()
return {
"registry_id_hex": rid_hex,
"registry_tag": store._registry_tag(rid_hex),
"role_id_hex": role_hex,
"role_tag": store._role_tag(role_hex),
"is_admin": True,
}
if not admin_matches and len(role_matches) == 1:
rid_hex, role_hex = role_matches[0]
return {
"registry_id_hex": rid_hex,
"registry_tag": store._registry_tag(rid_hex),
"role_id_hex": role_hex,
"role_tag": store._role_tag(role_hex),
"is_admin": False,
}
# Ambiguous or invalid proof.
return None
# ── per-connection handler ────────────────────────────────────────────
def _handle_conn(
conn: socket.socket,
addr: tuple,
node: zkac.Node,
mgr: zkac.RegistryManager,
store: _FileShareStore,
server_pk_b64: str,
idle_timeout_s: float,
slots: threading.BoundedSemaphore,
) -> None:
try:
conn.settimeout(idle_timeout_s)
session = server_handshake_anon(conn, node)
framed = FramedSession(conn, session)
transcript_hash = bytes(session.transcript_hash())
hello = json.loads(framed.recv())
op = hello.get("op")
if op == "mgmt":
while True:
try:
cmd = json.loads(framed.recv())
except (ConnectionError, OSError):
break
resp = _dispatch_mgmt(cmd, mgr, store, server_pk_b64, transcript_hash)
framed.send(json.dumps(resp).encode())
return
if op == "fs":
try:
proof = _unb64(hello["bbs_auth_b64"])
except (KeyError, ValueError) as exc:
framed.send(json.dumps({"error": f"invalid fs hello: {exc}"}).encode())
return
auth = _authenticate_fs_identity(mgr, store, proof, transcript_hash)
if auth is None:
framed.send(json.dumps({"error": "auth failed"}).encode())
return
framed.send(json.dumps({"ok": True, "status": "authenticated"}).encode())
ctx = auth
while True:
try:
cmd = json.loads(framed.recv())
except (ConnectionError, OSError):
break
resp = _dispatch_fs(cmd, store, ctx)
framed.send(json.dumps(resp).encode())
return
framed.send(json.dumps({"error": f"unknown op: {op}"}).encode())
except (ConnectionError, BrokenPipeError, OSError):
pass
except Exception as exc:
# Privacy model: never emit client endpoint or request-linked payloads.
print(f"[fs-server] connection error ({_privacy_safe_error_tag(exc)})")
finally:
conn.close()
slots.release()
# ── entry point ──────────────────────────────────────────────────────
def serve(
data_dir: Path,
host: str = "127.0.0.1",
port: int = 9879,
*,
max_connections: int = 64,
idle_timeout_s: float = 60.0,
listen_backlog: int = 64,
allow_non_loopback: bool = False,
) -> None:
data_dir.mkdir(parents=True, exist_ok=True)
store = _FileShareStore(data_dir)
kp = store.load_or_create_keypair()
store.set_privacy_key(bytes(kp.secret_key_bytes()))
server_pk_b64 = _b64(kp.public_key().to_bytes())
pk_hex = kp.public_key().to_bytes().hex()
node = zkac.Node(kp)
mgr = zkac.RegistryManager()
n = store.load_all_registries(mgr)
print(f"[fs-server] data dir: {data_dir}")
print(f"[fs-server] server transport public key (pin OOB): {pk_hex}")
print(f"[fs-server] loaded {n} registries")
print(f"[fs-server] listening on {host}:{port}")
if not _is_loopback(host) and not allow_non_loopback:
raise RuntimeError(
"refusing to bind outside loopback. "
"Use --allow-non-loopback only when exposure is intentional."
)
if not _is_loopback(host):
print(f"[fs-server] warning: binding outside loopback: {host}:{port}", file=sys.stderr)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
slots = threading.BoundedSemaphore(max_connections)
sock.listen(listen_backlog)
try:
while True:
conn, addr = sock.accept()
if not slots.acquire(blocking=False):
conn.close()
continue
threading.Thread(
target=_handle_conn,
args=(conn, addr, node, mgr, store, server_pk_b64, idle_timeout_s, slots),
daemon=True,
).start()
except KeyboardInterrupt:
print("\n[fs-server] shutdown")
finally:
sock.close()
def main() -> None:
p = argparse.ArgumentParser(description="ZKAC opaque file-share server (demo)")
p.add_argument("--host", default="127.0.0.1")
p.add_argument("--port", type=int, default=9879)
p.add_argument(
"--data-dir",
type=Path,
default=Path(__file__).resolve().parent / "fs_data",
help="server state (transport key, registries, opaque buckets)",
)
p.add_argument("--idle-timeout", type=float, default=60.0)
p.add_argument("--max-connections", type=int, default=64)
p.add_argument("--allow-non-loopback", action="store_true")
args = p.parse_args()
serve(
args.data_dir,
host=args.host,
port=args.port,
max_connections=args.max_connections,
idle_timeout_s=args.idle_timeout,
allow_non_loopback=args.allow_non_loopback,
)
if __name__ == "__main__":
main()

256
demo/file_share_smoke.py Normal file
View File

@ -0,0 +1,256 @@
#!/usr/bin/env python3
"""
End-to-end smoke test for the ZKAC file-share demo.
Exercises:
* admin (Alice) creates a registry on the file-share server,
* admin uploads a folder as an encrypted bucket with per-role visibility masks,
* Alice issues a ZKAC role credential to Bob via direct P2P grant,
* Alice uploads an anonymous role-grant envelope to the server,
* Bob authenticates to the file-share server with his role credential,
downloads, and decrypts the files his role can see,
* server opacity: every byte at rest in the bucket directory is ciphertext
(no plaintext file content survives on disk).
All ZKAC operations go through ``zkac-node`` (subprocess) and a temporary
``ZKAC_HOME`` so the test is hermetic.
Run::
uv run python demo/file_share_smoke.py
"""
from __future__ import annotations
import os
import shutil
import socket
import sys
import tempfile
import threading
import time
import traceback
from pathlib import Path
DEMO_DIR = Path(__file__).resolve().parent
sys.path.insert(0, str(DEMO_DIR))
import zkac # noqa: E402
import file_share_client as fsc # noqa: E402
import file_share_credentials as fscred # noqa: E402
import file_share_server as fss # noqa: E402
import zkac_cli_adapter as cli # noqa: E402
def _free_port() -> int:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
finally:
s.close()
def _wait_until(predicate, timeout_s: float = 5.0, interval_s: float = 0.05) -> bool:
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
if predicate():
return True
time.sleep(interval_s)
return False
def _make_files(folder: Path) -> dict[str, bytes]:
folder.mkdir(parents=True, exist_ok=True)
contents = {
"alpha.txt": b"public-summary: " + os.urandom(2048),
"nested/beta.bin": os.urandom(200 * 1024), # > one chunk
"secret.md": b"# top-secret\n" + os.urandom(8192),
}
paths: dict[str, bytes] = {}
for rel, data in contents.items():
target = folder / rel
target.parent.mkdir(parents=True, exist_ok=True)
target.write_bytes(data)
paths[rel] = data
return paths
def _start_fs_server(host: str, port: int, data_dir: Path) -> threading.Thread:
t = threading.Thread(
target=fss.serve,
args=(data_dir,),
kwargs={"host": host, "port": port, "allow_non_loopback": False},
daemon=True,
)
t.start()
return t
def _server_transport_pubkey_hex(data_dir: Path) -> str:
import json
return zkac.PublicKey.from_bytes(
__import__("base64").b64decode(
json.loads((data_dir / "server_key.json").read_text())["public_b64"]
)
).to_bytes().hex()
def _open_admin_session(userid: str, server: str, server_pk_hex: str, registry_id: str) -> fsc.FileShareSession:
secrets = cli.load_identity_secrets(userid)
cred = cli.load_admin_credential(userid, registry_id)
return fsc.open_session(
server,
server_pk_hex=server_pk_hex,
user_transport_secret=bytes.fromhex(secrets["transport_secret_hex"]),
registry_id_hex=registry_id,
role_id=zkac.admin_role_id(),
credential=cred,
)
def _open_role_session(userid: str, server: str, server_pk_hex: str, registry_id: str, role_name: str) -> fsc.FileShareSession:
secrets = cli.load_identity_secrets(userid)
cred = cli.load_credential(userid, registry_id, role_name)
return fsc.open_session(
server,
server_pk_hex=server_pk_hex,
user_transport_secret=bytes.fromhex(secrets["transport_secret_hex"]),
registry_id_hex=registry_id,
role_id=zkac.role_id(role_name),
credential=cred,
)
def _scan_for_plaintext(haystack_root: Path, needles: list[bytes]) -> list[tuple[Path, int]]:
"""Brute-force grep for any ``needle`` bytes appearing in any file under ``haystack_root``."""
hits: list[tuple[Path, int]] = []
for p in haystack_root.rglob("*"):
if not p.is_file():
continue
data = p.read_bytes()
for n in needles:
if len(n) >= 64 and n in data:
hits.append((p, len(n)))
break
return hits
def main() -> int:
tmp = Path(tempfile.mkdtemp(prefix="zkac-fs-smoke-"))
print(f"[smoke] temp dir: {tmp}")
home = tmp / "zkac_home"
fs_data = tmp / "fs_data"
src = tmp / "src_folder"
out = tmp / "downloads"
home.mkdir()
fs_data.mkdir()
os.environ["ZKAC_HOME"] = str(home)
contents = _make_files(src)
print(f"[smoke] source folder: {src} ({len(contents)} files)")
port = _free_port()
server_addr = f"127.0.0.1:{port}"
_start_fs_server("127.0.0.1", port, fs_data)
if not _wait_until(lambda: (fs_data / "server_key.json").is_file()):
raise RuntimeError("server did not start")
if not _wait_until(lambda: socket.create_connection(("127.0.0.1", port), timeout=0.2)):
raise RuntimeError("server port not ready")
server_pk_hex = _server_transport_pubkey_hex(fs_data)
print(f"[smoke] file-share server ready @ {server_addr}, pk={server_pk_hex[:16]}")
cli.run_cli(["user", "create", "alice"]).raise_for_status()
cli.run_cli(["user", "create", "bob"]).raise_for_status()
cli.server_pin("alice", server_addr, server_pk_hex).raise_for_status()
cli.server_pin("bob", server_addr, server_pk_hex).raise_for_status()
print("[smoke] alice + bob created and pinned server")
rid = cli.registry_create("alice", server_addr, ["viewer", "editor"])
print(f"[smoke] registry created: {rid}")
# --- direct P2P grant: bob listens, alice grants ----------------------
listener_port = _free_port()
listener = cli.P2PListener("bob", host="127.0.0.1", port=listener_port, timeout_s=30.0)
listener.start()
if not _wait_until(lambda: listener.is_running()):
raise RuntimeError("bob listener did not start")
bob_contact = cli.show_user_contact("bob", peer=f"127.0.0.1:{listener_port}")
print(f"[smoke] bob listening on 127.0.0.1:{listener_port}")
# --- alice uploads bucket + sets masks --------------------------------
files_in_order = fsc.flatten_folder(src)
rel_paths = [str(p.relative_to(src.resolve())) for p in files_in_order]
print(f"[smoke] flattened: {rel_paths}")
# mask: viewer sees only first file (alpha.txt), editor sees everything.
viewer_mask = "1" + "0" * (len(files_in_order) - 1)
editor_mask = "1" * len(files_in_order)
bob_secrets = cli.load_identity_secrets("bob")
with _open_admin_session("alice", server_addr, server_pk_hex, rid) as sess:
manifest = fsc.upload_bucket(
sess, src, server=server_addr, registry_id_hex=rid,
)
manifest.role_masks = {"viewer": viewer_mask, "editor": editor_mask}
fsc.apply_role_masks_to_server(sess, manifest)
fsc.push_role_grant(sess, manifest, "viewer", bob_secrets["issuance_pk_hex"])
fsc.save_manifest("alice", manifest)
print(f"[smoke] uploaded bucket {manifest.bucket_id} with role ACLs + anonymous grant envelope")
try:
fscred.grant_role_p2p(
"alice",
server_addr,
rid,
"viewer",
bob_contact,
)
except RuntimeError as exc:
listener.stop()
print(f"[smoke] grant failed: {exc}\n[smoke] bob listener output:\n{listener.output()}")
raise
if not _wait_until(lambda: not listener.is_running(), timeout_s=10.0):
listener.stop()
raise RuntimeError(f"listener never exited after grant; output:\n{listener.output()}")
received = listener.parse_received()
print(f"[smoke] bob received: {received}")
assert received and received["role"] == "viewer"
assert received["registry_id"] == rid
# --- bob downloads using his viewer credential ------------------------
with _open_role_session("bob", server_addr, server_pk_hex, rid, "viewer") as sess:
accessible = sess.fs_buckets()
assert accessible == [manifest.bucket_id], accessible
result = fsc.download_bucket(
sess, manifest.bucket_id,
issuance_secret_hex=bob_secrets["issuance_secret_hex"],
output_dir=out / manifest.bucket_id,
)
print(f"[smoke] bob downloaded files: {result['files_written']}")
# --- assertions -------------------------------------------------------
expected_visible = {rel_paths[0]} # only first file
actual = {str(Path(p).relative_to(out / manifest.bucket_id)) for p in result["files_written"]}
assert actual == expected_visible, f"visibility mismatch: {actual} vs {expected_visible}"
decrypted = (out / manifest.bucket_id / rel_paths[0]).read_bytes()
assert decrypted == contents[rel_paths[0]], "decrypted content does not match plaintext"
# opacity: no plaintext file content lives on disk under fs_data
plaintext_needles = list(contents.values())
leaks = _scan_for_plaintext(fs_data / "buckets", plaintext_needles)
assert not leaks, f"plaintext leaked into server storage: {leaks}"
print("[smoke] OK: visibility enforced, content matches, server storage opaque")
print(f"[smoke] cleanup {tmp}")
shutil.rmtree(tmp, ignore_errors=True)
return 0
if __name__ == "__main__":
try:
sys.exit(main())
except Exception:
traceback.print_exc()
sys.exit(1)

1003
demo/file_share_tui.py Normal file

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,192 @@
import json
import sys
import threading
from pathlib import Path
import zkac
DEMO_DIR = Path(__file__).resolve().parent
if str(DEMO_DIR) not in sys.path:
sys.path.insert(0, str(DEMO_DIR))
import file_share_client as fsc # noqa: E402
import file_share_server as fss # noqa: E402
def _make_credential() -> tuple[bytes, zkac.Credential]:
issuer = zkac.BbsIssuer()
pk = issuer.public_key()
role_id = zkac.role_id("viewer")
req = zkac.prepare_blind_request()
blind_sig = issuer.issue_blind(req.commitment_with_proof(), role_id, 1)
cred = zkac.Credential.finalize(
blind_sig,
req.member_secret(),
req.prover_blind(),
role_id,
1,
pk,
)
return role_id, cred
def test_open_session_fs_hello_contains_only_proof(monkeypatch):
role_id, credential = _make_credential()
sent_payloads: list[dict] = []
class _FakeSocket:
def settimeout(self, _value):
return None
def close(self):
return None
class _FakeHandshakeSession:
def transcript_hash(self) -> bytes:
return b"\x11" * 32
class _FakeFramedSession:
def __init__(self, _sock, _session):
pass
def send(self, payload: bytes) -> None:
sent_payloads.append(json.loads(payload.decode("utf-8")))
def recv(self) -> bytes:
return b'{"ok": true, "status": "authenticated"}'
monkeypatch.setattr(fsc.socket, "create_connection", lambda *_args, **_kwargs: _FakeSocket())
monkeypatch.setattr(fsc, "client_handshake_anon", lambda *_args, **_kwargs: _FakeHandshakeSession())
monkeypatch.setattr(fsc, "FramedSession", _FakeFramedSession)
sess = fsc.open_session(
"127.0.0.1:9879",
server_pk_hex=zkac.Keypair().public_key().to_bytes().hex(),
user_transport_secret=bytes(zkac.Keypair().secret_key_bytes()),
registry_id_hex="00" * 32,
role_id=role_id,
credential=credential,
)
sess.close()
assert len(sent_payloads) == 1
hello = sent_payloads[0]
assert set(hello.keys()) == {"op", "bbs_auth_b64"}
assert hello["op"] == "fs"
assert isinstance(hello["bbs_auth_b64"], str) and hello["bbs_auth_b64"]
def test_handle_conn_error_log_never_includes_peer_endpoint(monkeypatch, tmp_path, capsys):
class _DummyConn:
def settimeout(self, _value):
return None
def close(self):
return None
def _raise_handshake(_conn, _node):
raise RuntimeError("boom")
monkeypatch.setattr(fss, "server_handshake_anon", _raise_handshake)
slots = threading.BoundedSemaphore(1)
assert slots.acquire(blocking=False)
fss._handle_conn(
_DummyConn(),
("203.0.113.44", 4242),
zkac.Node(zkac.Keypair()),
zkac.RegistryManager(),
fss._FileShareStore(tmp_path),
"",
5.0,
slots,
)
out = capsys.readouterr().out
assert "[fs-server] connection error (RuntimeError)" in out
assert "203.0.113.44" not in out
assert ":4242" not in out
def test_bucket_metadata_uses_opaque_tags(tmp_path):
store = fss._FileShareStore(tmp_path)
store.set_privacy_key(b"privacy-key-32-bytes-minimum-seed")
registry_id = "aa" * 32
role_id = "bb" * 32
bucket_id = "cc" * 16
blob_id = "dd" * 16
store.bucket_create(bucket_id, registry_id)
meta = store.bucket_meta(bucket_id)
assert "owner_registry_id" not in meta
assert isinstance(meta.get("owner_registry_tag"), str)
assert len(meta["owner_registry_tag"]) == 64
store.bucket_set_role_acl(bucket_id, registry_id, role_id, [blob_id])
meta2 = store.bucket_meta(bucket_id)
acl_keys = list(meta2.get("role_acl", {}).keys())
assert acl_keys and acl_keys[0] != role_id
assert all(len(k) == 64 for k in acl_keys)
store.bucket_put_role_grant(bucket_id, registry_id, role_id, 1, "eph", "ct")
grants_root = tmp_path / "buckets" / bucket_id / "role_grants"
assert (grants_root / role_id).exists() is False
role_dirs = [p.name for p in grants_root.iterdir() if p.is_dir()]
assert role_dirs and all(len(d) == 64 for d in role_dirs)
def test_auth_scan_does_not_return_early():
class _Mgr:
def __init__(self):
self.admin_checks: list[str] = []
self.role_checks: list[tuple[str, str]] = []
def verify_admin(self, rid: bytes, _proof: bytes, _th: bytes) -> bool:
self.admin_checks.append(rid.hex())
return rid.hex() == ("11" * 32)
def verify_presentation(self, rid: bytes, role_id: bytes, _proof: bytes, _th: bytes) -> bool:
self.role_checks.append((rid.hex(), role_id.hex()))
return False
class _Store:
def list_registry_ids(self):
return ["11" * 32, "22" * 32]
def role_ids_for_registry(self, rid_hex: str):
if rid_hex == "11" * 32:
return ["33" * 32]
return ["44" * 32]
def _registry_tag(self, rid_hex: str):
return "r-" + rid_hex[:8]
def _role_tag(self, role_hex: str):
return "k-" + role_hex[:8]
mgr = _Mgr()
auth = fss._authenticate_fs_identity(mgr, _Store(), b"proof", b"nonce")
assert auth is not None and auth["is_admin"] is True
# Both registries must be checked even though first one matched admin.
assert mgr.admin_checks == ["11" * 32, "22" * 32]
# Role checks also run for all known role candidates.
assert mgr.role_checks == [("11" * 32, "33" * 32), ("22" * 32, "44" * 32)]
def test_dispatch_whoami_does_not_expose_registry_or_role():
resp = fss._dispatch_fs(
{"cmd": "whoami"},
store=None, # not used by whoami branch
ctx={
"registry_id_hex": "11" * 32,
"registry_tag": "r-tag",
"role_id_hex": "22" * 32,
"role_tag": "k-tag",
"is_admin": False,
},
)
assert resp["ok"] is True
assert "registry_id" not in resp
assert "role_id" not in resp
assert resp["auth_scope"] == "credential"

View File

@ -1,322 +0,0 @@
#!/usr/bin/env python3
"""
HTTP admin debug dashboard for ``zkac-node serve``.
Runs the ZKAC TCP node in a background thread and serves a read-only web UI on
another port. Intended for loopback + I2P server-tunnel forwarding.
**Security:** This page is fully transparent (registry metadata, live sessions,
public keys). Do not expose it to untrusted networks without tunnel ACLs.
Usage::
uv sync --extra demo
uv run python demo/zkac_admin_serve.py alice \\
--node-host 127.0.0.1 --node-port 9800 \\
--web-host 127.0.0.1 --web-port 8766
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import threading
import time
from pathlib import Path
from flask import Flask, Response, jsonify, render_template_string, request
from zkac_cli.paths import user_dir
from zkac_cli.server import serve
from zkac_cli.server_debug import (
ServerDebugState,
collect_registry_debug,
data_dir_tree,
server_key_meta,
)
_PAGE = """<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/>
{% if refresh_s %}
<meta http-equiv="refresh" content="{{ refresh_s }}"/>
{% endif %}
<title>ZKAC node admin debug</title>
<style>
:root {
--bg: #0f1419;
--panel: #1a2332;
--border: #2d3d52;
--text: #e7ecf3;
--muted: #8b9cb3;
--accent: #5b9bd5;
--ok: #6cbf6c;
--warn: #e6b35a;
}
* { box-sizing: border-box; }
body {
font-family: ui-sans-serif, system-ui, "Segoe UI", Roboto, sans-serif;
background: var(--bg);
color: var(--text);
margin: 0;
padding: 1.25rem 1.5rem 2rem;
line-height: 1.45;
}
h1 { font-size: 1.35rem; font-weight: 600; margin: 0 0 0.25rem; }
.sub { color: var(--muted); font-size: 0.9rem; margin-bottom: 1.25rem; }
.grid { display: grid; gap: 1rem; grid-template-columns: repeat(auto-fit, minmax(320px, 1fr)); }
section {
background: var(--panel);
border: 1px solid var(--border);
border-radius: 10px;
padding: 1rem 1.1rem;
}
section h2 {
margin: 0 0 0.65rem;
font-size: 1rem;
font-weight: 600;
color: var(--accent);
}
pre, .mono {
font-family: ui-monospace, "Cascadia Code", "SF Mono", Menlo, monospace;
font-size: 0.78rem;
white-space: pre-wrap;
word-break: break-all;
background: #0c1017;
border-radius: 6px;
padding: 0.65rem 0.75rem;
margin: 0;
border: 1px solid var(--border);
}
table { width: 100%; border-collapse: collapse; font-size: 0.82rem; }
th, td { text-align: left; padding: 0.35rem 0.5rem; border-bottom: 1px solid var(--border); }
th { color: var(--muted); font-weight: 500; }
.pill { display: inline-block; padding: 0.12rem 0.45rem; border-radius: 999px; font-size: 0.72rem; }
.pill-live { background: #1e3d2a; color: var(--ok); }
.pill-warn { background: #3d3518; color: var(--warn); }
a { color: var(--accent); }
.links { margin-top: 1rem; font-size: 0.88rem; color: var(--muted); }
</style>
</head>
<body>
<h1>ZKAC node admin debug</h1>
<p class="sub">
User <strong>{{ snap.userid }}</strong> · TCP
<span class="mono">{{ snap.listen.host }}:{{ snap.listen.port }}</span>
· uptime {{ snap.uptime_s }}s
{% if refresh_s %}
· auto-refresh {{ refresh_s }}s
{% endif %}
</p>
<div class="grid">
<section>
<h2>Status</h2>
<table>
<tr><th>Data directory</th><td class="mono">{{ snap.data_dir }}</td></tr>
<tr><th>Started (wall)</th><td class="mono">{{ snap.started_wall }}</td></tr>
<tr><th>Server public key</th><td class="mono">{{ snap.server_public_key_hex or "" }}</td></tr>
<tr><th>Registries (boot)</th><td>{{ snap.registries_loaded_boot }}</td></tr>
<tr><th>Active TCP sessions</th><td>
<span class="pill pill-live">{{ snap.active_connection_count }} live</span>
</td></tr>
<tr><th>Process</th><td class="mono">pid={{ proc.pid }} threads={{ proc.threads }}</td></tr>
</table>
</section>
<section>
<h2>Server key file</h2>
<pre>{{ sk_meta | tojson(indent=2) }}</pre>
</section>
<section style="grid-column: 1 / -1;">
<h2>Registries (on disk + parsed)</h2>
{% if reg %}
<table>
<tr>
<th>ID (file)</th>
<th>version</th>
<th>state hash</th>
<th>bytes (state / cert)</th>
<th>parse</th>
</tr>
{% for r in reg %}
<tr>
<td class="mono">{{ r.file_registry_id_hex[:16] }}</td>
<td>{{ r.get("version", "") }}</td>
<td class="mono">{% if r.get("state_hash_hex") %}{{ r.state_hash_hex[:24] }}{% else %}{% endif %}</td>
<td>{{ r.state_bytes }} / {{ r.cert_bytes }}</td>
<td>{% if r.parsed_ok %}<span class="pill pill-live">ok</span>{% else %}<span class="pill pill-warn">fail</span>{% endif %}</td>
</tr>
{% endfor %}
</table>
{% else %}
<p class="muted">No registry state files yet.</p>
{% endif %}
</section>
<section style="grid-column: 1 / -1;">
<h2>Session connections (live)</h2>
{% if snap.active_connections %}
<table>
<tr>
<th>id</th><th>peer</th><th>phase</th><th>op</th><th>transcript</th>
<th>auth registry</th><th>role</th><th>mgmt #</th><th>echo bytes</th>
</tr>
{% for c in snap.active_connections %}
<tr>
<td class="mono">{{ c.id }}</td>
<td class="mono">{{ c.peer }}</td>
<td>{{ c.phase }}</td>
<td>{{ c.hello_op or "" }}</td>
<td class="mono">{% if c.transcript_hash_hex %}{{ c.transcript_hash_hex[:20] }}{% else %}{% endif %}</td>
<td class="mono">{% if c.auth_registry_hex %}{{ c.auth_registry_hex[:16] }}{% else %}{% endif %}</td>
<td class="mono">{% if c.auth_role_hex %}{{ c.auth_role_hex[:16] }}{% else %}{% endif %}</td>
<td>{{ c.mgmt_commands or 0 }}</td>
<td>{{ c.bytes_echoed or 0 }}</td>
</tr>
{% endfor %}
</table>
{% else %}
<p class="muted">No active connections.</p>
{% endif %}
</section>
<section style="grid-column: 1 / -1;">
<h2>Recent connections</h2>
{% if snap.recent_connections %}
<table>
<tr>
<th>id</th><th>peer</th><th>phase</th><th>mgmt #</th><th>echo bytes</th><th>error</th>
</tr>
{% for c in snap.recent_connections %}
<tr>
<td class="mono">{{ c.id }}</td>
<td class="mono">{{ c.peer }}</td>
<td>{{ c.phase }}</td>
<td>{{ c.mgmt_commands or 0 }}</td>
<td>{{ c.bytes_echoed or 0 }}</td>
<td class="mono">{{ c.error or "" }}</td>
</tr>
{% endfor %}
</table>
{% else %}
<p class="muted">No recent disconnects recorded.</p>
{% endif %}
</section>
<section style="grid-column: 1 / -1;">
<h2>Data directory tree (debug)</h2>
<pre>{{ files | tojson(indent=2) }}</pre>
</section>
<section style="grid-column: 1 / -1;">
<h2>Full debug JSON</h2>
<p class="sub" style="margin-top:0">Machine-readable snapshot (same as <a href="/api/debug.json">/api/debug.json</a>).</p>
<pre>{{ full_json }}</pre>
</section>
</div>
<p class="links">
<a href="?refresh=3">Auto-refresh 3s</a> ·
<a href="?refresh=5">5s</a> ·
<a href="?">static</a> ·
<a href="/api/debug.json">debug.json</a>
</p>
</body>
</html>
"""
def _full_payload(debug: ServerDebugState, data_dir: Path) -> dict:
snap = debug.snapshot()
reg = collect_registry_debug(data_dir)
sk = server_key_meta(data_dir)
files = data_dir_tree(data_dir)
return {
"snapshot": snap,
"server_key_file": sk,
"registries": reg,
"data_dir_files": files,
"process": {"pid": os.getpid(), "threads": threading.active_count()},
"generated_wall": time.time(),
}
def create_app(debug: ServerDebugState, data_dir: Path) -> Flask:
app = Flask(__name__)
@app.get("/")
def index():
refresh = request.args.get("refresh", "").strip()
refresh_s = int(refresh) if refresh.isdigit() and 1 <= int(refresh) <= 60 else None
data = _full_payload(debug, data_dir)
snap = data["snapshot"]
proc = data["process"]
full_json = json.dumps(data, indent=2, sort_keys=True)
return render_template_string(
_PAGE,
snap=snap,
reg=data["registries"],
sk_meta=data["server_key_file"],
files=data["data_dir_files"],
full_json=full_json,
proc=proc,
refresh_s=refresh_s,
)
@app.get("/api/debug.json")
def api_debug():
return jsonify(_full_payload(debug, data_dir))
@app.get("/healthz")
def healthz():
return Response("ok", mimetype="text/plain")
return app
def main() -> None:
p = argparse.ArgumentParser(description="ZKAC node TCP + HTTP admin debug dashboard")
p.add_argument("userid", help="user whose ~/.zkac/<userid>/server/ holds node state")
p.add_argument("--data-dir", default=None, help="override server data directory")
p.add_argument("--node-host", default="127.0.0.1")
p.add_argument("--node-port", type=int, default=9800)
p.add_argument("--web-host", default="127.0.0.1")
p.add_argument("--web-port", type=int, default=8766)
args = p.parse_args()
data_dir = Path(args.data_dir) if args.data_dir else user_dir(args.userid) / "server"
data_dir = data_dir.resolve()
debug = ServerDebugState(userid=args.userid, data_dir=str(data_dir))
t = threading.Thread(
target=serve,
kwargs={
"data_dir": str(data_dir),
"host": args.node_host,
"port": args.node_port,
"debug": debug,
},
name="zkac-serve",
daemon=True,
)
t.start()
time.sleep(0.15)
if not t.is_alive():
print("ZKAC node thread died on startup; check stderr above.", file=sys.stderr)
sys.exit(1)
app = create_app(debug, data_dir)
print(f"ZKAC TCP node: {args.node_host}:{args.node_port} (data {data_dir})")
print(f"Admin debug UI: http://{args.web_host}:{args.web_port}/")
print("Warning: admin UI exposes live sessions and registry metadata.", file=sys.stderr)
app.run(host=args.web_host, port=args.web_port, debug=False, threaded=True)
if __name__ == "__main__":
main()

420
demo/zkac_cli_adapter.py Normal file
View File

@ -0,0 +1,420 @@
"""
Subprocess adapter around the existing ``zkac-node`` CLI.
The demo deliberately does not import ``cli/zkac_cli/*``: every interaction
with identity, registry, grant or p2p-listen flows runs through the CLI as a
black box, exactly the way a third-party application would integrate.
The only direct filesystem touch is reading the CLI-managed ``identity.json``
under ``ZKAC_HOME`` (default ``~/.ZKAC-FS/<userid>/`` for this demo). That file is the CLI's
public on-disk format and is needed locally to obtain the user's transport and
issuance secrets for the file-share session and role-grant decryption.
"""
from __future__ import annotations
import base64
import hashlib
import json
import os
import re
import shutil
import subprocess
import sys
import threading
from dataclasses import dataclass
from pathlib import Path
import zkac
ROOT = Path(__file__).resolve().parents[1]
CLI_DIR = ROOT / "cli"
DEFAULT_TIMEOUT_S = 60.0
DEFAULT_LISTEN_TIMEOUT_S = 600.0
DEFAULT_DEMO_ZKAC_HOME = Path.home() / ".ZKAC-FS"
# ── helpers ──────────────────────────────────────────────────────────
def _b64_decode(s: str) -> bytes:
return base64.b64decode(s)
def zkac_home() -> Path:
return Path(os.environ.get("ZKAC_HOME", DEFAULT_DEMO_ZKAC_HOME))
def user_identity_path(userid: str) -> Path:
return zkac_home() / userid / "identity.json"
def user_exists(userid: str) -> bool:
return user_identity_path(userid).is_file()
def list_local_users() -> list[str]:
home = zkac_home()
if not home.is_dir():
return []
return sorted(d.name for d in home.iterdir() if (d / "identity.json").is_file())
def _zkac_invocation() -> tuple[list[str], dict[str, str]]:
"""Resolve a runnable ``zkac-node`` command, falling back to the source tree."""
exe = shutil.which("zkac-node")
if exe:
return [exe], {}
return [sys.executable, "-m", "zkac_cli.main"], {"PYTHONPATH": str(CLI_DIR)}
@dataclass
class CliResult:
rc: int
stdout: str
stderr: str
def ok(self) -> bool:
return self.rc == 0
def raise_for_status(self) -> "CliResult":
if self.rc != 0:
raise RuntimeError(
f"zkac-node failed (rc={self.rc}): {self.stderr.strip() or self.stdout.strip()}"
)
return self
def run_cli(
args: list[str],
*,
timeout_s: float = DEFAULT_TIMEOUT_S,
extra_env: dict[str, str] | None = None,
) -> CliResult:
prefix, env_overrides = _zkac_invocation()
env = {
**os.environ,
**env_overrides,
"ZKAC_HOME": str(zkac_home()),
**(extra_env or {}),
}
try:
p = subprocess.run(
prefix + args,
capture_output=True,
text=True,
timeout=timeout_s,
cwd=str(ROOT),
env=env,
)
except subprocess.TimeoutExpired as exc:
return CliResult(rc=124, stdout=exc.stdout or "", stderr=f"timed out after {timeout_s}s")
return CliResult(rc=p.returncode, stdout=p.stdout, stderr=p.stderr)
# ── identity ─────────────────────────────────────────────────────────
@dataclass
class Identity:
userid: str
issuance_pk_hex: str
issuance_secret_hex: str
transport_pk_hex: str
transport_secret_hex: str
grant_token_b64: str
contact_bundle: str # value of ``zkac-node user show <userid> --peer ...``
def load_identity_secrets(userid: str) -> dict[str, str]:
"""Read the CLI-managed ``identity.json`` (creator: ``zkac-node user create``)."""
path = user_identity_path(userid)
if not path.is_file():
raise FileNotFoundError(f"unknown user {userid!r} (run: zkac-node user create {userid})")
data = json.loads(path.read_text())
def hexed(b64_field: str) -> str:
return _b64_decode(data[b64_field]).hex()
return {
"issuance_pk_hex": hexed("issuance_public_b64"),
"issuance_secret_hex": hexed("issuance_secret_b64"),
"transport_pk_hex": hexed("transport_public_b64"),
"transport_secret_hex": hexed("transport_secret_b64"),
"grant_token_b64": data["grant_token_b64"],
}
def create_user(userid: str) -> CliResult:
return run_cli(["user", "create", userid])
def show_user_contact(userid: str, peer: str | None = None) -> str:
args = ["user", "show", userid]
if peer:
args += ["--peer", peer]
res = run_cli(args).raise_for_status()
for line in res.stdout.splitlines():
line = line.strip()
# contact bundle is the indented blob after "share contact:"
if line and not line.startswith(("user:", "share contact:", "issuance pk:", "p2p transport pk:", "(", "registries owned:", "credentials:", "contact peer endpoint:", "owner admin")):
if re.fullmatch(r"[A-Za-z0-9_\-]+", line):
return line
raise RuntimeError("could not parse contact bundle from `zkac-node user show`")
def get_identity(userid: str, peer: str | None = None) -> Identity:
secrets = load_identity_secrets(userid)
contact = show_user_contact(userid, peer=peer)
return Identity(
userid=userid,
issuance_pk_hex=secrets["issuance_pk_hex"],
issuance_secret_hex=secrets["issuance_secret_hex"],
transport_pk_hex=secrets["transport_pk_hex"],
transport_secret_hex=secrets["transport_secret_hex"],
grant_token_b64=secrets["grant_token_b64"],
contact_bundle=contact,
)
# ── registry / pinning / grants ──────────────────────────────────────
def server_pin(userid: str, server: str, server_pk_hex: str) -> CliResult:
return run_cli(["server", "pin", userid, server, "--key", server_pk_hex])
def load_pinned_server_key(userid: str, server: str) -> str | None:
digest = hashlib.sha256(server.encode("utf-8")).hexdigest()
pin_file = zkac_home() / userid / "servers" / f"sha256_{digest}.json"
if not pin_file.is_file():
return None
data = json.loads(pin_file.read_text())
server_pk_b64 = data.get("server_public_key_b64")
if not isinstance(server_pk_b64, str):
return None
return _b64_decode(server_pk_b64).hex()
def registry_create(userid: str, server: str, roles: list[str]) -> str:
res = run_cli(["registry", "create", userid, server, "--roles", ",".join(roles)]).raise_for_status()
for line in res.stdout.splitlines():
line = line.strip()
if line.startswith("registry created:"):
return line.split(":", 1)[1].strip()
raise RuntimeError(f"could not parse registry id from output:\n{res.stdout}")
def registry_list(userid: str) -> list[dict]:
res = run_cli(["registry", "list", userid]).raise_for_status()
out: list[dict] = []
for line in res.stdout.splitlines():
s = line.strip()
m = re.match(
r"^([0-9a-fA-F]+)\s+@\s+(\S+)\s+roles=\[(.*)\]\s*$",
s,
)
if m:
roles_raw = m.group(3)
roles = [r.strip().strip("'\"") for r in roles_raw.split(",") if r.strip()]
out.append({"registry_id": m.group(1), "server": m.group(2), "roles": roles})
return out
def registry_add_roles(userid: str, server: str, registry_id: str, add_roles: list[str]) -> CliResult:
return run_cli([
"registry", "update", userid, server,
"--registry", registry_id,
"--add-roles", ",".join(add_roles),
]).raise_for_status()
def credentials_list(userid: str) -> list[dict]:
"""Return locally held BBS credentials as [{registry_id, role}]."""
res = run_cli(["credentials", "list", userid]).raise_for_status()
creds: list[dict] = []
in_local = False
for line in res.stdout.splitlines():
if line.startswith("local credentials:"):
in_local = True
continue
if not line.strip():
continue
if line.startswith(("owner admin capability:", "registries owned:")):
in_local = False
continue
if in_local:
s = line.strip()
if s == "(none)":
continue
if ":" in s:
rid, role = s.rsplit(":", 1)
creds.append({"registry_id": rid.strip(), "role": role.strip()})
return creds
def grant(
userid: str,
server: str,
registry_id: str,
role_name: str,
recipient_contact: str,
) -> CliResult:
return run_cli([
"grant", userid,
"--server", server,
"--registry", registry_id,
"--role", role_name,
"--to", recipient_contact,
]).raise_for_status()
# ── background p2p-listen ────────────────────────────────────────────
class P2PListener:
"""Background ``zkac-node p2p-listen`` process the TUI can stop and watch."""
def __init__(
self,
userid: str,
host: str = "127.0.0.1",
port: int = 0,
timeout_s: float = DEFAULT_LISTEN_TIMEOUT_S,
) -> None:
self.userid = userid
self.host = host
self.port = port if port else _pick_random_port(host)
self.timeout_s = timeout_s
self._proc: subprocess.Popen[str] | None = None
self._stdout_buf = ""
self._lock = threading.Lock()
@property
def address(self) -> str:
return f"{self.host}:{self.port}"
def start(self) -> None:
prefix, env_overrides = _zkac_invocation()
env = {
**os.environ,
**env_overrides,
"PYTHONUNBUFFERED": "1",
"ZKAC_HOME": str(zkac_home()),
}
args = prefix + [
"p2p-listen", self.userid,
"--host", self.host,
"--port", str(self.port),
"--timeout", str(self.timeout_s),
]
self._proc = subprocess.Popen(
args,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
cwd=str(ROOT),
env=env,
bufsize=1,
)
threading.Thread(target=self._drain, daemon=True).start()
def _drain(self) -> None:
if self._proc is None or self._proc.stdout is None:
return
for line in self._proc.stdout:
with self._lock:
self._stdout_buf += line
def is_running(self) -> bool:
return self._proc is not None and self._proc.poll() is None
def stop(self) -> None:
if self._proc is None:
return
if self._proc.poll() is None:
self._proc.terminate()
try:
self._proc.wait(timeout=2.0)
except subprocess.TimeoutExpired:
self._proc.kill()
self._proc.wait()
def output(self) -> str:
with self._lock:
return self._stdout_buf
def parse_received(self) -> dict | None:
"""Extract ``{registry_id, role}`` from CLI output once a grant has been received."""
out = self.output()
rid = role = None
for line in out.splitlines():
s = line.strip()
if s.startswith("registry:"):
rid = s.split(":", 1)[1].strip()
elif s.startswith("role:"):
role = s.split(":", 1)[1].strip()
if rid and role:
return {"registry_id": rid, "role": role}
return None
def _pick_random_port(host: str) -> int:
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
s.bind((host, 0))
return s.getsockname()[1]
finally:
s.close()
# ── credentials and admin material (CLI on-disk format, read-only) ────
def _user_dir(userid: str) -> Path:
return zkac_home() / userid
def load_credential(userid: str, registry_id_hex: str, role_name: str) -> zkac.Credential:
"""Reconstruct a finalized BBS credential previously stored by ``zkac-node grant``."""
p = _user_dir(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json"
if not p.is_file():
raise FileNotFoundError(
f"no credential for {role_name!r} on {registry_id_hex[:16]}…; "
"ask the registry admin to grant via `zkac-node grant`."
)
data = json.loads(p.read_text())
pk = zkac.BbsPublicKey.from_bytes(_b64_decode(data["issuer_pk_b64"]))
return zkac.Credential.finalize(
_b64_decode(data["blind_sig_b64"]),
_b64_decode(data["member_secret_b64"]),
_b64_decode(data["prover_blind_b64"]),
zkac.role_id(data["role_name"]),
int(data["epoch"]),
pk,
)
def load_admin_material(userid: str, registry_id_hex: str) -> dict:
p = _user_dir(userid) / "admin" / f"{registry_id_hex}.json"
if not p.is_file():
raise FileNotFoundError(
f"no admin material for {registry_id_hex[:16]}… under {userid!r}; "
"you are not the owner of this registry"
)
return json.loads(p.read_text())
def load_admin_credential(userid: str, registry_id_hex: str) -> zkac.Credential:
"""Reconstruct the registry-admin BBS credential (role = ``admin_role_id``)."""
data = load_admin_material(userid, registry_id_hex)
pk = zkac.BbsPublicKey.from_bytes(_b64_decode(data["bbs_issuer_public_b64"]))
return zkac.Credential.finalize(
_b64_decode(data["admin_blind_sig_b64"]),
_b64_decode(data["admin_member_secret_b64"]),
_b64_decode(data["admin_prover_blind_b64"]),
zkac.admin_role_id(),
0,
pk,
)
def is_registry_admin(userid: str, registry_id_hex: str) -> bool:
return (_user_dir(userid) / "admin" / f"{registry_id_hex}.json").is_file()

View File

@ -19,6 +19,7 @@ cli = ["zkac-node"]
demo = [
"flask>=3.0",
"flask-sock>=0.7",
"textual>=0.70",
"zkac-node",
]
dev = [

91
uv.lock generated
View File

@ -857,6 +857,35 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0a/dd/8050c947d435c8d4bc94e3252f4d8bb8a76cfb424f043a8680be637a57f1/kiwisolver-1.5.0-pp311-pypy311_pp73-win_amd64.whl", hash = "sha256:59cd8683f575d96df5bb48f6add94afc055012c29e28124fcae2b63661b9efb1", size = 73558, upload-time = "2026-03-09T13:15:52.112Z" },
]
[[package]]
name = "linkify-it-py"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "uc-micro-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/2e/c9/06ea13676ef354f0af6169587ae292d3e2406e212876a413bf9eece4eb23/linkify_it_py-2.1.0.tar.gz", hash = "sha256:43360231720999c10e9328dc3691160e27a718e280673d444c38d7d3aaa3b98b", size = 29158, upload-time = "2026-03-01T07:48:47.683Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b4/de/88b3be5c31b22333b3ca2f6ff1de4e863d8fe45aaea7485f591970ec1d3e/linkify_it_py-2.1.0-py3-none-any.whl", hash = "sha256:0d252c1594ecba2ecedc444053db5d3a9b7ec1b0dd929c8f1d74dce89f86c05e", size = 19878, upload-time = "2026-03-01T07:48:46.098Z" },
]
[[package]]
name = "markdown-it-py"
version = "4.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "mdurl" },
]
sdist = { url = "https://files.pythonhosted.org/packages/5c/5c/f3aedc83549aae71cd52b9e9687fe896e3dc6e966ba20eba04718605d198/markdown_it_py-4.1.0.tar.gz", hash = "sha256:760e3f87b2787c044c5138a5ba107b7c2be26c03b13cc7f8fe42756b65b1df6c", size = 81613, upload-time = "2026-05-06T16:32:13.649Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a8/88/802c82060c54bc7dde21eb0033e337838b8181a1323254aa9ec41cbfc3d1/markdown_it_py-4.1.0-py3-none-any.whl", hash = "sha256:d4939a62a2dd0cd9cb80a191a711ba1d39bac8ed5ef9e9966895b0171c01c46d", size = 90955, upload-time = "2026-05-06T16:32:12.184Z" },
]
[package.optional-dependencies]
linkify = [
{ name = "linkify-it-py" },
]
[[package]]
name = "markupsafe"
version = "3.0.3"
@ -1053,6 +1082,27 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/c5/2a/afe0193b673a79ffd2e01ad999511b7e9e6b49af02bb3759d82a78c3043d/maturin-1.13.1-py3-none-win_arm64.whl", hash = "sha256:2839024dcd65776abb4759e5bca29941971e095574162a4d335191da4be9ff24", size = 8905575, upload-time = "2026-04-09T15:14:03.891Z" },
]
[[package]]
name = "mdit-py-plugins"
version = "0.5.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b2/fd/a756d36c0bfba5f6e39a1cdbdbfdd448dc02692467d83816dff4592a1ebc/mdit_py_plugins-0.5.0.tar.gz", hash = "sha256:f4918cb50119f50446560513a8e311d574ff6aaed72606ddae6d35716fe809c6", size = 44655, upload-time = "2025-08-11T07:25:49.083Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/fb/86/dd6e5db36df29e76c7a7699123569a4a18c1623ce68d826ed96c62643cae/mdit_py_plugins-0.5.0-py3-none-any.whl", hash = "sha256:07a08422fc1936a5d26d146759e9155ea466e842f5ab2f7d2266dd084c8dab1f", size = 57205, upload-time = "2025-08-11T07:25:47.597Z" },
]
[[package]]
name = "mdurl"
version = "0.1.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d6/54/cfe61301667036ec958cb99bd3efefba235e65cdeb9c84d24a8293ba1d90/mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba", size = 8729, upload-time = "2022-08-14T12:40:10.846Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b3/38/89ba8ad64ae25be8de66a6d463314cf1eb366222074cfda9ee839c56a4b4/mdurl-0.1.2-py3-none-any.whl", hash = "sha256:84008a41e51615a49fc9966191ff91509e3c40b939176e643fd50a5c2196b8f8", size = 9979, upload-time = "2022-08-14T12:40:09.779Z" },
]
[[package]]
name = "mistune"
version = "3.2.1"
@ -1607,6 +1657,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/2c/58/ca301544e1fa93ed4f80d724bf5b194f6e4b945841c5bfd555878eea9fcb/referencing-0.37.0-py3-none-any.whl", hash = "sha256:381329a9f99628c9069361716891d34ad94af76e461dcb0335825aecc7692231", size = 26766, upload-time = "2025-10-13T15:30:47.625Z" },
]
[[package]]
name = "rich"
version = "15.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py" },
{ name = "pygments" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c0/8f/0722ca900cc807c13a6a0c696dacf35430f72e0ec571c4275d2371fca3e9/rich-15.0.0.tar.gz", hash = "sha256:edd07a4824c6b40189fb7ac9bc4c52536e9780fbbfbddf6f1e2502c31b068c36", size = 230680, upload-time = "2026-04-12T08:24:00.75Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/82/3b/64d4899d73f91ba49a8c18a8ff3f0ea8f1c1d75481760df8c68ef5235bf5/rich-15.0.0-py3-none-any.whl", hash = "sha256:33bd4ef74232fb73fe9279a257718407f169c09b78a87ad3d296f548e27de0bb", size = 310654, upload-time = "2026-04-12T08:24:02.83Z" },
]
[[package]]
name = "rpds-py"
version = "0.30.0"
@ -1773,6 +1836,23 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/f1/7b/ce1eafaf1a76852e2ec9b22edecf1daa58175c090266e9f6c64afcd81d91/stack_data-0.6.3-py3-none-any.whl", hash = "sha256:d5558e0c25a4cb0853cddad3d77da9891a08cb85dd9f9f91b9f8cd66e511e695", size = 24521, upload-time = "2023-09-30T13:58:03.53Z" },
]
[[package]]
name = "textual"
version = "8.2.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "markdown-it-py", extra = ["linkify"] },
{ name = "mdit-py-plugins" },
{ name = "platformdirs" },
{ name = "pygments" },
{ name = "rich" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/62/1e/1eedc5bac184d00aaa5f9a99095f7e266af3ec46fa926c1051be5d358da1/textual-8.2.5.tar.gz", hash = "sha256:6c894e65a879dadb4f6cf46ddcfedb0173ff7e0cb1fe605ff7b357a597bdbc90", size = 1851596, upload-time = "2026-04-30T08:02:58.956Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/cd/01/c4555f9c8a692ff83d84930150540f743ce94c89234f9e9a15ff4baba3a8/textual-8.2.5-py3-none-any.whl", hash = "sha256:247d2aa2faf222749c321f88a736247f37ee2c023604079c7490bfacddfcd4b2", size = 727050, upload-time = "2026-04-30T08:03:01.421Z" },
]
[[package]]
name = "tinycss2"
version = "1.4.0"
@ -1874,6 +1954,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
]
[[package]]
name = "uc-micro-py"
version = "2.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/78/67/9a363818028526e2d4579334460df777115bdec1bb77c08f9db88f6389f2/uc_micro_py-2.0.0.tar.gz", hash = "sha256:c53691e495c8db60e16ffc4861a35469b0ba0821fe409a8a7a0a71864d33a811", size = 6611, upload-time = "2026-03-01T06:31:27.526Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/61/73/d21edf5b204d1467e06500080a50f79d49ef2b997c79123a536d4a17d97c/uc_micro_py-2.0.0-py3-none-any.whl", hash = "sha256:3603a3859af53e5a39bc7677713c78ea6589ff188d70f4fee165db88e22b242c", size = 6383, upload-time = "2026-03-01T06:31:26.257Z" },
]
[[package]]
name = "wcwidth"
version = "0.6.0"
@ -1931,6 +2020,7 @@ cli = [
demo = [
{ name = "flask" },
{ name = "flask-sock" },
{ name = "textual" },
{ name = "zkac-node" },
]
dev = [
@ -1950,6 +2040,7 @@ requires-dist = [
{ name = "flask-sock", marker = "extra == 'demo'", specifier = ">=0.7" },
{ name = "ipykernel", specifier = ">=6.31.0" },
{ name = "maturin", marker = "extra == 'dev'", specifier = ">=1.0,<2.0" },
{ name = "textual", marker = "extra == 'demo'", specifier = ">=0.70" },
{ name = "zkac-node", marker = "extra == 'cli'", editable = "cli" },
{ name = "zkac-node", marker = "extra == 'demo'", editable = "cli" },
{ name = "zkac-node", marker = "extra == 'dev'", editable = "cli" },