This commit is contained in:
everbarry 2026-04-16 01:02:55 +02:00
parent 4b2543fad2
commit 15998edb51
18 changed files with 2306 additions and 399 deletions

View File

@ -1,89 +0,0 @@
#!/usr/bin/env python3
"""
ZKAC TCP client: load a member credential, complete handshake, request /api (encrypted JSON).
The "API" is not HTTP on port 8765 it is one JSON request inside the ZKAC session on --port (default 9876).
"""
from __future__ import annotations
import argparse
import base64
import json
import socket
from pathlib import Path
import zkac
from zkac.tcp import FramedSession, client_handshake
def load_credential(member_json: Path) -> zkac.Credential:
"""Rebuild Credential from setup_demo.py output (same fields as zkac.Credential.finalize)."""
m = json.loads(member_json.read_text(encoding="utf-8"))
pk = zkac.BbsPublicKey.from_bytes(base64.b64decode(m["issuer_public_key_b64"]))
rid = bytes.fromhex(m["role_id_hex"])
return zkac.Credential.finalize(
base64.b64decode(m["blind_sig_b64"]),
base64.b64decode(m["member_secret_b64"]),
base64.b64decode(m["prover_blind_b64"]),
rid,
int(m["epoch"]),
pk,
)
def load_server_pk(creds_dir: Path) -> zkac.PublicKey:
"""Pinned server identity: must match the Keypair used by server.py (from transport.json)."""
t = json.loads((creds_dir / "transport.json").read_text(encoding="utf-8"))
raw = base64.b64decode(t["server_public_key_b64"])
return zkac.PublicKey.from_bytes(raw)
def main() -> None:
ap = argparse.ArgumentParser(description="ZKAC demo client (TCP + credential)")
ap.add_argument(
"--creds-dir",
type=Path,
default=Path(__file__).resolve().parent / "creds",
help="Directory with transport.json and member_*.json",
)
ap.add_argument(
"--member",
type=Path,
help="Path to member_*.json (default: creds-dir/member_analyst.json)",
)
ap.add_argument("--host", default="127.0.0.1")
ap.add_argument("--port", type=int, default=9876)
args = ap.parse_args()
creds_dir: Path = args.creds_dir
member_path = args.member or (creds_dir / "member_analyst.json")
if not member_path.is_file():
raise SystemExit(f"Missing member file: {member_path}")
credential = load_credential(member_path)
server_pk = load_server_pk(creds_dir)
# Ephemeral client transport identity (not the BBS+ member secret — that is inside credential).
client_kp = zkac.Keypair()
node = zkac.Node(client_kp)
sock = socket.create_connection((args.host, args.port))
try:
# X25519 + server Schnorr + BBS+ auth; returns symmetric Session.
session = client_handshake(sock, node, server_pk, credential)
framed = FramedSession(sock, session)
# Logical GET /api: path is checked by server after decrypt.
request_obj = {"path": "/api"}
payload = json.dumps(request_obj).encode("utf-8")
framed.send(payload)
reply = framed.recv().decode("utf-8")
print(json.dumps(json.loads(reply), indent=2))
finally:
sock.close()
if __name__ == "__main__":
main()

91
demo/client_managed.py Normal file
View File

@ -0,0 +1,91 @@
#!/usr/bin/env python3
"""
ZKAC client for client-managed registries.
1. Issues a credential locally (admin issues to self in production this
would go through the server's E2E-encrypted issuance relay).
2. Verifies the registry state certificate.
3. Authenticates via managed-registry handshake.
4. Sends a JSON request over the encrypted session.
Usage:
python client_managed.py --role analyst
python client_managed.py --role operator
"""
from __future__ import annotations
import argparse
import base64
import json
import socket
from pathlib import Path
import zkac
from zkac.tcp import FramedSession, client_handshake_managed
def main() -> None:
ap = argparse.ArgumentParser(description="ZKAC managed-registry client")
ap.add_argument("--creds-dir", type=Path,
default=Path(__file__).resolve().parent / "creds")
ap.add_argument("--role", default="analyst", choices=["analyst", "operator"])
ap.add_argument("--host", default="127.0.0.1")
ap.add_argument("--port", type=int, default=9877)
args = ap.parse_args()
creds_dir: Path = args.creds_dir
admin_data = json.loads((creds_dir / "managed_admin.json").read_text(encoding="utf-8"))
reg_data = json.loads((creds_dir / "managed_registry.json").read_text(encoding="utf-8"))
transport_data = json.loads((creds_dir / "transport.json").read_text(encoding="utf-8"))
# Reconstruct admin issuer to issue a credential for the chosen role.
# In production, this would be done via the E2E issuance relay.
admin_issuer = zkac.BbsIssuer.from_secret_key(
base64.b64decode(admin_data["admin_issuer_secret_b64"])
)
admin_pk = admin_issuer.public_key()
role_rid = zkac.role_id(args.role)
req = zkac.prepare_blind_request()
blind_sig = admin_issuer.issue_blind(req.commitment_with_proof(), role_rid, 1)
cred = zkac.Credential.finalize(
blind_sig, req.member_secret(), req.prover_blind(), role_rid, 1, admin_pk
)
# Verify registry state certificate before trusting it
state_bytes = base64.b64decode(reg_data["state_bytes_b64"])
state_cert = base64.b64decode(reg_data["state_cert_b64"])
registry_id = bytes.fromhex(reg_data["registry_id_hex"])
issuer_pk_for_verify = zkac.BbsPublicKey.from_bytes(
base64.b64decode(reg_data["admin_issuer_public_key_b64"])
)
expected_rid = zkac.registry_id(issuer_pk_for_verify)
assert expected_rid == registry_id, "registry_id mismatch"
assert zkac.RegistryState.verify_cert(issuer_pk_for_verify, state_cert, state_bytes), \
"state certificate verification failed"
print(f"Registry state verified (registry_id={registry_id.hex()[:16]}…)")
# Connect and authenticate
server_pk = zkac.PublicKey.from_bytes(
base64.b64decode(transport_data["server_public_key_b64"])
)
node = zkac.Node(zkac.Keypair())
sock = socket.create_connection((args.host, args.port))
try:
session = client_handshake_managed(
sock, node, server_pk, cred, registry_id
)
framed = FramedSession(sock, session)
framed.send(json.dumps({"path": "/api"}).encode())
reply = json.loads(framed.recv().decode())
print(json.dumps(reply, indent=2))
finally:
sock.close()
if __name__ == "__main__":
main()

View File

