""" Demo-local P2P credential grant helper. The shipped ``zkac-node grant`` command pairs a sender that calls ``IssuanceKeypair.encrypt`` (HKDF info ``admin->user``) with a listener that calls ``IssuanceKeypair.decrypt`` (HKDF info ``user->admin``). Those labels intentionally differ in the protocol — they describe the user/admin direction of the issuance pipeline — so the round-trip in the same direction must use ``encrypt_for_admin`` + ``IssuanceKeypair.decrypt``, which both bind the ``user->admin`` label. This module performs the grant from the demo using the matched pair so the encrypted payload decodes correctly under ``zkac-node p2p-listen``. The wire format of the ``p2p_grant`` message is unchanged, only the encryption call. We deliberately avoid importing ``cli/zkac_cli/*``; identity, admin and pin material is read straight from the CLI's stable on-disk JSON formats via ``zkac_cli_adapter`` helpers. """ from __future__ import annotations import base64 import json import socket import time from pathlib import Path import zkac from zkac.tcp import FramedSession, client_handshake_anon import file_share_client as fsc import zkac_cli_adapter as cli def _b64(data: bytes) -> str: return base64.b64encode(data).decode("ascii") def _unb64(s: str) -> bytes: return base64.b64decode(s) def _parse_server(server: str) -> tuple[str, int]: host, sep, port_s = server.rpartition(":") if not sep: raise ValueError(f"invalid server {server!r}, expected host:port") return (host or "127.0.0.1"), int(port_s, 10) def _parse_contact_bundle(bundle: str) -> dict: s = bundle.strip() raw = base64.urlsafe_b64decode((s + "=" * (-len(s) % 4)).encode()) return json.loads(raw.decode("utf-8")) def _resolve_server_pk_hex(userid: str, server: str) -> str: """Locate the user's pinned transport key for ``server`` on disk.""" server_pk_hex = cli.load_pinned_server_key(userid, server) if not server_pk_hex: raise FileNotFoundError( f"no pinned server key for {server!r} under {userid!r}; " f"run: zkac-node server pin {userid} {server} --key " ) return server_pk_hex def _fetch_registry_state( userid: str, server: str, registry_id_hex: str, admin_cred: zkac.Credential, ) -> tuple[bytes, bytes]: """Authenticated mgmt-channel ``get_registry`` (matches the CLI's mgmt protocol).""" server_pk_hex = _resolve_server_pk_hex(userid, server) transport_secret = bytes.fromhex(cli.load_identity_secrets(userid)["transport_secret_hex"]) host, port = _parse_server(server) sock = socket.create_connection((host, port), timeout=8.0) sock.settimeout(None) try: node = zkac.Node(zkac.Keypair.from_secret_key(transport_secret)) server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_pk_hex)) session = client_handshake_anon(sock, node, server_pk) framed = FramedSession(sock, session) framed.send(json.dumps({"op": "mgmt"}).encode()) transcript_hash = bytes(session.transcript_hash()) framed.send(json.dumps({ "cmd": "get_registry", "registry_id": registry_id_hex, "auth_registry_id": registry_id_hex, "admin_proof_b64": _b64(bytes(admin_cred.present(transcript_hash))), }).encode()) resp = json.loads(framed.recv()) if resp.get("error"): raise RuntimeError(resp["error"]) return _unb64(resp["state_bytes_b64"]), _unb64(resp["state_cert_b64"]) finally: sock.close() # ── grant (sender side) ────────────────────────────────────────────── def grant_role_p2p( admin_userid: str, server: str, registry_id_hex: str, role_name: str, recipient_contact_bundle: str, *, bucket_role_grant: dict | None = None, connect_timeout_s: float = 8.0, ) -> dict: """Issue a BBS+ role credential and ship it to the recipient over an authenticated TCP session. Wire-compatible with ``zkac-node p2p-listen``: the ciphertext is produced via ``encrypt_for_admin`` so the listener's ``IssuanceKeypair.decrypt`` succeeds. """ parsed = _parse_contact_bundle(recipient_contact_bundle) recipient_issuance_pk_hex = parsed["issuance_pk_hex"] recipient_transport_pk_hex = parsed["transport_pk_hex"] recipient_grant_token_b64 = parsed["grant_token_b64"] peer = parsed.get("peer") if not peer: raise RuntimeError( "contact bundle missing peer endpoint; ask recipient to regenerate " "with `zkac-node user show --peer `" ) admin_data = cli.load_admin_material(admin_userid, registry_id_hex) roles = admin_data.get("roles", []) if role_name not in roles: raise RuntimeError(f"role {role_name!r} not in registry (have: {roles})") role_epochs = admin_data.get("role_epochs", {}) if role_name not in role_epochs: raise RuntimeError(f"missing epoch metadata for role {role_name!r}") epoch = int(role_epochs[role_name]) bbs_issuer = zkac.BbsIssuer.from_secret_key(_unb64(admin_data["bbs_issuer_secret_b64"])) bbs_pk = bbs_issuer.public_key() admin_cred = cli.load_admin_credential(admin_userid, registry_id_hex) role_rid = zkac.role_id(role_name) req = zkac.prepare_blind_request() blind_sig = bbs_issuer.issue_blind(req.commitment_with_proof(), role_rid, epoch) payload = json.dumps({ "registry_id": registry_id_hex, "role_name": role_name, "epoch": epoch, "issuer_pk_b64": _b64(bytes(bbs_pk.to_bytes())), "blind_sig_b64": _b64(bytes(blind_sig)), "member_secret_b64": _b64(bytes(req.member_secret())), "prover_blind_b64": _b64(bytes(req.prover_blind())), "bucket_role_grant": bucket_role_grant or None, }).encode() rec_pk_bytes = bytes.fromhex(recipient_issuance_pk_hex) if len(rec_pk_bytes) != 32: raise RuntimeError("recipient issuance pubkey must decode to 32 bytes") eph_pk, ciphertext = zkac.encrypt_for_admin(rec_pk_bytes, payload) state_bytes, state_cert = _fetch_registry_state( admin_userid, server, registry_id_hex, admin_cred, ) sender_transport_secret = bytes.fromhex( cli.load_identity_secrets(admin_userid)["transport_secret_hex"] ) peer_host, peer_port = _parse_server(peer) sock = socket.create_connection((peer_host, peer_port), timeout=connect_timeout_s) sock.settimeout(None) try: node = zkac.Node(zkac.Keypair.from_secret_key(sender_transport_secret)) peer_pk = zkac.PublicKey.from_bytes(bytes.fromhex(recipient_transport_pk_hex)) session = client_handshake_anon(sock, node, peer_pk) framed = FramedSession(sock, session) ready = json.loads(framed.recv()) if ready.get("error") or ready.get("op") != "ready_for_grant": raise RuntimeError(f"peer did not accept grant session: {ready}") transcript_hash = bytes(session.transcript_hash()) admin_proof = bytes(admin_cred.present(transcript_hash)) framed.send(json.dumps({ "op": "p2p_grant", "grant_token_b64": recipient_grant_token_b64, "registry_id": registry_id_hex, "registry_state_bytes_b64": _b64(state_bytes), "registry_state_cert_b64": _b64(state_cert), "role_name": role_name, "eph_pk_b64": _b64(bytes(eph_pk)), "ciphertext_b64": _b64(bytes(ciphertext)), "admin_proof_b64": _b64(admin_proof), }).encode()) ack = json.loads(framed.recv()) if ack.get("error"): raise RuntimeError(ack["error"]) return {"status": ack.get("status", "ok"), "peer": peer} finally: sock.close() # ── listen (recipient side) ────────────────────────────────────────── def _save_credential_json( userid: str, registry_id_hex: str, role_name: str, payload: dict, ) -> Path: """Write the received credential to ``ZKAC_HOME//credentials/_.json``. Mirrors the on-disk format used by ``zkac-node grant`` so subsequent ``zkac-node`` commands can pick up the credential normally. """ creds_dir = cli.zkac_home() / userid / "credentials" creds_dir.mkdir(parents=True, exist_ok=True) target = creds_dir / f"{registry_id_hex}_{role_name}.json" target.write_text(json.dumps({ "blind_sig_b64": payload["blind_sig_b64"], "member_secret_b64": payload["member_secret_b64"], "prover_blind_b64": payload["prover_blind_b64"], "role_name": payload["role_name"], "epoch": payload["epoch"], "issuer_pk_b64": payload["issuer_pk_b64"], }, indent=2)) try: target.chmod(0o600) except OSError: pass return target def listen_for_role_credential( userid: str, host: str, port: int, *, timeout_s: float = 60.0, ) -> dict: """Block until one valid grant arrives, then save the credential locally. Compatible with ``demo.file_share_credentials.grant_role_p2p`` and (because the wire format matches) also useful for testing against the existing ``zkac-node grant`` listener API. """ from zkac.tcp import server_handshake_anon secrets = cli.load_identity_secrets(userid) transport_secret = bytes.fromhex(secrets["transport_secret_hex"]) issuance_secret = bytes.fromhex(secrets["issuance_secret_hex"]) expected_token_b64 = secrets["grant_token_b64"] listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind((host, port)) listener.listen(1) deadline = time.monotonic() + timeout_s receiver_kp = zkac.IssuanceKeypair.from_secret(issuance_secret) try: while True: remaining = deadline - time.monotonic() if remaining <= 0: raise RuntimeError("timed out waiting for authenticated grant sender") listener.settimeout(remaining) conn, _addr = listener.accept() try: conn.settimeout(min(remaining, 30.0)) node = zkac.Node(zkac.Keypair.from_secret_key(transport_secret)) session = server_handshake_anon(conn, node) framed = FramedSession(conn, session) framed.send(json.dumps({"ok": True, "op": "ready_for_grant"}).encode()) msg = json.loads(framed.recv()) if msg.get("error"): raise RuntimeError(msg["error"]) if msg.get("op") != "p2p_grant": raise RuntimeError("unexpected p2p message") if msg.get("grant_token_b64") != expected_token_b64: raise RuntimeError("grant pairing token mismatch") registry_id_hex = msg["registry_id"] role_name = msg["role_name"] state_bytes = _unb64(msg["registry_state_bytes_b64"]) state_cert = _unb64(msg["registry_state_cert_b64"]) mgr = zkac.RegistryManager() restored = mgr.restore(state_bytes, state_cert).hex() if restored != registry_id_hex: raise RuntimeError("registry snapshot does not match announced registry_id") if not mgr.verify_admin( bytes.fromhex(registry_id_hex), _unb64(msg["admin_proof_b64"]), bytes(session.transcript_hash()), ): raise RuntimeError("sender admin proof failed") payload = json.loads(receiver_kp.decrypt( _unb64(msg["eph_pk_b64"]), _unb64(msg["ciphertext_b64"]), )) if payload.get("registry_id") != registry_id_hex: raise RuntimeError("payload registry_id mismatch") if payload.get("role_name") != role_name: raise RuntimeError("payload role mismatch") cred = zkac.Credential.finalize( _unb64(payload["blind_sig_b64"]), _unb64(payload["member_secret_b64"]), _unb64(payload["prover_blind_b64"]), zkac.role_id(role_name), int(payload["epoch"]), zkac.BbsPublicKey.from_bytes(_unb64(payload["issuer_pk_b64"])), ) nonce = bytes(session.transcript_hash()) if not mgr.verify_presentation( bytes.fromhex(registry_id_hex), zkac.role_id(role_name), bytes(cred.present(nonce)), nonce, ): raise RuntimeError("granted credential does not verify") _save_credential_json(userid, registry_id_hex, role_name, payload) bucket_grant = payload.get("bucket_role_grant") if isinstance(bucket_grant, dict): bucket_id = bucket_grant.get("bucket_id") eph_pk_b64 = bucket_grant.get("eph_pk_b64") ciphertext_b64 = bucket_grant.get("ciphertext_b64") if isinstance(bucket_id, str) and isinstance(eph_pk_b64, str) and isinstance(ciphertext_b64, str): fsc.save_received_role_grant( userid, registry_id_hex, role_name, bucket_id, eph_pk_b64=eph_pk_b64, ciphertext_b64=ciphertext_b64, ) framed.send(json.dumps({"ok": True, "status": "stored"}).encode()) return {"registry_id": registry_id_hex, "role": role_name} finally: conn.close() finally: listener.close()