#!/usr/bin/env python3 """ ZKAC client for client-managed registries. 1. Issues a credential locally (admin issues to self — in production this would go through the server's E2E-encrypted issuance relay). 2. Verifies the registry state certificate. 3. Authenticates via managed-registry handshake. 4. Sends a JSON request over the encrypted session. Usage: python client_managed.py --role analyst python client_managed.py --role operator """ from __future__ import annotations import argparse import base64 import json import socket from pathlib import Path import zkac from zkac.tcp import FramedSession, client_handshake_managed def main() -> None: ap = argparse.ArgumentParser(description="ZKAC managed-registry client") ap.add_argument("--creds-dir", type=Path, default=Path(__file__).resolve().parent / "creds") ap.add_argument("--role", default="analyst", choices=["analyst", "operator"]) ap.add_argument("--host", default="127.0.0.1") ap.add_argument("--port", type=int, default=9877) args = ap.parse_args() creds_dir: Path = args.creds_dir admin_data = json.loads((creds_dir / "managed_admin.json").read_text(encoding="utf-8")) reg_data = json.loads((creds_dir / "managed_registry.json").read_text(encoding="utf-8")) transport_data = json.loads((creds_dir / "transport.json").read_text(encoding="utf-8")) # Reconstruct admin issuer to issue a credential for the chosen role. # In production, this would be done via the E2E issuance relay. admin_issuer = zkac.BbsIssuer.from_secret_key( base64.b64decode(admin_data["admin_issuer_secret_b64"]) ) admin_pk = admin_issuer.public_key() role_rid = zkac.role_id(args.role) req = zkac.prepare_blind_request() blind_sig = admin_issuer.issue_blind(req.commitment_with_proof(), role_rid, 1) cred = zkac.Credential.finalize( blind_sig, req.member_secret(), req.prover_blind(), role_rid, 1, admin_pk ) # Verify registry state certificate before trusting it state_bytes = base64.b64decode(reg_data["state_bytes_b64"]) state_cert = base64.b64decode(reg_data["state_cert_b64"]) registry_id = bytes.fromhex(reg_data["registry_id_hex"]) issuer_pk_for_verify = zkac.BbsPublicKey.from_bytes( base64.b64decode(reg_data["admin_issuer_public_key_b64"]) ) expected_rid = zkac.registry_id(issuer_pk_for_verify) assert expected_rid == registry_id, "registry_id mismatch" assert zkac.RegistryState.verify_cert(issuer_pk_for_verify, state_cert, state_bytes), \ "state certificate verification failed" print(f"Registry state verified (registry_id={registry_id.hex()[:16]}…)") # Connect and authenticate server_pk = zkac.PublicKey.from_bytes( base64.b64decode(transport_data["server_public_key_b64"]) ) node = zkac.Node(zkac.Keypair()) sock = socket.create_connection((args.host, args.port)) try: session = client_handshake_managed( sock, node, server_pk, cred, registry_id ) framed = FramedSession(sock, session) framed.send(json.dumps({"path": "/api"}).encode()) reply = json.loads(framed.recv().decode()) print(json.dumps(reply, indent=2)) finally: sock.close() if __name__ == "__main__": main()