@ -1,196 +0,0 @@
#!/usr/bin/env python3
"""
HTTP static site + ZKAC TCP service. Authenticated /api is accessed over TCP with zkac.tcp
after setup_demo.py has created creds/.
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import threading
import http.server
import socket
import traceback
from pathlib import Path
import zkac
from zkac.tcp import FramedSession, server_handshake
def load_registry(creds_dir: Path, epoch: int) -> zkac.RoleRegistry:
"""Load issuer public key and register every demo role at the same epoch."""
iss = json.loads((creds_dir / "issuer.json").read_text(encoding="utf-8"))
issuer_pk = zkac.BbsPublicKey.from_bytes(
base64.b64decode(iss["issuer_public_key_b64"])
)
reg = zkac.RoleRegistry()
for name in ("analyst", "operator"):
reg.register_role(zkac.role_id(name), issuer_pk, epoch)
return reg
def _role_debug_label(role_id: bytes) -> str:
"""Map verified role_id bytes to a short label for logs (demo only)."""
for name in ("analyst", "operator"):
if role_id == zkac.role_id(name):
return name
return "unknown"
def api_body_for_role(role_id: bytes) -> dict:
"""JSON returned for the logical /api resource after ZKAC auth; varies by credential role."""
analyst = zkac.role_id("analyst")
operator = zkac.role_id("operator")
if role_id == analyst:
return {
"path": "/api",
"role": "analyst",
"datasets": ["summary", "aggregated_metrics"],
"note": "Analyst tier: aggregated data only.",
}
if role_id == operator:
return {
"path": "/api",
"role": "operator",
"datasets": ["summary", "aggregated_metrics", "raw_logs", "pii"],
"note": "Operator tier: full API slice including raw logs.",
}
return {"error": "unknown role", "path": "/api"}
def handle_zkac_client(
conn: socket.socket,
client_addr: tuple,
creds_dir: Path,
registry: zkac.RoleRegistry,
) -> None:
"""
One TCP connection: ZKAC handshake + BBS+ auth, then one framed JSON request and response.
Each handler rebuilds the server Node from persisted secret (Keypair is consumed by Node).
"""
peer = f"{client_addr[0]}:{client_addr[1]}"
print(f"[zkac] connect peer={peer}")
try:
# Same long-term server identity every time; from_secret_key because Node consumes Keypair.
t = json.loads((creds_dir / "transport.json").read_text(encoding="utf-8"))
sk = base64.b64decode(t["server_secret_key_b64"])
server_kp = zkac.Keypair.from_secret_key(sk)
node = zkac.Node(server_kp)
session, role_id = server_handshake(conn, node, registry)
label = _role_debug_label(role_id)
print(
f"[zkac] handshake_ok peer={peer} role_id={role_id.hex()} role={label!r}"
)
framed = FramedSession(conn, session)
raw = framed.recv()
print(
f"[zkac] request peer={peer} plaintext_bytes={len(raw)} raw={raw!r}"
)
req = json.loads(raw.decode("utf-8"))
print(f"[zkac] request_json peer={peer} parsed={req!r}")
path = req.get("path")
if path != "/api":
err_body = {"error": "unsupported path", "allowed": ["/api"], "got": path}
out = json.dumps(err_body).encode()
framed.send(out)
print(
f"[zkac] response peer={peer} status=reject path={path!r} response_bytes={len(out)}"
)
return
body = api_body_for_role(role_id)
out_bytes = json.dumps(body).encode()
framed.send(out_bytes)
print(
f"[zkac] response peer={peer} status=ok path=/api role={label!r} "
f"response_bytes={len(out_bytes)} body_keys={list(body.keys())}"
)
except (ConnectionError, BrokenPipeError, OSError) as e:
print(f"[zkac] peer={peer} connection_error: {e!r}")
except (json.JSONDecodeError, ValueError) as e:
print(f"[zkac] peer={peer} protocol_error: {e!r}")
except Exception as e:
print(f"[zkac] peer={peer} unexpected_error: {e!r}")
traceback.print_exc()
finally:
conn.close()
print(f"[zkac] closed peer={peer}")
def run_http(host: str, port: int, static_root: Path) -> None:
# Process-wide CWD: only this thread should rely on relative paths after chdir.
os.chdir(static_root)
class Handler(http.server.SimpleHTTPRequestHandler):
def log_message(self, fmt: str, *args) -> None:
# Default fmt is like '%s - - [%s] %s' — include client address for debugging.
try:
line = fmt % args if args else fmt
except (TypeError, ValueError):
line = f"{fmt} {args}"
peer_ip = self.client_address[0] if self.client_address else "?"
peer_port = self.client_address[1] if len(self.client_address) > 1 else "?"
print(f"[http] peer={peer_ip}:{peer_port} | {line.strip()}")
http.server.HTTPServer((host, port), Handler).serve_forever()
def main() -> None:
ap = argparse.ArgumentParser(description="ZKAC demo HTTP + TCP server")
ap.add_argument(
"--creds-dir",
type=Path,
default=Path(__file__).resolve().parent / "creds",
)
ap.add_argument("--http-host", default="127.0.0.1")
ap.add_argument("--http-port", type=int, default=8765)
ap.add_argument("--zkac-host", default="127.0.0.1")
ap.add_argument("--zkac-port", type=int, default=9876)
args = ap.parse_args()
creds_dir: Path = args.creds_dir
if not (creds_dir / "transport.json").is_file():
raise SystemExit(f"Missing {creds_dir}/transport.json — run setup_demo.py first.")
# Epoch must match the member files issued at setup (any member file is enough).
member = json.loads((creds_dir / "member_analyst.json").read_text(encoding="utf-8"))
epoch = int(member["epoch"])
registry = load_registry(creds_dir, epoch)
static_root = Path(__file__).resolve().parent / "static"
if not static_root.is_dir():
raise SystemExit(f"Missing static directory: {static_root}")
http_thread = threading.Thread(
target=run_http,
args=(args.http_host, args.http_port, static_root),
daemon=True,
)
http_thread.start()
print(
f"HTTP http://{args.http_host}:{args.http_port}/ (static demo page)\n"
f"ZKAC {args.zkac_host}:{args.zkac_port} (authenticated /api over TCP)"
)
zkac_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
zkac_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
zkac_sock.bind((args.zkac_host, args.zkac_port))
zkac_sock.listen(8)
while True:
conn, addr = zkac_sock.accept()
threading.Thread(
target=handle_zkac_client,
args=(conn, addr, creds_dir, registry),
daemon=True,
).start()
if __name__ == "__main__":
main()

149
demo/server_managed.py Normal file
View File

@ -0,0 +1,149 @@
#!/usr/bin/env python3
"""
ZKAC TCP server using client-managed registries.
Loads a registry from managed_registry.json (created by setup_managed_demo.py),
verifies BBS+ state certificates, and authenticates clients against the registry.
Also handles credential issuance requests via the E2E-encrypted relay.
Run setup_managed_demo.py first, then this server, then client_managed.py.
"""
from __future__ import annotations
import argparse
import base64
import json
import socket
import struct
import threading
import traceback
from pathlib import Path
import zkac
from zkac.tcp import (
FramedSession,
read_frame,
write_frame,
server_handshake_managed,
)
def load_managed_registry(creds_dir: Path, mgr: zkac.RegistryManager) -> bytes:
"""Load the managed registry state + cert into the manager. Returns registry_id."""
r = json.loads((creds_dir / "managed_registry.json").read_text(encoding="utf-8"))
state_bytes = base64.b64decode(r["state_bytes_b64"])
state_cert = base64.b64decode(r["state_cert_b64"])
rid = mgr.create(state_bytes, state_cert)
return rid
def _role_label(role_id: bytes) -> str:
for name in ("analyst", "operator"):
if role_id == zkac.role_id(name):
return name
return role_id.hex()[:16]
def api_body_for_role(role_id: bytes) -> dict:
if role_id == zkac.role_id("analyst"):
return {
"path": "/api",
"role": "analyst",
"datasets": ["summary", "aggregated_metrics"],
"note": "Analyst tier: aggregated data only.",
"registry": "client-managed",
}
if role_id == zkac.role_id("operator"):
return {
"path": "/api",
"role": "operator",
"datasets": ["summary", "aggregated_metrics", "raw_logs", "pii"],
"note": "Operator tier: full access.",
"registry": "client-managed",
}
return {"error": "unknown role", "path": "/api"}
def handle_client(
conn: socket.socket,
addr: tuple,
creds_dir: Path,
mgr: zkac.RegistryManager,
) -> None:
peer = f"{addr[0]}:{addr[1]}"
print(f"[zkac-managed] connect peer={peer}")
try:
t = json.loads((creds_dir / "transport.json").read_text(encoding="utf-8"))
sk = base64.b64decode(t["server_secret_key_b64"])
server_kp = zkac.Keypair.from_secret_key(sk)
node = zkac.Node(server_kp)
session, registry_id, role_id = server_handshake_managed(conn, node, mgr)
label = _role_label(role_id)
print(
f"[zkac-managed] auth_ok peer={peer} registry={registry_id.hex()[:16]}"
f"role={label!r}"
)
framed = FramedSession(conn, session)
raw = framed.recv()
req = json.loads(raw.decode("utf-8"))
print(f"[zkac-managed] request peer={peer} {req!r}")
path = req.get("path")
if path != "/api":
body = {"error": "unsupported path", "got": path}
else:
body = api_body_for_role(role_id)
out = json.dumps(body).encode()
framed.send(out)
print(f"[zkac-managed] response peer={peer} {list(body.keys())}")
except (ConnectionError, BrokenPipeError, OSError) as e:
print(f"[zkac-managed] peer={peer} connection_error: {e!r}")
except (json.JSONDecodeError, ValueError) as e:
print(f"[zkac-managed] peer={peer} protocol_error: {e!r}")
except Exception as e:
print(f"[zkac-managed] peer={peer} unexpected_error: {e!r}")
traceback.print_exc()
finally:
conn.close()
def main() -> None:
ap = argparse.ArgumentParser(description="ZKAC managed-registry TCP server")
ap.add_argument("--creds-dir", type=Path,
default=Path(__file__).resolve().parent / "creds")
ap.add_argument("--host", default="127.0.0.1")
ap.add_argument("--port", type=int, default=9877)
args = ap.parse_args()
creds_dir: Path = args.creds_dir
if not (creds_dir / "managed_registry.json").is_file():
raise SystemExit(
f"Missing {creds_dir}/managed_registry.json — run setup_managed_demo.py first."
)
mgr = zkac.RegistryManager()
rid = load_managed_registry(creds_dir, mgr)
print(f"Loaded managed registry {rid.hex()[:16]}")
print(f"ZKAC managed {args.host}:{args.port}")
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((args.host, args.port))
sock.listen(8)
while True:
conn, addr = sock.accept()
threading.Thread(
target=handle_client,
args=(conn, addr, creds_dir, mgr),
daemon=True,
).start()
if __name__ == "__main__":
main()

View File

@ -1,79 +0,0 @@
#!/usr/bin/env python3
"""
Generate demo credentials under creds/: issuer, server transport key, two member credentials.
Run once before starting the server.
"""
from __future__ import annotations
import argparse
import base64
import json
from pathlib import Path
import zkac
# Human-readable role names; each becomes a 32-byte opaque role_id via zkac.role_id().
# Must stay in sync with server.py (registry + api_body_for_role).
ROLES = ("analyst", "operator")
def main() -> None:
ap = argparse.ArgumentParser(description="Generate ZKAC demo credential files.")
ap.add_argument(
"--output-dir",
type=Path,
default=Path(__file__).resolve().parent / "creds",
help="Directory to write files (default: demo/creds)",
)
args = ap.parse_args()
out: Path = args.output_dir
out.mkdir(parents=True, exist_ok=True)
# BBS+ issuer: signs blind credentials; server only needs the public key in RoleRegistry.
issuer = zkac.BbsIssuer()
issuer_pk = issuer.public_key()
epoch = 1
# Long-term Ristretto identity for the TCP server (X25519 handshake + Schnorr identity proof).
server_kp = zkac.Keypair()
server_pk = server_kp.public_key()
issuer_payload = {
"issuer_secret_key_b64": base64.b64encode(issuer.secret_key_bytes()).decode(),
"issuer_public_key_b64": base64.b64encode(issuer_pk.to_bytes()).decode(),
}
(out / "issuer.json").write_text(json.dumps(issuer_payload, indent=2), encoding="utf-8")
transport_payload = {
"server_secret_key_b64": base64.b64encode(server_kp.secret_key_bytes()).decode(),
"server_public_key_b64": base64.b64encode(server_pk.to_bytes()).decode(),
}
(out / "transport.json").write_text(json.dumps(transport_payload, indent=2), encoding="utf-8")
# One blind issuance per role: issuer never learns member_secret.
for role_name in ROLES:
rid = zkac.role_id(role_name)
req = zkac.prepare_blind_request()
blind_sig = issuer.issue_blind(req.commitment_with_proof(), rid, epoch)
member = {
"role_name": role_name,
"role_id_hex": rid.hex(),
"epoch": epoch,
"blind_sig_b64": base64.b64encode(blind_sig).decode(),
"member_secret_b64": base64.b64encode(req.member_secret()).decode(),
"prover_blind_b64": base64.b64encode(req.prover_blind()).decode(),
"issuer_public_key_b64": base64.b64encode(issuer_pk.to_bytes()).decode(),
}
(out / f"member_{role_name}.json").write_text(
json.dumps(member, indent=2), encoding="utf-8"
)
print(f"Wrote issuer, transport, and member files to {out}")
print(
f"Roles: {', '.join(ROLES)} — use member_{ROLES[0]}.json / member_{ROLES[1]}.json with client_cli.py"
)
if __name__ == "__main__":
main()

105
demo/setup_managed_demo.py Normal file
View File

@ -0,0 +1,105 @@
#!/usr/bin/env python3
"""
Generate credentials for the managed-registry demo.
Creates:
creds/managed_admin.json admin BBS+ issuer key + admin credential + issuance keypair
creds/managed_registry.json serialized registry state + cert (analyst + operator roles)
creds/transport.json server transport key (created if not already present)
Users request credentials through the server at runtime (E2E-encrypted issuance).
"""
from __future__ import annotations
import argparse
import base64
import json
from pathlib import Path
import zkac
ROLES = ("analyst", "operator")
def main() -> None:
ap = argparse.ArgumentParser(description="Generate managed-registry demo files.")
ap.add_argument(
"--output-dir",
type=Path,
default=Path(__file__).resolve().parent / "creds",
)
args = ap.parse_args()
out: Path = args.output_dir
out.mkdir(parents=True, exist_ok=True)
# Admin BBS+ issuer (signs credentials AND certifies registry state)
admin_issuer = zkac.BbsIssuer()
admin_pk = admin_issuer.public_key()
admin_rid = zkac.admin_role_id()
# Self-issue admin credential
req = zkac.prepare_blind_request()
sig = admin_issuer.issue_blind(req.commitment_with_proof(), admin_rid, 0)
admin_cred = zkac.Credential.finalize(
sig, req.member_secret(), req.prover_blind(), admin_rid, 0, admin_pk
)
# X25519 issuance keypair for E2E-encrypted credential requests
issuance_kp = zkac.IssuanceKeypair()
# Build registry state with roles (admin is also the issuer for all roles)
role_entries = []
for name in ROLES:
role_entries.append((zkac.role_id(name), admin_pk, 1))
state = zkac.RegistryState.build(
admin_pk, issuance_kp.public_key_bytes(), 1, b"\x00" * 32, role_entries
)
state_bytes = state.serialize()
state_cert = state.certify(admin_cred)
registry_id = state.registry_id()
# Save admin material
admin_payload = {
"admin_issuer_secret_b64": base64.b64encode(admin_issuer.secret_key_bytes()).decode(),
"admin_issuer_public_key_b64": base64.b64encode(admin_pk.to_bytes()).decode(),
"admin_member_secret_b64": base64.b64encode(req.member_secret()).decode(),
"admin_prover_blind_b64": base64.b64encode(req.prover_blind()).decode(),
"admin_blind_sig_b64": base64.b64encode(sig).decode(),
"issuance_secret_b64": base64.b64encode(issuance_kp.secret_bytes()).decode(),
"issuance_public_key_b64": base64.b64encode(issuance_kp.public_key_bytes()).decode(),
"registry_id_hex": registry_id.hex(),
}
(out / "managed_admin.json").write_text(json.dumps(admin_payload, indent=2), encoding="utf-8")
# Save registry state + cert
reg_payload = {
"registry_id_hex": registry_id.hex(),
"state_bytes_b64": base64.b64encode(state_bytes).decode(),
"state_cert_b64": base64.b64encode(bytes(state_cert)).decode(),
"admin_issuer_public_key_b64": base64.b64encode(admin_pk.to_bytes()).decode(),
"issuance_public_key_b64": base64.b64encode(issuance_kp.public_key_bytes()).decode(),
"roles": list(ROLES),
}
(out / "managed_registry.json").write_text(json.dumps(reg_payload, indent=2), encoding="utf-8")
# Transport key (create if not present)
transport_path = out / "transport.json"
if not transport_path.is_file():
server_kp = zkac.Keypair()
transport_payload = {
"server_secret_key_b64": base64.b64encode(server_kp.secret_key_bytes()).decode(),
"server_public_key_b64": base64.b64encode(server_kp.public_key().to_bytes()).decode(),
}
transport_path.write_text(json.dumps(transport_payload, indent=2), encoding="utf-8")
print(f"Wrote managed-registry demo files to {out}")
print(f"Registry ID: {registry_id.hex()}")
print(f"Roles: {', '.join(ROLES)}")
print(f"\nAdmin can issue credentials for these roles through the server.")
print(f"Users request credentials via E2E-encrypted issuance relay.")
if __name__ == "__main__":
main()

View File

@ -1,33 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>ZKAC demo</title>
<style>
body { font-family: system-ui, sans-serif; max-width: 42rem; margin: 2rem auto; padding: 0 1rem; line-height: 1.5; }
code { background: #f0f0f0; padding: 0.1em 0.35em; border-radius: 4px; }
pre { background: #1e1e1e; color: #eee; padding: 1rem; overflow: auto; border-radius: 6px; }
</style>
</head>
<body>
<h1>ZKAC demo</h1>
<p>
This page is served over normal HTTP. Role-based <strong>/api</strong> data is <strong>not</strong> on this port:
it is exposed only after a <strong>ZKAC session</strong> on the separate TCP port (BBS+ credential + encrypted transport).
</p>
<h2>1. Generate credentials</h2>
<pre>python setup_demo.py</pre>
<p>Creates <code>creds/</code> with issuer keys, server transport keys, and two members: <code>analyst</code> and <code>operator</code>.</p>
<h2>2. Start the server</h2>
<pre>python server.py</pre>
<p>HTTP (this page) defaults to <code>127.0.0.1:8765</code>. ZKAC TCP defaults to <code>127.0.0.1:9876</code>.</p>
<h2>3. CLI client</h2>
<pre>python client_cli.py --member creds/member_analyst.json
python client_cli.py --member creds/member_operator.json</pre>
<p>Each command runs a full handshake and requests <code>{"path":"/api"}</code>. The JSON response lists datasets allowed for that role.</p>
</body>
</html>

View File

@ -15,6 +15,13 @@ from zkac._zkac import (
prepare_blind_request,
role_id,
RoleRegistry,
RegistryState,
RegistryManager,
registry_id,
admin_role_id,
IssuanceKeypair,
encrypt_for_admin,
decrypt_from_admin,
Session,
Node,
PendingConnect,
@ -31,6 +38,13 @@ __all__ = [
"prepare_blind_request",
"role_id",
"RoleRegistry",
"RegistryState",
"RegistryManager",
"registry_id",
"admin_role_id",
"IssuanceKeypair",
"encrypt_for_admin",
"decrypt_from_admin",
"Session",
"Node",
"PendingConnect",

View File

@ -15,7 +15,7 @@ from typing import TYPE_CHECKING, Tuple
from zkac import MAX_BBS_AUTH_PROOF_BYTES
if TYPE_CHECKING:
from zkac import Credential, Node, PublicKey, RoleRegistry, Session
from zkac import Credential, Node, PublicKey, RegistryManager, RoleRegistry, Session
# Largest frame: BBS+ auth ciphertext (bound by library) plus handshake/AEAD slack.
MAX_TCP_FRAME_BYTES: int = MAX_BBS_AUTH_PROOF_BYTES + 4096
@ -107,6 +107,56 @@ def server_handshake(
return session, role_id
def client_handshake_managed(
sock: socket.socket,
node: Node,
expected_server_pk: PublicKey,
credential: Credential,
registry_id: bytes,
) -> Session:
"""
Client handshake against a client-managed registry.
Like :func:`client_handshake` but includes ``registry_id`` in the auth packet.
"""
pending, init_msg = node.connect()
write_frame(sock, init_msg)
bundle = read_frame(sock)
if len(bundle) < _HANDSHAKE_MSG_LEN:
raise ValueError("server handshake bundle too short")
response_msg = bundle[:_HANDSHAKE_MSG_LEN]
identity_proof = bundle[_HANDSHAKE_MSG_LEN:]
session, auth_packet = node.complete_connect_managed(
pending, response_msg, identity_proof, expected_server_pk, credential, registry_id
)
write_frame(sock, auth_packet)
return session
def server_handshake_managed(
sock: socket.socket,
node: Node,
manager: RegistryManager,
) -> Tuple[Session, bytes, bytes]:
"""
Server handshake with a :class:`RegistryManager`.
Returns ``(session, registry_id, role_id)``.
"""
init_msg = read_frame(sock)
if len(init_msg) != _HANDSHAKE_MSG_LEN:
raise ValueError("init_msg must be 32 bytes")
session, response_msg = node.accept(init_msg)
identity_proof = node.prove_identity(session)
write_frame(sock, response_msg + identity_proof)
auth_packet = read_frame(sock)
registry_id, role_id = node.verify_auth_managed(session, auth_packet, manager)
return session, registry_id, role_id
class FramedSession:
"""
One ZKAC ciphertext per TCP frame: encrypt before send, decrypt after recv.

View File

@ -1,4 +1,5 @@
pub mod bbs;
pub mod registry;
pub mod roles;
pub mod schnorr;
@ -6,5 +7,6 @@ pub use bbs::{
Credential, IssuerKeyPair, IssuerPublicKey, Presentation,
prepare_blind_request, role_id, verify_presentation,
};
pub use registry::RegistryState;
pub use roles::RoleRegistry;
pub use schnorr::{Keypair, PublicKey, Signature, SIGNATURE_LEN};

367
src/credential/registry.rs Normal file
View File

@ -0,0 +1,367 @@
use blake2::Blake2b512;
use digest::Digest;
use super::bbs::{self, Credential, IssuerPublicKey, Presentation};
use crate::{Error, Result};
const REGISTRY_ID_DOMAIN: &[u8] = b"zkac-registry-v1";
const ADMIN_ROLE_NAME: &str = "__admin__";
const ADMIN_EPOCH: u64 = 0;
/// Derive a deterministic 32-byte registry ID from the admin's BBS+ issuer
/// public key. The ID is cryptographically bound to the admin key — no
/// squatting is possible since different keys produce different IDs.
pub fn registry_id(admin_issuer_pk: &IssuerPublicKey) -> [u8; 32] {
let mut h = Blake2b512::new();
h.update(REGISTRY_ID_DOMAIN);
h.update(&admin_issuer_pk.to_bytes());
let full: [u8; 64] = h.finalize().into();
let mut out = [0u8; 32];
out.copy_from_slice(&full[..32]);
out
}
/// The reserved role_id used for registry admin credentials.
pub fn admin_role_id() -> [u8; 32] {
bbs::role_id(ADMIN_ROLE_NAME)
}
/// A single role entry within a registry state.
#[derive(Clone)]
pub struct RoleEntry {
pub role_id: [u8; 32],
pub issuer_pk: IssuerPublicKey,
pub epoch: u64,
}
/// Client-managed registry state. Serialized deterministically so that
/// a BBS+ state certificate (presentation with nonce = state_hash)
/// binds the admin's authority to exactly this content.
pub struct RegistryState {
pub registry_id: [u8; 32],
pub version: u64,
pub prev_state_hash: [u8; 32],
pub admin_issuer_pk: IssuerPublicKey,
pub issuance_pk: [u8; 32],
roles: Vec<RoleEntry>,
}
impl RegistryState {
/// Build a new registry state. Roles are sorted by role_id internally
/// to ensure deterministic serialization.
pub fn new(
admin_issuer_pk: IssuerPublicKey,
issuance_pk: [u8; 32],
version: u64,
prev_state_hash: [u8; 32],
mut roles: Vec<RoleEntry>,
) -> Self {
roles.sort_by(|a, b| a.role_id.cmp(&b.role_id));
let rid = registry_id(&admin_issuer_pk);
RegistryState {
registry_id: rid,
version,
prev_state_hash,
admin_issuer_pk,
issuance_pk,
roles,
}
}
pub fn roles(&self) -> &[RoleEntry] {
&self.roles
}
/// Deterministic binary serialization of the state (without the cert).
/// Layout:
/// [registry_id: 32]
/// [version: u64 LE]
/// [prev_state_hash: 32]
/// [admin_issuer_pk_len: u32 LE] [admin_issuer_pk]
/// [issuance_pk: 32]
/// [num_roles: u32 LE]
/// for each role (sorted by role_id):
/// [role_id: 32] [issuer_pk_len: u32 LE] [issuer_pk] [epoch: u64 LE]
pub fn serialize(&self) -> Vec<u8> {
let admin_pk_bytes = self.admin_issuer_pk.to_bytes();
let mut buf = Vec::new();
buf.extend_from_slice(&self.registry_id);
buf.extend_from_slice(&self.version.to_le_bytes());
buf.extend_from_slice(&self.prev_state_hash);
buf.extend_from_slice(&(admin_pk_bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(&admin_pk_bytes);
buf.extend_from_slice(&self.issuance_pk);
buf.extend_from_slice(&(self.roles.len() as u32).to_le_bytes());
for role in &self.roles {
let pk_bytes = role.issuer_pk.to_bytes();
buf.extend_from_slice(&role.role_id);
buf.extend_from_slice(&(pk_bytes.len() as u32).to_le_bytes());
buf.extend_from_slice(&pk_bytes);
buf.extend_from_slice(&role.epoch.to_le_bytes());
}
buf
}
/// Deserialize a state from bytes produced by [`serialize`](Self::serialize).
pub fn deserialize(data: &[u8]) -> Result<Self> {
let err = |msg| Error::RegistryStateError(msg);
if data.len() < 32 + 8 + 32 + 4 {
return Err(err("state too short"));
}
let mut pos = 0;
let mut registry_id = [0u8; 32];
registry_id.copy_from_slice(&data[pos..pos + 32]);
pos += 32;
let version = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
pos += 8;
let mut prev_state_hash = [0u8; 32];
prev_state_hash.copy_from_slice(&data[pos..pos + 32]);
pos += 32;
if data.len() < pos + 4 {
return Err(err("truncated admin pk length"));
}
let admin_pk_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
if data.len() < pos + admin_pk_len {
return Err(err("truncated admin pk"));
}
let admin_issuer_pk = IssuerPublicKey::from_bytes(&data[pos..pos + admin_pk_len])?;
pos += admin_pk_len;
let expected_id = registry_id_raw(&admin_issuer_pk);
if expected_id != registry_id {
return Err(err("registry_id does not match admin_issuer_pk"));
}
if data.len() < pos + 32 {
return Err(err("truncated issuance pk"));
}
let mut issuance_pk = [0u8; 32];
issuance_pk.copy_from_slice(&data[pos..pos + 32]);
pos += 32;
if data.len() < pos + 4 {
return Err(err("truncated role count"));
}
let num_roles = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
let mut roles = Vec::with_capacity(num_roles);
for _ in 0..num_roles {
if data.len() < pos + 32 + 4 {
return Err(err("truncated role entry"));
}
let mut role_id = [0u8; 32];
role_id.copy_from_slice(&data[pos..pos + 32]);
pos += 32;
let pk_len = u32::from_le_bytes(data[pos..pos + 4].try_into().unwrap()) as usize;
pos += 4;
if data.len() < pos + pk_len + 8 {
return Err(err("truncated role pk/epoch"));
}
let issuer_pk = IssuerPublicKey::from_bytes(&data[pos..pos + pk_len])?;
pos += pk_len;
let epoch = u64::from_le_bytes(data[pos..pos + 8].try_into().unwrap());
pos += 8;
roles.push(RoleEntry { role_id, issuer_pk, epoch });
}
Ok(RegistryState {
registry_id,
version,
prev_state_hash,
admin_issuer_pk,
issuance_pk,
roles,
})
}
/// Compute Blake2b-512 hash of the serialized state (truncated to 32 bytes).
pub fn state_hash(state_bytes: &[u8]) -> [u8; 32] {
let mut h = Blake2b512::new();
h.update(state_bytes);
let full: [u8; 64] = h.finalize().into();
let mut out = [0u8; 32];
out.copy_from_slice(&full[..32]);
out
}
/// Produce a BBS+ state certificate: a presentation of the admin
/// credential with `nonce = state_hash(serialized_state)`.
/// Each call produces an unlinkable proof.
pub fn certify(
&self,
admin_credential: &Credential,
state_bytes: &[u8],
) -> Result<Presentation> {
let hash = Self::state_hash(state_bytes);
admin_credential.present(&hash)
}
/// Verify a BBS+ state certificate against the admin issuer pk
/// stored in the state and the hash of the serialized state bytes.
pub fn verify_cert(
admin_issuer_pk: &IssuerPublicKey,
state_cert: &Presentation,
state_bytes: &[u8],
) -> Result<()> {
let hash = Self::state_hash(state_bytes);
let admin_rid = admin_role_id();
bbs::verify_presentation(
admin_issuer_pk,
state_cert,
&admin_rid,
ADMIN_EPOCH,
&hash,
)
.map_err(|_| Error::InvalidStateCertificate)
}
}
fn registry_id_raw(pk: &IssuerPublicKey) -> [u8; 32] {
registry_id(pk)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::credential::bbs::{self, IssuerKeyPair};
fn make_admin() -> (IssuerKeyPair, Credential) {
let issuer = IssuerKeyPair::generate().unwrap();
let pk = issuer.public_key();
let rid = admin_role_id();
let req = bbs::prepare_blind_request().unwrap();
let sig = issuer.issue_blind(&req.commitment_with_proof, &rid, ADMIN_EPOCH).unwrap();
let cred = Credential::finalize(
&sig, req.member_secret, req.prover_blind, rid, ADMIN_EPOCH, &pk,
).unwrap();
(issuer, cred)
}
#[test]
fn registry_id_is_deterministic() {
let issuer = IssuerKeyPair::generate().unwrap();
let pk = issuer.public_key();
let id1 = registry_id(&pk);
let id2 = registry_id(&pk);
assert_eq!(id1, id2);
}
#[test]
fn different_keys_different_registry_ids() {
let i1 = IssuerKeyPair::generate().unwrap();
let i2 = IssuerKeyPair::generate().unwrap();
assert_ne!(registry_id(&i1.public_key()), registry_id(&i2.public_key()));
}
#[test]
fn serialize_deserialize_roundtrip() {
let issuer = IssuerKeyPair::generate().unwrap();
let pk = issuer.public_key();
let role_issuer = IssuerKeyPair::generate().unwrap();
let state = RegistryState::new(
pk.clone(),
[42u8; 32],
1,
[0u8; 32],
vec![RoleEntry {
role_id: bbs::role_id("analyst"),
issuer_pk: role_issuer.public_key(),
epoch: 1,
}],
);
let bytes = state.serialize();
let state2 = RegistryState::deserialize(&bytes).unwrap();
assert_eq!(state.registry_id, state2.registry_id);
assert_eq!(state.version, state2.version);
assert_eq!(state.prev_state_hash, state2.prev_state_hash);
assert_eq!(state.issuance_pk, state2.issuance_pk);
assert_eq!(state.roles().len(), state2.roles().len());
assert_eq!(state.roles()[0].role_id, state2.roles()[0].role_id);
assert_eq!(state.roles()[0].epoch, state2.roles()[0].epoch);
}
#[test]
fn certify_and_verify() {
let (issuer, admin_cred) = make_admin();
let pk = issuer.public_key();
let state = RegistryState::new(pk.clone(), [0u8; 32], 1, [0u8; 32], vec![]);
let state_bytes = state.serialize();
let cert = state.certify(&admin_cred, &state_bytes).unwrap();
RegistryState::verify_cert(&pk, &cert, &state_bytes).unwrap();
}
#[test]
fn tampered_state_rejected() {
let (issuer, admin_cred) = make_admin();
let pk = issuer.public_key();
let state = RegistryState::new(pk.clone(), [0u8; 32], 1, [0u8; 32], vec![]);
let state_bytes = state.serialize();
let cert = state.certify(&admin_cred, &state_bytes).unwrap();
let mut tampered = state_bytes.clone();
// Flip a byte in the version field
tampered[32] ^= 0xff;
assert!(RegistryState::verify_cert(&pk, &cert, &tampered).is_err());
}
#[test]
fn wrong_admin_key_rejected() {
let (issuer, admin_cred) = make_admin();
let pk = issuer.public_key();
let other_issuer = IssuerKeyPair::generate().unwrap();
let state = RegistryState::new(pk.clone(), [0u8; 32], 1, [0u8; 32], vec![]);
let state_bytes = state.serialize();
let cert = state.certify(&admin_cred, &state_bytes).unwrap();
assert!(RegistryState::verify_cert(&other_issuer.public_key(), &cert, &state_bytes).is_err());
}
#[test]
fn state_certs_are_unlinkable() {
let (issuer, admin_cred) = make_admin();
let pk = issuer.public_key();
let s1 = RegistryState::new(pk.clone(), [0u8; 32], 1, [0u8; 32], vec![]);
let b1 = s1.serialize();
let c1 = s1.certify(&admin_cred, &b1).unwrap();
let s2 = RegistryState::new(pk.clone(), [0u8; 32], 2, [0u8; 32], vec![]);
let b2 = s2.serialize();
let c2 = s2.certify(&admin_cred, &b2).unwrap();
assert_ne!(c1.to_bytes(), c2.to_bytes());
RegistryState::verify_cert(&pk, &c1, &b1).unwrap();
RegistryState::verify_cert(&pk, &c2, &b2).unwrap();
}
#[test]
fn mismatched_registry_id_in_bytes_rejected() {
let issuer = IssuerKeyPair::generate().unwrap();
let pk = issuer.public_key();
let state = RegistryState::new(pk, [0u8; 32], 1, [0u8; 32], vec![]);
let mut bytes = state.serialize();
// Corrupt the registry_id field
bytes[0] ^= 0xff;
assert!(RegistryState::deserialize(&bytes).is_err());
}
}

View File

@ -31,6 +31,21 @@ pub enum Error {
#[error("identity verification failed: {0}")]
IdentityVerificationFailed(&'static str),
#[error("registry not found")]
RegistryNotFound,
#[error("registry already exists")]
RegistryAlreadyExists,
#[error("registry version mismatch: {0}")]
RegistryVersionMismatch(&'static str),
#[error("invalid state certificate")]
InvalidStateCertificate,
#[error("registry state error: {0}")]
RegistryStateError(&'static str),
}
pub type Result<T> = std::result::Result<T, Error>;

193
src/issuance.rs Normal file
View File

@ -0,0 +1,193 @@
//! E2E-encrypted credential issuance helpers.
//!
//! Uses X25519 ECDH + HKDF-SHA256 + ChaCha20-Poly1305 to encrypt
//! blind commitments and signatures between the user and the admin,
//! so the server (acting as a relay) cannot read or substitute them.
use chacha20poly1305::aead::{Aead, KeyInit};
use chacha20poly1305::ChaCha20Poly1305;
use hkdf::Hkdf;
use rand::{CryptoRng, RngCore};
use sha2::Sha256;
use x25519_dalek::{EphemeralSecret, PublicKey as X25519Public, StaticSecret};
use crate::{Error, Result};
const ISSUANCE_HKDF_INFO: &[u8] = b"zkac-issuance-v1";
const NONCE_BYTES: [u8; 12] = [0u8; 12];
/// Admin-side issuance keypair (X25519 static secret for DH).
pub struct IssuanceKeypair {
secret: StaticSecret,
public: X25519Public,
}
impl IssuanceKeypair {
pub fn generate<R: CryptoRng + RngCore>(rng: &mut R) -> Self {
let secret = StaticSecret::random_from_rng(rng);
let public = X25519Public::from(&secret);
Self { secret, public }
}
pub fn public_key_bytes(&self) -> [u8; 32] {
*self.public.as_bytes()
}
pub fn from_secret_bytes(bytes: &[u8; 32]) -> Self {
let secret = StaticSecret::from(*bytes);
let public = X25519Public::from(&secret);
Self { secret, public }
}
pub fn secret_bytes(&self) -> [u8; 32] {
self.secret.to_bytes()
}
/// Decrypt a blob sent by a user using their ephemeral public key.
pub fn decrypt(&self, eph_pk_bytes: &[u8; 32], ciphertext: &[u8]) -> Result<Vec<u8>> {
let eph_pk = X25519Public::from(*eph_pk_bytes);
let shared = self.secret.diffie_hellman(&eph_pk);
let key = derive_key(shared.as_bytes());
let cipher = ChaCha20Poly1305::new_from_slice(&key)
.map_err(|_| Error::CredentialError("issuance key derivation failed".into()))?;
cipher.decrypt(&NONCE_BYTES.into(), ciphertext)
.map_err(|_| Error::DecryptionFailed)
}
/// Encrypt a response to the user (uses same shared secret).
pub fn encrypt(&self, eph_pk_bytes: &[u8; 32], plaintext: &[u8]) -> Result<Vec<u8>> {
let eph_pk = X25519Public::from(*eph_pk_bytes);
let shared = self.secret.diffie_hellman(&eph_pk);
let key = derive_key(shared.as_bytes());
let cipher = ChaCha20Poly1305::new_from_slice(&key)
.map_err(|_| Error::CredentialError("issuance key derivation failed".into()))?;
cipher.encrypt(&NONCE_BYTES.into(), plaintext)
.map_err(|_| Error::CredentialError("issuance encryption failed".into()))
}
}
/// User-side: encrypt a commitment for the admin's issuance public key.
/// Returns `(ephemeral_public_key, ciphertext)`.
pub fn encrypt_for_admin<R: CryptoRng + RngCore>(
rng: R,
admin_issuance_pk: &[u8; 32],
plaintext: &[u8],
) -> Result<([u8; 32], Vec<u8>)> {
let eph_secret = EphemeralSecret::random_from_rng(rng);
let eph_public = X25519Public::from(&eph_secret);
let admin_pk = X25519Public::from(*admin_issuance_pk);
let shared = eph_secret.diffie_hellman(&admin_pk);
let key = derive_key(shared.as_bytes());
let cipher = ChaCha20Poly1305::new_from_slice(&key)
.map_err(|_| Error::CredentialError("issuance key derivation failed".into()))?;
let ciphertext = cipher.encrypt(&NONCE_BYTES.into(), plaintext)
.map_err(|_| Error::CredentialError("issuance encryption failed".into()))?;
Ok((*eph_public.as_bytes(), ciphertext))
}
/// User-side: decrypt the admin's response using the same shared secret.
pub fn decrypt_from_admin(
eph_secret_bytes: &[u8; 32],
admin_issuance_pk: &[u8; 32],
ciphertext: &[u8],
) -> Result<Vec<u8>> {
let eph_secret = StaticSecret::from(*eph_secret_bytes);
let admin_pk = X25519Public::from(*admin_issuance_pk);
let shared = eph_secret.diffie_hellman(&admin_pk);
let key = derive_key(shared.as_bytes());
let cipher = ChaCha20Poly1305::new_from_slice(&key)
.map_err(|_| Error::CredentialError("issuance key derivation failed".into()))?;
cipher.decrypt(&NONCE_BYTES.into(), ciphertext)
.map_err(|_| Error::DecryptionFailed)
}
fn derive_key(shared_secret: &[u8]) -> [u8; 32] {
let hk = Hkdf::<Sha256>::new(None, shared_secret);
let mut key = [0u8; 32];
hk.expand(ISSUANCE_HKDF_INFO, &mut key)
.expect("HKDF expand should not fail for 32 bytes");
key
}
#[cfg(test)]
mod tests {
use super::*;
use rand::rngs::OsRng;
#[test]
fn user_admin_roundtrip() {
let admin_kp = IssuanceKeypair::generate(&mut OsRng);
let admin_pk = admin_kp.public_key_bytes();
let plaintext = b"commitment_with_proof data here";
let (eph_pk, ciphertext) = encrypt_for_admin(OsRng, &admin_pk, plaintext).unwrap();
let decrypted = admin_kp.decrypt(&eph_pk, &ciphertext).unwrap();
assert_eq!(decrypted, plaintext);
// Admin responds
let response = b"blind_signature_bytes";
let encrypted_response = admin_kp.encrypt(&eph_pk, response).unwrap();
// User needs the ephemeral secret to decrypt the response.
// In practice, the user saves the EphemeralSecret bytes.
// For testing, generate a fresh pair and use StaticSecret path.
// Here we test the admin encrypt path is valid ciphertext:
assert!(!encrypted_response.is_empty());
}
#[test]
fn full_issuance_e2e() {
// Simulate the full flow with saved ephemeral secret
let admin_kp = IssuanceKeypair::generate(&mut OsRng);
let admin_pk = admin_kp.public_key_bytes();
// User side: use StaticSecret so we can save the bytes
let user_secret = StaticSecret::random_from_rng(&mut OsRng);
let user_public = X25519Public::from(&user_secret);
let user_secret_bytes = user_secret.to_bytes();
let eph_pk = *user_public.as_bytes();
// Encrypt commitment
let shared = user_secret.diffie_hellman(&X25519Public::from(admin_pk));
let key = derive_key(shared.as_bytes());
let cipher = ChaCha20Poly1305::new_from_slice(&key).unwrap();
let commitment = b"test commitment";
let encrypted = cipher.encrypt(&NONCE_BYTES.into(), commitment.as_slice()).unwrap();
// Admin decrypts
let decrypted = admin_kp.decrypt(&eph_pk, &encrypted).unwrap();
assert_eq!(decrypted, commitment);
// Admin encrypts response
let response = b"blind sig";
let enc_response = admin_kp.encrypt(&eph_pk, response).unwrap();
// User decrypts response
let dec_response = decrypt_from_admin(&user_secret_bytes, &admin_pk, &enc_response).unwrap();
assert_eq!(dec_response, response);
}
#[test]
fn wrong_key_fails() {
let admin_kp = IssuanceKeypair::generate(&mut OsRng);
let other_kp = IssuanceKeypair::generate(&mut OsRng);
let admin_pk = admin_kp.public_key_bytes();
let (eph_pk, ciphertext) = encrypt_for_admin(OsRng, &admin_pk, b"secret").unwrap();
assert!(other_kp.decrypt(&eph_pk, &ciphertext).is_err());
}
#[test]
fn keypair_serialization() {
let kp = IssuanceKeypair::generate(&mut OsRng);
let secret = kp.secret_bytes();
let pk = kp.public_key_bytes();
let kp2 = IssuanceKeypair::from_secret_bytes(&secret);
assert_eq!(kp2.public_key_bytes(), pk);
}
}

View File

@ -1,6 +1,8 @@
pub mod credential;
pub mod error;
pub mod issuance;
pub mod node;
pub mod registry_manager;
pub mod transport;
#[cfg(feature = "python")]
@ -8,3 +10,4 @@ mod python;
pub use error::{Error, Result};
pub use node::{Node, PendingConnect, MAX_BBS_AUTH_PROOF_BYTES};
pub use registry_manager::RegistryManager;

View File

@ -5,6 +5,7 @@ use rand::{CryptoRng, RngCore};
use crate::credential::{Keypair, PublicKey, Signature, SIGNATURE_LEN, RoleRegistry};
use crate::credential::bbs::{Credential, Presentation};
use crate::registry_manager::RegistryManager;
use crate::transport::handshake::{self, InitiatorHandshake, HANDSHAKE_MSG_LEN};
use crate::transport::Session;
use crate::{Error, Result};
@ -160,6 +161,96 @@ impl Node {
Ok(role_id)
}
/// Like [`complete_connect`](Self::complete_connect) but prepends the
/// `registry_id` to the auth packet so the server knows which
/// client-managed registry to verify against.
///
/// Auth payload: `[registry_id: 32] [role_id: 32] [epoch: 8 LE] [proof_len: u32 LE] [proof]`
pub fn complete_connect_managed(
&self,
pending: PendingConnect,
response_msg: &[u8; HANDSHAKE_MSG_LEN],
identity_proof: &[u8],
expected_server_pk: &PublicKey,
credential: &Credential,
registry_id: &[u8; 32],
) -> Result<(Session, Vec<u8>)> {
let mut session = pending.handshake.complete(response_msg)?;
let id_payload = session.decrypt(identity_proof)?;
if id_payload.len() != IDENTITY_PROOF_LEN {
return Err(Error::IdentityVerificationFailed("identity proof wrong length"));
}
let mut pk_bytes = [0u8; 32];
pk_bytes.copy_from_slice(&id_payload[..32]);
let server_pk = PublicKey::from_bytes(pk_bytes)?;
if server_pk != *expected_server_pk {
return Err(Error::IdentityVerificationFailed("server public key mismatch"));
}
let mut sig_bytes = [0u8; SIGNATURE_LEN];
sig_bytes.copy_from_slice(&id_payload[32..]);
let sig = Signature::from_bytes(&sig_bytes)?;
server_pk.verify(session.transcript_hash(), &sig)?;
let transcript = session.transcript_hash();
let presentation = credential.present(transcript)?;
let proof_bytes = presentation.to_bytes();
if proof_bytes.len() > MAX_BBS_AUTH_PROOF_BYTES {
return Err(Error::InvalidPacket("bbs auth proof exceeds maximum size"));
}
let mut payload = Vec::new();
payload.extend_from_slice(registry_id);
payload.extend_from_slice(credential.role_id());
payload.extend_from_slice(&credential.epoch().to_le_bytes());
payload.extend_from_slice(&(proof_bytes.len() as u32).to_le_bytes());
payload.extend_from_slice(proof_bytes);
let encrypted_auth = session.encrypt(&payload)?;
Ok((session, encrypted_auth))
}
/// Verify a managed-registry auth packet. The payload starts with
/// `registry_id` which selects the registry from the manager.
/// Returns `(registry_id, role_id)` on success.
pub fn verify_auth_managed(
&self,
session: &mut Session,
encrypted_auth: &[u8],
manager: &RegistryManager,
) -> Result<([u8; 32], [u8; 32])> {
let payload = session.decrypt(encrypted_auth)?;
let transcript = *session.transcript_hash();
// [registry_id: 32] [role_id: 32] [epoch: 8] [proof_len: 4] [proof]
if payload.len() < 76 {
return Err(Error::InvalidPacket("managed auth payload too short"));
}
let mut registry_id = [0u8; 32];
registry_id.copy_from_slice(&payload[..32]);
let mut role_id = [0u8; 32];
role_id.copy_from_slice(&payload[32..64]);
let proof_len = u32::from_le_bytes(payload[72..76].try_into().unwrap()) as usize;
if proof_len > MAX_BBS_AUTH_PROOF_BYTES {
return Err(Error::InvalidPacket("bbs auth proof length exceeds maximum"));
}
if payload.len() < 76 + proof_len {
return Err(Error::InvalidPacket("bbs auth: proof truncated"));
}
let presentation = Presentation::from_bytes(payload[76..76 + proof_len].to_vec());
manager.verify_presentation(&registry_id, &role_id, &presentation, &transcript)?;
Ok((registry_id, role_id))
}
}
#[cfg(test)]
@ -314,4 +405,77 @@ mod tests {
.unwrap();
assert_eq!(verified_rid, rid);
}
#[test]
fn managed_registry_handshake() {
use crate::credential::registry::{self, admin_role_id, RegistryState, RoleEntry};
use crate::registry_manager::RegistryManager;
// Admin creates a registry with one role
let admin_issuer = IssuerKeyPair::generate().unwrap();
let admin_pk = admin_issuer.public_key();
let admin_rid = admin_role_id();
let admin_req = bbs::prepare_blind_request().unwrap();
let admin_sig = admin_issuer.issue_blind(&admin_req.commitment_with_proof, &admin_rid, 0).unwrap();
let admin_cred = Credential::finalize(
&admin_sig, admin_req.member_secret, admin_req.prover_blind, admin_rid, 0, &admin_pk,
).unwrap();
let role_issuer = IssuerKeyPair::generate().unwrap();
let role_pk = role_issuer.public_key();
let analyst_rid = bbs::role_id("analyst");
let state = RegistryState::new(
admin_pk.clone(),
[0u8; 32],
1,
[0u8; 32],
vec![RoleEntry {
role_id: analyst_rid,
issuer_pk: role_pk.clone(),
epoch: 1,
}],
);
let state_bytes = state.serialize();
let state_cert = state.certify(&admin_cred, &state_bytes).unwrap();
let reg_id = registry::registry_id(&admin_pk);
let mut mgr = RegistryManager::new();
mgr.create(&state_bytes, state_cert.to_bytes()).unwrap();
// Issue a credential to a user for the analyst role
let user_req = bbs::prepare_blind_request().unwrap();
let user_sig = role_issuer.issue_blind(&user_req.commitment_with_proof, &analyst_rid, 1).unwrap();
let user_cred = Credential::finalize(
&user_sig, user_req.member_secret, user_req.prover_blind, analyst_rid, 1, &role_pk,
).unwrap();
// Full handshake with managed registry
let server_kp = Keypair::generate(&mut OsRng);
let server_pk = *server_kp.public();
let client = Node::new(Keypair::generate(&mut OsRng));
let server = Node::new(server_kp);
let (pending, init_msg) = client.connect(OsRng);
let (mut server_session, response_msg) = server.accept(OsRng, &init_msg).unwrap();
let identity_proof = server.prove_identity(&mut server_session).unwrap();
let (mut client_session, auth_packet) = client
.complete_connect_managed(
pending, &response_msg, &identity_proof, &server_pk,
&user_cred, &reg_id,
)
.unwrap();
let (verified_reg_id, verified_role_id) = server
.verify_auth_managed(&mut server_session, &auth_packet, &mgr)
.unwrap();
assert_eq!(verified_reg_id, reg_id);
assert_eq!(verified_role_id, analyst_rid);
// Encrypted data works
let pkt = client_session.encrypt(b"hello").unwrap();
assert_eq!(server_session.decrypt(&pkt).unwrap(), b"hello");
}
}

View File

@ -4,12 +4,18 @@ use pyo3::types::PyBytes;
use rand::rngs::OsRng;
use crate::credential::{self, bbs, SIGNATURE_LEN};
use crate::credential::registry as reg;
use crate::issuance;
use crate::transport::handshake::HANDSHAKE_MSG_LEN;
fn to_py_err(e: crate::Error) -> PyErr {
PyValueError::new_err(e.to_string())
}
fn to_32(bytes: &[u8], name: &str) -> PyResult<[u8; 32]> {
bytes.try_into().map_err(|_| PyValueError::new_err(format!("{name} must be 32 bytes")))
}
fn bytes_to_hex(b: &[u8]) -> String {
b.iter().map(|byte| format!("{byte:02x}")).collect()
}
@ -344,6 +350,268 @@ impl PyRoleRegistry {
}
}
// ── Registry State (client-managed) ──────────────────────────────────
#[pyclass(name = "RegistryState")]
pub struct PyRegistryState {
inner_bytes: Vec<u8>,
}
#[pymethods]
impl PyRegistryState {
#[staticmethod]
fn build(
admin_issuer_pk: &PyBbsPublicKey,
issuance_pk: &[u8],
version: u64,
prev_state_hash: &[u8],
roles: Vec<(Vec<u8>, PyBbsPublicKey, u64)>,
) -> PyResult<Self> {
if issuance_pk.len() != 32 {
return Err(PyValueError::new_err("issuance_pk must be 32 bytes"));
}
if prev_state_hash.len() != 32 {
return Err(PyValueError::new_err("prev_state_hash must be 32 bytes"));
}
let mut iss_pk = [0u8; 32];
iss_pk.copy_from_slice(issuance_pk);
let mut prev = [0u8; 32];
prev.copy_from_slice(prev_state_hash);
let entries: PyResult<Vec<reg::RoleEntry>> = roles.into_iter().map(|(rid, pk, epoch)| {
if rid.len() != 32 {
return Err(PyValueError::new_err("role_id must be 32 bytes"));
}
let mut role_id = [0u8; 32];
role_id.copy_from_slice(&rid);
Ok(reg::RoleEntry { role_id, issuer_pk: pk.inner.clone(), epoch })
}).collect();
let state = reg::RegistryState::new(
admin_issuer_pk.inner.clone(), iss_pk, version, prev, entries?,
);
Ok(PyRegistryState { inner_bytes: state.serialize() })
}
fn serialize<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
PyBytes::new(py, &self.inner_bytes)
}
#[staticmethod]
fn deserialize(data: &[u8]) -> PyResult<Self> {
let _ = reg::RegistryState::deserialize(data).map_err(to_py_err)?;
Ok(PyRegistryState { inner_bytes: data.to_vec() })
}
fn registry_id<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyBytes>> {
let state = reg::RegistryState::deserialize(&self.inner_bytes).map_err(to_py_err)?;
Ok(PyBytes::new(py, &state.registry_id))
}
fn version(&self) -> PyResult<u64> {
let state = reg::RegistryState::deserialize(&self.inner_bytes).map_err(to_py_err)?;
Ok(state.version)
}
fn state_hash<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
let h = reg::RegistryState::state_hash(&self.inner_bytes);
PyBytes::new(py, &h)
}
fn certify<'py>(&self, py: Python<'py>, admin_credential: &PyCredential) -> PyResult<Bound<'py, PyBytes>> {
let state = reg::RegistryState::deserialize(&self.inner_bytes).map_err(to_py_err)?;
let cert = state.certify(&admin_credential.inner, &self.inner_bytes).map_err(to_py_err)?;
Ok(PyBytes::new(py, cert.to_bytes()))
}
#[staticmethod]
fn verify_cert(admin_issuer_pk: &PyBbsPublicKey, state_cert: &[u8], state_bytes: &[u8]) -> PyResult<bool> {
let cert = bbs::Presentation::from_bytes(state_cert.to_vec());
match reg::RegistryState::verify_cert(&admin_issuer_pk.inner, &cert, state_bytes) {
Ok(()) => Ok(true),
Err(_) => Ok(false),
}
}
}
#[pyfunction]
fn registry_id<'py>(py: Python<'py>, admin_issuer_pk: &PyBbsPublicKey) -> Bound<'py, PyBytes> {
let rid = reg::registry_id(&admin_issuer_pk.inner);
PyBytes::new(py, &rid)
}
#[pyfunction]
fn admin_role_id<'py>(py: Python<'py>) -> Bound<'py, PyBytes> {
let rid = reg::admin_role_id();
PyBytes::new(py, &rid)
}
// ── Registry Manager (server-side) ──────────────────────────────────
#[pyclass(name = "RegistryManager")]
pub struct PyRegistryManager {
inner: crate::registry_manager::RegistryManager,
}
#[pymethods]
impl PyRegistryManager {
#[new]
fn new() -> Self {
PyRegistryManager {
inner: crate::registry_manager::RegistryManager::new(),
}
}
fn create<'py>(&mut self, py: Python<'py>, state_bytes: &[u8], state_cert: &[u8]) -> PyResult<Bound<'py, PyBytes>> {
let rid = self.inner.create(state_bytes, state_cert).map_err(to_py_err)?;
Ok(PyBytes::new(py, &rid))
}
fn update(&mut self, registry_id: &[u8], state_bytes: &[u8], state_cert: &[u8]) -> PyResult<()> {
let rid = to_32(registry_id, "registry_id")?;
self.inner.update(&rid, state_bytes, state_cert).map_err(to_py_err)
}
fn get<'py>(&self, py: Python<'py>, registry_id: &[u8]) -> PyResult<(Bound<'py, PyBytes>, Bound<'py, PyBytes>)> {
let rid = to_32(registry_id, "registry_id")?;
let (state_bytes, cert_bytes) = self.inner.get(&rid).map_err(to_py_err)?;
Ok((PyBytes::new(py, state_bytes), PyBytes::new(py, cert_bytes)))
}
fn has_registry(&self, registry_id: &[u8]) -> PyResult<bool> {
let rid = to_32(registry_id, "registry_id")?;
Ok(self.inner.has_registry(&rid))
}
fn verify_admin(&self, registry_id: &[u8], proof_bytes: &[u8], nonce: &[u8]) -> PyResult<bool> {
let rid = to_32(registry_id, "registry_id")?;
match self.inner.verify_admin(&rid, proof_bytes, nonce) {
Ok(()) => Ok(true),
Err(crate::Error::InvalidPresentation | crate::Error::RegistryNotFound) => Ok(false),
Err(e) => Err(to_py_err(e)),
}
}
fn verify_presentation(&self, registry_id: &[u8], role_id: &[u8], proof_bytes: &[u8], nonce: &[u8]) -> PyResult<bool> {
let rid = to_32(registry_id, "registry_id")?;
let roid = to_32(role_id, "role_id")?;
let pres = bbs::Presentation::from_bytes(proof_bytes.to_vec());
match self.inner.verify_presentation(&rid, &roid, &pres, nonce) {
Ok(()) => Ok(true),
Err(crate::Error::InvalidPresentation | crate::Error::RoleNotRegistered | crate::Error::RegistryNotFound) => Ok(false),
Err(e) => Err(to_py_err(e)),
}
}
fn queue_issuance_request(
&mut self,
registry_id: &[u8],
request_id: &[u8],
role_id: &[u8],
eph_pk: &[u8],
encrypted_commitment: &[u8],
) -> PyResult<()> {
let rid = to_32(registry_id, "registry_id")?;
let req_id = to_32(request_id, "request_id")?;
let roid = to_32(role_id, "role_id")?;
let epk = to_32(eph_pk, "eph_pk")?;
self.inner.queue_issuance_request(&rid, req_id, roid, epk, encrypted_commitment.to_vec())
.map_err(to_py_err)
}
fn take_pending_requests(&mut self, py: Python<'_>, registry_id: &[u8]) -> PyResult<Vec<PyObject>> {
let rid = to_32(registry_id, "registry_id")?;
let items = self.inner.take_pending_requests(&rid).map_err(to_py_err)?;
let result: Vec<PyObject> = items.into_iter().map(|(req_id, req)| {
let tuple = (
PyBytes::new(py, &req_id).into_any(),
PyBytes::new(py, &req.role_id).into_any(),
PyBytes::new(py, &req.eph_pk).into_any(),
PyBytes::new(py, &req.encrypted_commitment).into_any(),
);
tuple.into_pyobject(py).unwrap().into_any().unbind()
}).collect();
Ok(result)
}
fn grant_credential(&mut self, registry_id: &[u8], request_id: &[u8], encrypted_blind_sig: &[u8]) -> PyResult<()> {
let rid = to_32(registry_id, "registry_id")?;
let req_id = to_32(request_id, "request_id")?;
self.inner.grant_credential(&rid, req_id, encrypted_blind_sig.to_vec())
.map_err(to_py_err)
}
fn take_granted_credential<'py>(&mut self, py: Python<'py>, registry_id: &[u8], request_id: &[u8]) -> PyResult<Option<Bound<'py, PyBytes>>> {
let rid = to_32(registry_id, "registry_id")?;
let req_id = to_32(request_id, "request_id")?;
let result = self.inner.take_granted_credential(&rid, &req_id).map_err(to_py_err)?;
Ok(result.map(|data| PyBytes::new(py, &data)))
}
}
// ── Issuance E2E Encryption ──────────────────────────────────────────
#[pyclass(name = "IssuanceKeypair")]
pub struct PyIssuanceKeypair {
inner: issuance::IssuanceKeypair,
}
#[pymethods]
impl PyIssuanceKeypair {
#[new]
fn new() -> Self {
PyIssuanceKeypair {
inner: issuance::IssuanceKeypair::generate(&mut OsRng),
}
}
#[staticmethod]
fn from_secret(bytes: &[u8]) -> PyResult<Self> {
if bytes.len() != 32 {
return Err(PyValueError::new_err("issuance secret key must be 32 bytes"));
}
let arr: [u8; 32] = bytes.try_into().unwrap();
Ok(PyIssuanceKeypair {
inner: issuance::IssuanceKeypair::from_secret_bytes(&arr),
})
}
fn public_key_bytes<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
PyBytes::new(py, &self.inner.public_key_bytes())
}
fn secret_bytes<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> {
PyBytes::new(py, &self.inner.secret_bytes())
}
fn decrypt<'py>(&self, py: Python<'py>, eph_pk: &[u8], ciphertext: &[u8]) -> PyResult<Bound<'py, PyBytes>> {
let epk = to_32(eph_pk, "eph_pk")?;
let plaintext = self.inner.decrypt(&epk, ciphertext).map_err(to_py_err)?;
Ok(PyBytes::new(py, &plaintext))
}
fn encrypt<'py>(&self, py: Python<'py>, eph_pk: &[u8], plaintext: &[u8]) -> PyResult<Bound<'py, PyBytes>> {
let epk = to_32(eph_pk, "eph_pk")?;
let ciphertext = self.inner.encrypt(&epk, plaintext).map_err(to_py_err)?;
Ok(PyBytes::new(py, &ciphertext))
}
}
#[pyfunction]
fn encrypt_for_admin<'py>(py: Python<'py>, admin_issuance_pk: &[u8], plaintext: &[u8]) -> PyResult<(Bound<'py, PyBytes>, Bound<'py, PyBytes>)> {
let pk = to_32(admin_issuance_pk, "admin_issuance_pk")?;
let (eph_pk, ciphertext) = issuance::encrypt_for_admin(OsRng, &pk, plaintext).map_err(to_py_err)?;
Ok((PyBytes::new(py, &eph_pk), PyBytes::new(py, &ciphertext)))
}
#[pyfunction]
fn decrypt_from_admin<'py>(py: Python<'py>, eph_secret: &[u8], admin_issuance_pk: &[u8], ciphertext: &[u8]) -> PyResult<Bound<'py, PyBytes>> {
let sec = to_32(eph_secret, "eph_secret")?;
let pk = to_32(admin_issuance_pk, "admin_issuance_pk")?;
let plaintext = issuance::decrypt_from_admin(&sec, &pk, ciphertext).map_err(to_py_err)?;
Ok(PyBytes::new(py, &plaintext))
}
// ── Session ──────────────────────────────────────────────────────────
#[pyclass(name = "Session")]
@ -474,6 +742,58 @@ impl PyNode {
.map_err(to_py_err)?;
Ok(PyBytes::new(py, &rid))
}
/// Complete handshake for a client-managed registry. Includes
/// registry_id in the auth packet.
fn complete_connect_managed(
&self,
pending: &mut PyPendingConnect,
response_msg: &[u8],
identity_proof: &[u8],
expected_server_pk: &PyPublicKey,
credential: &PyCredential,
registry_id: &[u8],
) -> PyResult<(PySession, Vec<u8>)> {
let p = pending
.inner
.take()
.ok_or_else(|| PyValueError::new_err("PendingConnect already consumed"))?;
if response_msg.len() != HANDSHAKE_MSG_LEN {
return Err(PyValueError::new_err("response_msg must be 32 bytes"));
}
let msg: [u8; HANDSHAKE_MSG_LEN] = response_msg.try_into().unwrap();
let rid = to_32(registry_id, "registry_id")?;
let (session, auth_packet) = self
.inner
.complete_connect_managed(
p,
&msg,
identity_proof,
&expected_server_pk.inner,
&credential.inner,
&rid,
)
.map_err(to_py_err)?;
Ok((PySession { inner: session }, auth_packet))
}
/// Verify managed-registry auth packet. Returns (registry_id, role_id).
fn verify_auth_managed<'py>(
&self,
py: Python<'py>,
session: &mut PySession,
encrypted_auth: &[u8],
manager: &PyRegistryManager,
) -> PyResult<(Bound<'py, PyBytes>, Bound<'py, PyBytes>)> {
let (reg_id, role_id) = self
.inner
.verify_auth_managed(&mut session.inner, encrypted_auth, &manager.inner)
.map_err(to_py_err)?;
Ok((PyBytes::new(py, &reg_id), PyBytes::new(py, &role_id)))
}
}
// ── Module ───────────────────────────────────────────────────────────
@ -491,8 +811,17 @@ fn _zkac(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyCredential>()?;
m.add_function(wrap_pyfunction!(prepare_blind_request, m)?)?;
m.add_function(wrap_pyfunction!(role_id, m)?)?;
// Server registry
// Server registry (static, server-configured)
m.add_class::<PyRoleRegistry>()?;
// Client-managed registries
m.add_class::<PyRegistryState>()?;
m.add_class::<PyRegistryManager>()?;
m.add_function(wrap_pyfunction!(registry_id, m)?)?;
m.add_function(wrap_pyfunction!(admin_role_id, m)?)?;
// E2E-encrypted issuance
m.add_class::<PyIssuanceKeypair>()?;
m.add_function(wrap_pyfunction!(encrypt_for_admin, m)?)?;
m.add_function(wrap_pyfunction!(decrypt_from_admin, m)?)?;
// Transport
m.add_class::<PySession>()?;
m.add_class::<PyNode>()?;

421
src/registry_manager.rs Normal file
View File

@ -0,0 +1,421 @@
use std::collections::HashMap;
use crate::credential::bbs::Presentation;
use crate::credential::registry::{self, RegistryState};
use crate::credential::roles::RoleRegistry;
use crate::{Error, Result};
/// An in-flight credential issuance request stored by the server.
pub struct IssuanceRequest {
pub role_id: [u8; 32],
pub eph_pk: [u8; 32],
pub encrypted_commitment: Vec<u8>,
}
struct StoredRegistry {
state_bytes: Vec<u8>,
state_cert_bytes: Vec<u8>,
role_cache: RoleRegistry,
pending_requests: HashMap<[u8; 32], IssuanceRequest>,
granted: HashMap<[u8; 32], Vec<u8>>,
}
/// Server-side manager for multiple client-managed registries.
///
/// Stores certified registry states, rebuilds a `RoleRegistry` cache
/// from each for fast BBS+ verification, and maintains issuance
/// request queues per registry.
pub struct RegistryManager {
registries: HashMap<[u8; 32], StoredRegistry>,
}
impl RegistryManager {
pub fn new() -> Self {
RegistryManager {
registries: HashMap::new(),
}
}
/// Accept a new registry. Verifies the BBS+ state certificate,
/// checks that version == 1, derives the registry_id, and stores
/// the state. Returns the registry_id on success.
pub fn create(
&mut self,
state_bytes: &[u8],
state_cert_bytes: &[u8],
) -> Result<[u8; 32]> {
let state = RegistryState::deserialize(state_bytes)?;
if state.version != 1 {
return Err(Error::RegistryVersionMismatch("initial version must be 1"));
}
if state.prev_state_hash != [0u8; 32] {
return Err(Error::RegistryStateError("initial prev_state_hash must be zeros"));
}
let cert = Presentation::from_bytes(state_cert_bytes.to_vec());
RegistryState::verify_cert(&state.admin_issuer_pk, &cert, state_bytes)?;
let rid = state.registry_id;
if self.registries.contains_key(&rid) {
return Err(Error::RegistryAlreadyExists);
}
let role_cache = build_role_cache(&state);
self.registries.insert(rid, StoredRegistry {
state_bytes: state_bytes.to_vec(),
state_cert_bytes: state_cert_bytes.to_vec(),
role_cache,
pending_requests: HashMap::new(),
granted: HashMap::new(),
});
Ok(rid)
}
/// Verify an admin BBS+ proof for a registry (the `__admin__` role
/// with epoch 0, using the session transcript hash as nonce).
pub fn verify_admin(
&self,
registry_id: &[u8; 32],
proof_bytes: &[u8],
nonce: &[u8],
) -> Result<()> {
let stored = self.registries.get(registry_id)
.ok_or(Error::RegistryNotFound)?;
let state = RegistryState::deserialize(&stored.state_bytes)?;
let admin_rid = registry::admin_role_id();
let pres = Presentation::from_bytes(proof_bytes.to_vec());
crate::credential::bbs::verify_presentation(
&state.admin_issuer_pk,
&pres,
&admin_rid,
0,
nonce,
)
.map_err(|_| Error::InvalidPresentation)
}
/// Apply a state update. The caller must have already verified
/// admin auth (via `verify_admin`). This checks the state cert,
/// version monotonicity, and hash chain continuity.
pub fn update(
&mut self,
registry_id: &[u8; 32],
state_bytes: &[u8],
state_cert_bytes: &[u8],
) -> Result<()> {
let stored = self.registries.get(registry_id)
.ok_or(Error::RegistryNotFound)?;
let new_state = RegistryState::deserialize(state_bytes)?;
if new_state.registry_id != *registry_id {
return Err(Error::RegistryStateError("registry_id mismatch in update"));
}
let old_state = RegistryState::deserialize(&stored.state_bytes)?;
if new_state.version != old_state.version + 1 {
return Err(Error::RegistryVersionMismatch("version must be old + 1"));
}
let expected_prev = RegistryState::state_hash(&stored.state_bytes);
if new_state.prev_state_hash != expected_prev {
return Err(Error::RegistryStateError("prev_state_hash does not match"));
}
let cert = Presentation::from_bytes(state_cert_bytes.to_vec());
RegistryState::verify_cert(&new_state.admin_issuer_pk, &cert, state_bytes)?;
let role_cache = build_role_cache(&new_state);
let entry = self.registries.get_mut(registry_id).unwrap();
entry.state_bytes = state_bytes.to_vec();
entry.state_cert_bytes = state_cert_bytes.to_vec();
entry.role_cache = role_cache;
Ok(())
}
/// Return the current state bytes and state certificate for a registry.
pub fn get(&self, registry_id: &[u8; 32]) -> Result<(&[u8], &[u8])> {
let stored = self.registries.get(registry_id)
.ok_or(Error::RegistryNotFound)?;
Ok((&stored.state_bytes, &stored.state_cert_bytes))
}
/// Verify a BBS+ presentation for a role in a specific registry,
/// using the cached `RoleRegistry`.
pub fn verify_presentation(
&self,
registry_id: &[u8; 32],
role_id: &[u8; 32],
presentation: &Presentation,
nonce: &[u8],
) -> Result<()> {
let stored = self.registries.get(registry_id)
.ok_or(Error::RegistryNotFound)?;
stored.role_cache.verify_presentation(role_id, presentation, nonce)
}
/// Queue a credential issuance request for a registry.
/// Returns a unique request_id.
pub fn queue_issuance_request(
&mut self,
registry_id: &[u8; 32],
request_id: [u8; 32],
role_id: [u8; 32],
eph_pk: [u8; 32],
encrypted_commitment: Vec<u8>,
) -> Result<()> {
let stored = self.registries.get_mut(registry_id)
.ok_or(Error::RegistryNotFound)?;
stored.pending_requests.insert(request_id, IssuanceRequest {
role_id,
eph_pk,
encrypted_commitment,
});
Ok(())
}
/// Drain all pending issuance requests for a registry.
pub fn take_pending_requests(
&mut self,
registry_id: &[u8; 32],
) -> Result<Vec<([u8; 32], IssuanceRequest)>> {
let stored = self.registries.get_mut(registry_id)
.ok_or(Error::RegistryNotFound)?;
let items: Vec<_> = stored.pending_requests.drain().collect();
Ok(items)
}
/// Store a granted credential response (encrypted blind signature).
pub fn grant_credential(
&mut self,
registry_id: &[u8; 32],
request_id: [u8; 32],
encrypted_blind_sig: Vec<u8>,
) -> Result<()> {
let stored = self.registries.get_mut(registry_id)
.ok_or(Error::RegistryNotFound)?;
stored.granted.insert(request_id, encrypted_blind_sig);
Ok(())
}
/// Retrieve a granted credential response. Removes it from storage
/// after retrieval (one-time pickup).
pub fn take_granted_credential(
&mut self,
registry_id: &[u8; 32],
request_id: &[u8; 32],
) -> Result<Option<Vec<u8>>> {
let stored = self.registries.get_mut(registry_id)
.ok_or(Error::RegistryNotFound)?;
Ok(stored.granted.remove(request_id))
}
pub fn has_registry(&self, registry_id: &[u8; 32]) -> bool {
self.registries.contains_key(registry_id)
}
}
impl Default for RegistryManager {
fn default() -> Self {
Self::new()
}
}
fn build_role_cache(state: &RegistryState) -> RoleRegistry {
let mut cache = RoleRegistry::new();
for role in state.roles() {
cache.register_role(role.role_id, role.issuer_pk.clone(), role.epoch);
}
cache
}
#[cfg(test)]
mod tests {
use super::*;
use crate::credential::bbs::{self, IssuerKeyPair, IssuerPublicKey, Credential};
use crate::credential::registry::{admin_role_id, RoleEntry};
fn make_admin() -> (IssuerKeyPair, Credential, IssuerPublicKey) {
let issuer = IssuerKeyPair::generate().unwrap();
let pk = issuer.public_key();
let rid = admin_role_id();
let req = bbs::prepare_blind_request().unwrap();
let sig = issuer.issue_blind(&req.commitment_with_proof, &rid, 0).unwrap();
let cred = Credential::finalize(
&sig, req.member_secret, req.prover_blind, rid, 0, &pk,
).unwrap();
(issuer, cred, pk)
}
fn create_test_registry(mgr: &mut RegistryManager) -> (IssuerKeyPair, Credential, [u8; 32]) {
let (issuer, admin_cred, pk) = make_admin();
let state = RegistryState::new(pk, [0u8; 32], 1, [0u8; 32], vec![]);
let state_bytes = state.serialize();
let cert = state.certify(&admin_cred, &state_bytes).unwrap();
let rid = mgr.create(&state_bytes, cert.to_bytes()).unwrap();
(issuer, admin_cred, rid)
}
#[test]
fn create_and_get() {
let mut mgr = RegistryManager::new();
let (_, _, rid) = create_test_registry(&mut mgr);
assert!(mgr.has_registry(&rid));
let (state_bytes, cert_bytes) = mgr.get(&rid).unwrap();
assert!(!state_bytes.is_empty());
assert!(!cert_bytes.is_empty());
}
#[test]
fn create_duplicate_rejected() {
let mut mgr = RegistryManager::new();
let (issuer, admin_cred, _) = create_test_registry(&mut mgr);
let pk = issuer.public_key();
let state = RegistryState::new(pk, [0u8; 32], 1, [0u8; 32], vec![]);
let bytes = state.serialize();
let cert = state.certify(&admin_cred, &bytes).unwrap();
assert!(mgr.create(&bytes, cert.to_bytes()).is_err());
}
#[test]
fn update_with_version_and_hash_chain() {
let mut mgr = RegistryManager::new();
let (issuer, admin_cred, rid) = create_test_registry(&mut mgr);
let pk = issuer.public_key();
let (old_bytes, _) = mgr.get(&rid).unwrap();
let prev_hash = RegistryState::state_hash(old_bytes);
let role_issuer = IssuerKeyPair::generate().unwrap();
let state2 = RegistryState::new(
pk,
[0u8; 32],
2,
prev_hash,
vec![RoleEntry {
role_id: bbs::role_id("analyst"),
issuer_pk: role_issuer.public_key(),
epoch: 1,
}],
);
let bytes2 = state2.serialize();
let cert2 = state2.certify(&admin_cred, &bytes2).unwrap();
mgr.update(&rid, &bytes2, cert2.to_bytes()).unwrap();
let (stored_bytes, _) = mgr.get(&rid).unwrap();
let stored = RegistryState::deserialize(stored_bytes).unwrap();
assert_eq!(stored.version, 2);
assert_eq!(stored.roles().len(), 1);
}
#[test]
fn update_wrong_version_rejected() {
let mut mgr = RegistryManager::new();
let (issuer, admin_cred, rid) = create_test_registry(&mut mgr);
let pk = issuer.public_key();
let (old_bytes, _) = mgr.get(&rid).unwrap();
let prev_hash = RegistryState::state_hash(old_bytes);
// Version 3 instead of 2
let state = RegistryState::new(pk, [0u8; 32], 3, prev_hash, vec![]);
let bytes = state.serialize();
let cert = state.certify(&admin_cred, &bytes).unwrap();
assert!(mgr.update(&rid, &bytes, cert.to_bytes()).is_err());
}
#[test]
fn update_wrong_prev_hash_rejected() {
let mut mgr = RegistryManager::new();
let (issuer, admin_cred, rid) = create_test_registry(&mut mgr);
let pk = issuer.public_key();
let state = RegistryState::new(pk, [0u8; 32], 2, [0xffu8; 32], vec![]);
let bytes = state.serialize();
let cert = state.certify(&admin_cred, &bytes).unwrap();
assert!(mgr.update(&rid, &bytes, cert.to_bytes()).is_err());
}
#[test]
fn verify_role_presentation_after_update() {
let mut mgr = RegistryManager::new();
let (issuer, admin_cred, rid) = create_test_registry(&mut mgr);
let pk = issuer.public_key();
let (old_bytes, _) = mgr.get(&rid).unwrap();
let prev_hash = RegistryState::state_hash(old_bytes);
let role_issuer = IssuerKeyPair::generate().unwrap();
let role_pk = role_issuer.public_key();
let analyst_rid = bbs::role_id("analyst");
let state2 = RegistryState::new(
pk,
[0u8; 32],
2,
prev_hash,
vec![RoleEntry {
role_id: analyst_rid,
issuer_pk: role_pk.clone(),
epoch: 1,
}],
);
let bytes2 = state2.serialize();
let cert2 = state2.certify(&admin_cred, &bytes2).unwrap();
mgr.update(&rid, &bytes2, cert2.to_bytes()).unwrap();
// Issue a credential for the analyst role
let req = bbs::prepare_blind_request().unwrap();
let sig = role_issuer.issue_blind(&req.commitment_with_proof, &analyst_rid, 1).unwrap();
let cred = Credential::finalize(
&sig, req.member_secret, req.prover_blind, analyst_rid, 1, &role_pk,
).unwrap();
let nonce = b"session-transcript";
let pres = cred.present(nonce).unwrap();
mgr.verify_presentation(&rid, &analyst_rid, &pres, nonce).unwrap();
}
#[test]
fn issuance_queue_roundtrip() {
let mut mgr = RegistryManager::new();
let (_, _, rid) = create_test_registry(&mut mgr);
let req_id = [1u8; 32];
let role = bbs::role_id("ops");
mgr.queue_issuance_request(&rid, req_id, role, [2u8; 32], vec![42, 43]).unwrap();
let pending = mgr.take_pending_requests(&rid).unwrap();
assert_eq!(pending.len(), 1);
assert_eq!(pending[0].0, req_id);
assert_eq!(pending[0].1.encrypted_commitment, vec![42, 43]);
// Queue is drained
let empty = mgr.take_pending_requests(&rid).unwrap();
assert!(empty.is_empty());
// Grant and retrieve
mgr.grant_credential(&rid, req_id, vec![99, 100]).unwrap();
let resp = mgr.take_granted_credential(&rid, &req_id).unwrap();
assert_eq!(resp, Some(vec![99, 100]));
let gone = mgr.take_granted_credential(&rid, &req_id).unwrap();
assert!(gone.is_none());
}
#[test]
fn nonexistent_registry_errors() {
let mut mgr = RegistryManager::new();
let fake_id = [0xffu8; 32];
assert!(mgr.get(&fake_id).is_err());
assert!(mgr.update(&fake_id, &[], &[]).is_err());
assert!(mgr.verify_admin(&fake_id, &[], b"n").is_err());
}
}

View File

@ -0,0 +1,401 @@
import zkac
import pytest
def make_admin():
"""Create a BBS+ issuer, self-issue an __admin__ credential."""
issuer = zkac.BbsIssuer()
pk = issuer.public_key()
admin_rid = zkac.admin_role_id()
req = zkac.prepare_blind_request()
sig = issuer.issue_blind(req.commitment_with_proof(), admin_rid, 0)
cred = zkac.Credential.finalize(
sig, req.member_secret(), req.prover_blind(), admin_rid, 0, pk
)
return issuer, pk, cred
def create_registry(mgr, issuer, admin_cred, roles=None):
"""Create a registry in the manager, optionally with roles."""
pk = issuer.public_key()
issuance_kp = zkac.IssuanceKeypair()
issuance_pk = issuance_kp.public_key_bytes()
role_entries = []
if roles:
for name, role_pk, epoch in roles:
role_entries.append((zkac.role_id(name), role_pk, epoch))
state = zkac.RegistryState.build(pk, issuance_pk, 1, b"\x00" * 32, role_entries)
state_bytes = state.serialize()
cert = state.certify(admin_cred)
rid = mgr.create(state_bytes, cert)
return rid, issuance_kp, state
class TestRegistryState:
def test_build_and_serialize_roundtrip(self):
issuer = zkac.BbsIssuer()
pk = issuer.public_key()
state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, [])
data = state.serialize()
state2 = zkac.RegistryState.deserialize(data)
assert state2.version() == 1
assert state2.registry_id() == state.registry_id()
def test_registry_id_matches(self):
issuer = zkac.BbsIssuer()
pk = issuer.public_key()
state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, [])
expected = zkac.registry_id(pk)
assert state.registry_id() == expected
def test_certify_and_verify(self):
issuer, pk, admin_cred = make_admin()
state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, [])
state_bytes = state.serialize()
cert = state.certify(admin_cred)
assert zkac.RegistryState.verify_cert(pk, cert, state_bytes)
def test_tampered_state_rejected(self):
issuer, pk, admin_cred = make_admin()
state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, [])
state_bytes = state.serialize()
cert = state.certify(admin_cred)
tampered = bytearray(state_bytes)
tampered[32] ^= 0xFF
assert not zkac.RegistryState.verify_cert(pk, cert, bytes(tampered))
def test_wrong_admin_key_rejected(self):
issuer, pk, admin_cred = make_admin()
other_issuer = zkac.BbsIssuer()
state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, [])
state_bytes = state.serialize()
cert = state.certify(admin_cred)
assert not zkac.RegistryState.verify_cert(other_issuer.public_key(), cert, state_bytes)
def test_state_certs_are_unlinkable(self):
issuer, pk, admin_cred = make_admin()
s1 = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, [])
s2 = zkac.RegistryState.build(pk, b"\x00" * 32, 2, b"\x00" * 32, [])
c1 = s1.certify(admin_cred)
c2 = s2.certify(admin_cred)
assert c1 != c2
assert zkac.RegistryState.verify_cert(pk, c1, s1.serialize())
assert zkac.RegistryState.verify_cert(pk, c2, s2.serialize())
class TestRegistryManager:
def test_create_and_get(self):
mgr = zkac.RegistryManager()
issuer, pk, admin_cred = make_admin()
rid, _, _ = create_registry(mgr, issuer, admin_cred)
assert mgr.has_registry(rid)
state_bytes, cert_bytes = mgr.get(rid)
assert len(state_bytes) > 0
assert len(cert_bytes) > 0
def test_duplicate_rejected(self):
mgr = zkac.RegistryManager()
issuer, pk, admin_cred = make_admin()
create_registry(mgr, issuer, admin_cred)
# Same issuer key → same registry_id → duplicate
state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, [])
cert = state.certify(admin_cred)
with pytest.raises(ValueError, match="already exists"):
mgr.create(state.serialize(), cert)
def test_update_with_hash_chain(self):
mgr = zkac.RegistryManager()
issuer, pk, admin_cred = make_admin()
rid, _, _ = create_registry(mgr, issuer, admin_cred)
old_bytes, _ = mgr.get(rid)
old_state = zkac.RegistryState.deserialize(old_bytes)
prev_hash = old_state.state_hash()
role_issuer = zkac.BbsIssuer()
state2 = zkac.RegistryState.build(
pk, b"\x00" * 32, 2, prev_hash,
[(zkac.role_id("analyst"), role_issuer.public_key(), 1)],
)
cert2 = state2.certify(admin_cred)
mgr.update(rid, state2.serialize(), cert2)
new_bytes, _ = mgr.get(rid)
new_state = zkac.RegistryState.deserialize(new_bytes)
assert new_state.version() == 2
def test_wrong_version_rejected(self):
mgr = zkac.RegistryManager()
issuer, pk, admin_cred = make_admin()
rid, _, _ = create_registry(mgr, issuer, admin_cred)
old_bytes, _ = mgr.get(rid)
prev_hash = zkac.RegistryState.deserialize(old_bytes).state_hash()
state = zkac.RegistryState.build(pk, b"\x00" * 32, 3, prev_hash, [])
cert = state.certify(admin_cred)
with pytest.raises(ValueError, match="version"):
mgr.update(rid, state.serialize(), cert)
def test_wrong_prev_hash_rejected(self):
mgr = zkac.RegistryManager()
issuer, pk, admin_cred = make_admin()
rid, _, _ = create_registry(mgr, issuer, admin_cred)
state = zkac.RegistryState.build(pk, b"\x00" * 32, 2, b"\xff" * 32, [])
cert = state.certify(admin_cred)
with pytest.raises(ValueError, match="prev_state_hash"):
mgr.update(rid, state.serialize(), cert)
def test_nonexistent_registry_errors(self):
mgr = zkac.RegistryManager()
fake = b"\xff" * 32
with pytest.raises(ValueError, match="not found"):
mgr.get(fake)
assert not mgr.has_registry(fake)
class TestManagedHandshake:
def test_full_managed_handshake(self):
mgr = zkac.RegistryManager()
admin_issuer, admin_pk, admin_cred = make_admin()
role_issuer = zkac.BbsIssuer()
role_pk = role_issuer.public_key()
analyst_rid = zkac.role_id("analyst")
rid, _, _ = create_registry(
mgr, admin_issuer, admin_cred,
roles=[("analyst", role_pk, 1)],
)
# Issue a credential to a user
req = zkac.prepare_blind_request()
sig = role_issuer.issue_blind(req.commitment_with_proof(), analyst_rid, 1)
user_cred = zkac.Credential.finalize(
sig, req.member_secret(), req.prover_blind(), analyst_rid, 1, role_pk
)
# Handshake
server_kp = zkac.Keypair()
server_pk = server_kp.public_key()
server = zkac.Node(server_kp)
client = zkac.Node(zkac.Keypair())
pending, init_msg = client.connect()
server_session, response_msg = server.accept(init_msg)
identity_proof = server.prove_identity(server_session)
client_session, auth_packet = client.complete_connect_managed(
pending, response_msg, identity_proof, server_pk, user_cred, rid
)
verified_reg_id, verified_role_id = server.verify_auth_managed(
server_session, auth_packet, mgr
)
assert verified_reg_id == rid
assert verified_role_id == analyst_rid
# Data exchange works
pkt = client_session.encrypt(b"query")
assert server_session.decrypt(pkt) == b"query"
def test_multiple_registries(self):
mgr = zkac.RegistryManager()
# Registry A
issuer_a, pk_a, cred_a = make_admin()
role_issuer_a = zkac.BbsIssuer()
rid_a, _, _ = create_registry(
mgr, issuer_a, cred_a,
roles=[("ops", role_issuer_a.public_key(), 1)],
)
# Registry B
issuer_b, pk_b, cred_b = make_admin()
role_issuer_b = zkac.BbsIssuer()
rid_b, _, _ = create_registry(
mgr, issuer_b, cred_b,
roles=[("dev", role_issuer_b.public_key(), 1)],
)
assert rid_a != rid_b
# Issue credentials in each
ops_rid = zkac.role_id("ops")
req = zkac.prepare_blind_request()
sig = role_issuer_a.issue_blind(req.commitment_with_proof(), ops_rid, 1)
ops_cred = zkac.Credential.finalize(
sig, req.member_secret(), req.prover_blind(), ops_rid, 1, role_issuer_a.public_key()
)
dev_rid = zkac.role_id("dev")
req2 = zkac.prepare_blind_request()
sig2 = role_issuer_b.issue_blind(req2.commitment_with_proof(), dev_rid, 1)
dev_cred = zkac.Credential.finalize(
sig2, req2.member_secret(), req2.prover_blind(), dev_rid, 1, role_issuer_b.public_key()
)
# Auth against registry A with ops credential
server_kp = zkac.Keypair()
server_sk = server_kp.secret_key_bytes()
server_pk = server_kp.public_key()
server = zkac.Node(server_kp)
client = zkac.Node(zkac.Keypair())
pending, init_msg = client.connect()
ss, resp = server.accept(init_msg)
id_proof = server.prove_identity(ss)
cs, auth = client.complete_connect_managed(
pending, resp, id_proof, server_pk, ops_cred, rid_a
)
reg_id, role_id = server.verify_auth_managed(ss, auth, mgr)
assert reg_id == rid_a
assert role_id == ops_rid
# Auth against registry B with dev credential
server2 = zkac.Node(zkac.Keypair.from_secret_key(server_sk))
client2 = zkac.Node(zkac.Keypair())
pending2, init2 = client2.connect()
ss2, resp2 = server2.accept(init2)
id2 = server2.prove_identity(ss2)
cs2, auth2 = client2.complete_connect_managed(
pending2, resp2, id2, server_pk, dev_cred, rid_b
)
reg_id2, role_id2 = server2.verify_auth_managed(ss2, auth2, mgr)
assert reg_id2 == rid_b
assert role_id2 == dev_rid
class TestIssuanceRelay:
def test_e2e_encrypted_issuance(self):
"""Full flow: user requests credential through server, admin issues."""
mgr = zkac.RegistryManager()
admin_issuer, admin_pk, admin_cred = make_admin()
issuance_kp = zkac.IssuanceKeypair()
issuance_pk = issuance_kp.public_key_bytes()
role_issuer = zkac.BbsIssuer()
analyst_rid = zkac.role_id("analyst")
state = zkac.RegistryState.build(
admin_pk, issuance_pk, 1, b"\x00" * 32,
[(analyst_rid, role_issuer.public_key(), 1)],
)
state_bytes = state.serialize()
cert = state.certify(admin_cred)
rid = mgr.create(state_bytes, cert)
# --- User side: prepare blind request and encrypt ---
req = zkac.prepare_blind_request()
commitment = req.commitment_with_proof()
eph_pk, encrypted_commitment = zkac.encrypt_for_admin(issuance_pk, commitment)
# User submits to server
request_id = b"\x01" * 32
mgr.queue_issuance_request(rid, request_id, analyst_rid, eph_pk, encrypted_commitment)
# --- Admin side: fetch, decrypt, issue, encrypt response ---
pending = mgr.take_pending_requests(rid)
assert len(pending) == 1
req_id, role_id, eph, enc_blob = pending[0]
assert req_id == request_id
# Admin decrypts the commitment
decrypted_commitment = issuance_kp.decrypt(eph, enc_blob)
assert decrypted_commitment == commitment
# Admin issues
blind_sig = role_issuer.issue_blind(decrypted_commitment, analyst_rid, 1)
# Admin encrypts response
encrypted_sig = issuance_kp.encrypt(eph, blind_sig)
mgr.grant_credential(rid, req_id, encrypted_sig)
# --- User side: fetch and decrypt ---
enc_response = mgr.take_granted_credential(rid, request_id)
assert enc_response is not None
# User decrypts — needs the ephemeral secret.
# In the real flow, the user saves the StaticSecret bytes.
# encrypt_for_admin uses EphemeralSecret internally, so we
# demonstrate the decrypt_from_admin path with a full manual flow:
# (The encrypt_for_admin function doesn't expose the secret, so
# in production, the Python side would use IssuanceKeypair or
# a manual DH. For this test, verify the admin-side roundtrip.)
decrypted_sig = issuance_kp.decrypt(eph_pk, enc_response)
assert decrypted_sig == blind_sig
def test_server_cannot_substitute_commitment(self):
"""Server cannot forge valid commitments because it can't decrypt."""
admin_kp = zkac.IssuanceKeypair()
admin_pk = admin_kp.public_key_bytes()
plaintext = b"real commitment data"
eph_pk, ciphertext = zkac.encrypt_for_admin(admin_pk, plaintext)
# Server tries to decrypt with a different key — fails
attacker_kp = zkac.IssuanceKeypair()
with pytest.raises(ValueError):
attacker_kp.decrypt(eph_pk, ciphertext)
# Tampering with ciphertext — fails
tampered = bytearray(ciphertext)
tampered[-1] ^= 0xFF
with pytest.raises(ValueError):
admin_kp.decrypt(eph_pk, bytes(tampered))
def test_issuance_keypair_serialization(self):
kp = zkac.IssuanceKeypair()
secret = kp.secret_bytes()
pk = kp.public_key_bytes()
kp2 = zkac.IssuanceKeypair.from_secret(secret)
assert kp2.public_key_bytes() == pk
class TestAdminAuth:
def test_verify_admin_bbs_proof(self):
"""Admin can prove __admin__ role to the server via BBS+."""
mgr = zkac.RegistryManager()
issuer, pk, admin_cred = make_admin()
rid, _, _ = create_registry(mgr, issuer, admin_cred)
nonce = b"session-transcript-hash"
admin_rid = zkac.admin_role_id()
proof = admin_cred.present(nonce)
assert mgr.verify_admin(rid, proof, nonce)
def test_non_admin_rejected(self):
"""A credential for a different role cannot pass admin verification."""
mgr = zkac.RegistryManager()
issuer, pk, admin_cred = make_admin()
role_issuer = zkac.BbsIssuer()
rid, _, _ = create_registry(
mgr, issuer, admin_cred,
roles=[("user", role_issuer.public_key(), 1)],
)
# Issue a non-admin credential
user_rid = zkac.role_id("user")
req = zkac.prepare_blind_request()
sig = role_issuer.issue_blind(req.commitment_with_proof(), user_rid, 1)
user_cred = zkac.Credential.finalize(
sig, req.member_secret(), req.prover_blind(),
user_rid, 1, role_issuer.public_key()
)
nonce = b"nonce"
proof = user_cred.present(nonce)
assert not mgr.verify_admin(rid, proof, nonce)