"""Client-side operations for registry management and direct P2P grants.""" from __future__ import annotations import base64 import json import os import socket import struct import time import zkac from zkac.tcp import FramedSession, client_handshake_anon, server_handshake_anon from . import store DEFAULT_CONNECT_TIMEOUT_S = 8.0 def _b64(data: bytes) -> str: return base64.b64encode(data).decode() def _unb64(s: str) -> bytes: return base64.b64decode(s) def _parse_server(server: str) -> tuple[str, int]: host, sep, port_s = server.rpartition(":") if not sep: raise ValueError( f"invalid server {server!r}: expected host:port (e.g. 127.0.0.1:9800). " "That is the TCP address of the running node, not the userid from " "`zkac-node serve `." ) try: port = int(port_s, 10) except ValueError as e: raise ValueError( f"invalid server {server!r}: port after ':' must be a number (host:port)" ) from e if not 1 <= port <= 65535: raise ValueError(f"invalid server port {port}") return (host or "127.0.0.1"), port def _parse_host_port(value: str, name: str) -> tuple[str, int]: host, sep, port_s = value.rpartition(":") if not sep: raise ValueError(f"{name} must be host:port") try: port = int(port_s, 10) except ValueError as e: raise ValueError(f"{name} port must be numeric") from e if not 1 <= port <= 65535: raise ValueError(f"{name} port out of range: {port}") return (host or "127.0.0.1"), port def _proxy_target() -> tuple[str, int] | None: raw = os.environ.get("ZKAC_SOCKS5_PROXY", "").strip() if not raw: return None return _parse_host_port(raw, "ZKAC_SOCKS5_PROXY") def _is_i2p_host(host: str) -> bool: return host.strip().lower().endswith(".i2p") def _maybe_i2p_pin_hint(userid: str, server: str) -> str: host, _port = _parse_server(server) is_i2p_target = _is_i2p_host(host) known = store.known_servers(userid) candidates: list[str] = [] for known_server in known: try: known_host, _ = _parse_server(known_server) except ValueError: continue if _is_i2p_host(known_host) == is_i2p_target: continue candidates.append(known_server) if not candidates: return "" choices = ", ".join(sorted(candidates)[:3]) return ( " You appear to have pins under a different address style " f"({choices}). Pins are exact host:port keys; pin and connect with the same endpoint string." ) def _recv_exact(sock: socket.socket, n: int) -> bytes: buf = bytearray() while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: raise ConnectionError("connection closed during SOCKS5 handshake") buf.extend(chunk) return bytes(buf) def _socks5_connect(sock: socket.socket, host: str, port: int): sock.sendall(b"\x05\x01\x00") hello = _recv_exact(sock, 2) if hello != b"\x05\x00": raise RuntimeError("SOCKS5 proxy requires unsupported authentication") host_b = host.encode("idna") if len(host_b) > 255: raise ValueError("destination host too long for SOCKS5") req = b"\x05\x01\x00\x03" + bytes([len(host_b)]) + host_b + struct.pack(">H", port) sock.sendall(req) head = _recv_exact(sock, 4) if head[0] != 0x05: raise RuntimeError("invalid SOCKS5 proxy reply") if head[1] != 0x00: raise RuntimeError(f"SOCKS5 connect failed (code=0x{head[1]:02x})") atyp = head[3] if atyp == 0x01: _ = _recv_exact(sock, 4 + 2) elif atyp == 0x03: ln = _recv_exact(sock, 1)[0] _ = _recv_exact(sock, ln + 2) elif atyp == 0x04: _ = _recv_exact(sock, 16 + 2) else: raise RuntimeError("invalid SOCKS5 address type in reply") def _connect_with_timeout(address: tuple[str, int], timeout_s: float = DEFAULT_CONNECT_TIMEOUT_S) -> socket.socket: sock = socket.create_connection(address, timeout=timeout_s) sock.settimeout(None) return sock def _connect(host: str, port: int, *, connect_timeout_s: float = DEFAULT_CONNECT_TIMEOUT_S) -> socket.socket: proxy = _proxy_target() if _is_i2p_host(host) and proxy is None: raise RuntimeError( "destination is .i2p but ZKAC_SOCKS5_PROXY is not set. " "Set ZKAC_SOCKS5_PROXY=127.0.0.1:4447 (or your I2P SOCKS endpoint)." ) # Only route through SOCKS5 for I2P destinations. This keeps local/direct # peer delivery (e.g. 127.0.0.1) working even when ZKAC_SOCKS5_PROXY is set. if proxy is None or not _is_i2p_host(host): return _connect_with_timeout((host, port), connect_timeout_s) sock = _connect_with_timeout(proxy, connect_timeout_s) try: _socks5_connect(sock, host, port) return sock except Exception: sock.close() raise def net_check( target: str, timeout_s: float = 8.0, *, handshake: bool = False, userid: str | None = None, server_key_hex: str | None = None, ) -> dict: """Connectivity diagnostic for direct/server endpoints, with optional SOCKS5 and handshake.""" host, port = _parse_server(target) proxy = _proxy_target() if _is_i2p_host(host) and proxy is None: return { "ok": False, "target": target, "via": "direct", "error": ( "destination is .i2p but ZKAC_SOCKS5_PROXY is not set; " "configure your I2P SOCKS proxy (for example 127.0.0.1:4447)" ), } use_proxy = proxy is not None and _is_i2p_host(host) via = "direct" if not use_proxy else f"socks5:{proxy[0]}:{proxy[1]}" sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(timeout_s) try: if not use_proxy: sock.connect((host, port)) else: assert proxy is not None sock.connect(proxy) _socks5_connect(sock, host, port) result = {"ok": True, "target": target, "via": via} if handshake: if server_key_hex and userid: return { "ok": False, "target": target, "via": via, "error": "use either server_key_hex or userid for handshake, not both", } if server_key_hex: server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_key_hex)) elif userid: server_pk = _resolve_server_pk(userid, target) else: return { "ok": False, "target": target, "via": via, "error": "handshake check requires --key or --userid ", } node = zkac.Node(zkac.Keypair()) _ = client_handshake_anon(sock, node, server_pk) result["handshake"] = "ok" return result except Exception as exc: return {"ok": False, "target": target, "via": via, "error": str(exc)} finally: sock.close() def parse_spec(spec: str) -> tuple[str, str, str]: """Parse 'host:port:registry_id:role' into (server, registry_id, role).""" parts = spec.rsplit(":", 2) if len(parts) != 3: raise ValueError(f"invalid spec {spec!r}, expected host:port:registry_id:role") return parts[0], parts[1], parts[2] def _resolve_server_pk(userid: str, server: str) -> zkac.PublicKey: pin = store.load_server_pin(userid, server) if pin is None: hint = _maybe_i2p_pin_hint(userid, server) raise RuntimeError( f"no pinned transport key for {server!r} under client {userid!r} " f"(pins are per client identity in ~/.zkac/{userid}/, not the userid " "passed to `zkac-node serve <…>` on the server). " f"Run: zkac-node server pin {userid} {server} --key ." f"{hint}" ) return zkac.PublicKey.from_bytes(_unb64(pin["server_public_key_b64"])) def _local_node(userid: str) -> zkac.Node: ident = store.load_identity(userid) keypair = zkac.Keypair.from_secret_key(ident["transport_sk"]) return zkac.Node(keypair) def _mgmt_connect(userid: str, server: str) -> tuple[socket.socket, FramedSession, bytes]: host, port = _parse_server(server) sock = _connect(host, port, connect_timeout_s=DEFAULT_CONNECT_TIMEOUT_S) server_pk = _resolve_server_pk(userid, server) node = _local_node(userid) session = client_handshake_anon(sock, node, server_pk) framed = FramedSession(sock, session) framed.send(json.dumps({"op": "mgmt"}).encode()) return sock, framed, bytes(session.transcript_hash()) def _mgmt_cmd(framed: FramedSession, cmd: dict) -> dict: framed.send(json.dumps(cmd).encode()) return json.loads(framed.recv()) def _mgmt_single( userid: str, server: str, cmd: dict, *, auth_registry_id: str | None = None, admin_cred: zkac.Credential | None = None, ) -> dict: sock, framed, transcript_hash = _mgmt_connect(userid, server) try: if auth_registry_id is not None: if admin_cred is None: raise RuntimeError("admin credential required for authenticated management command") cmd = dict(cmd) cmd["auth_registry_id"] = auth_registry_id cmd["admin_proof_b64"] = _b64(admin_cred.present(transcript_hash)) return _ok(_mgmt_cmd(framed, cmd)) finally: sock.close() def _ok(resp: dict) -> dict: if resp.get("error"): raise RuntimeError(resp["error"]) return resp def _validated_admin_update_base( userid: str, server: str, registry_id_hex: str, ) -> tuple[dict, zkac.BbsPublicKey, zkac.Credential, bytes, int]: admin_data = store.load_admin(userid, registry_id_hex) _bbs_issuer, bbs_pk, admin_cred = store.reconstruct_admin(admin_data) cur = _mgmt_single( userid, server, {"cmd": "get_registry", "registry_id": registry_id_hex}, auth_registry_id=registry_id_hex, admin_cred=admin_cred, ) old_state = zkac.RegistryState.deserialize(_unb64(cur["state_bytes_b64"])) prev_hash = bytes(old_state.state_hash()) server_version = old_state.version() local_version = admin_data.get("last_known_version") local_hash_b64 = admin_data.get("last_known_state_hash_b64") if local_version != server_version: raise RuntimeError( "local admin metadata is stale versus server state version; " "refetch/synchronize admin metadata before updating" ) if not isinstance(local_hash_b64, str) or _unb64(local_hash_b64) != prev_hash: raise RuntimeError( "local admin metadata state hash mismatch; refusing update to avoid accidental role/state clobber" ) return admin_data, bbs_pk, admin_cred, prev_hash, server_version + 1 def _normalize_role_epochs(all_roles: list[str], raw_role_epochs: object) -> dict[str, int]: role_epochs: dict[str, int] = {} if isinstance(raw_role_epochs, dict): for role in all_roles: value = raw_role_epochs.get(role, 1) role_epochs[role] = max(int(value), 1) else: for role in all_roles: role_epochs[role] = 1 return role_epochs # ── Public operations ──────────────────────────────────────────────── def create_registry(userid: str, server: str, role_names: list[str]) -> str: identity = store.load_identity(userid) admin_mat = store.new_admin_material() bbs_issuer, bbs_pk, admin_cred = store.reconstruct_admin(admin_mat) role_entries = [(zkac.role_id(name), bbs_pk, 1) for name in role_names] state = zkac.RegistryState.build( bbs_pk, identity["issuance_pk"], 1, b"\x00" * 32, role_entries, ) state_bytes = state.serialize() state_cert = state.certify(admin_cred) registry_id = state.registry_id() resp = _mgmt_single(userid, server, { "cmd": "create_registry", "state_bytes_b64": _b64(state_bytes), "state_cert_b64": _b64(bytes(state_cert)), }, auth_registry_id=registry_id.hex(), admin_cred=admin_cred) rid_hex = resp["registry_id"] store.save_admin(userid, rid_hex, { "server": server, "roles": role_names, "role_epochs": {name: 1 for name in role_names}, "last_known_version": 1, "last_known_state_hash_b64": _b64(state.state_hash()), **admin_mat, }) return rid_hex def update_registry(userid: str, server: str, registry_id_hex: str, add_roles: list[str]): admin_data, bbs_pk, admin_cred, prev_hash, new_version = _validated_admin_update_base( userid, server, registry_id_hex, ) identity = store.load_identity(userid) old_roles = admin_data.get("roles", []) all_roles = list(old_roles) + [r for r in add_roles if r not in old_roles] role_epochs = _normalize_role_epochs(all_roles, admin_data.get("role_epochs", {})) role_entries = [(zkac.role_id(name), bbs_pk, role_epochs[name]) for name in all_roles] new_state = zkac.RegistryState.build( bbs_pk, identity["issuance_pk"], new_version, prev_hash, role_entries, ) new_cert = new_state.certify(admin_cred) _mgmt_single(userid, server, { "cmd": "update_registry", "registry_id": registry_id_hex, "state_bytes_b64": _b64(new_state.serialize()), "state_cert_b64": _b64(bytes(new_cert)), }, auth_registry_id=registry_id_hex, admin_cred=admin_cred) admin_data["roles"] = all_roles for name in all_roles: role_epochs.setdefault(name, 1) admin_data["role_epochs"] = role_epochs admin_data["last_known_version"] = new_version admin_data["last_known_state_hash_b64"] = _b64(new_state.state_hash()) store.save_admin(userid, registry_id_hex, admin_data) def revoke_registry(userid: str, server: str, registry_id_hex: str, role_name: str | None = None): admin_data, bbs_pk, admin_cred, prev_hash, new_version = _validated_admin_update_base( userid, server, registry_id_hex, ) identity = store.load_identity(userid) roles = admin_data.get("roles", []) if not roles: raise RuntimeError("registry has no roles to revoke") role_epochs = _normalize_role_epochs(roles, admin_data.get("role_epochs", {})) if role_name is None: for role in roles: role_epochs[role] += 1 else: if role_name not in roles: raise RuntimeError(f"role {role_name!r} not in registry (have: {roles})") role_epochs[role_name] += 1 role_entries = [(zkac.role_id(name), bbs_pk, role_epochs[name]) for name in roles] new_state = zkac.RegistryState.build( bbs_pk, identity["issuance_pk"], new_version, prev_hash, role_entries, ) new_cert = new_state.certify(admin_cred) _mgmt_single(userid, server, { "cmd": "update_registry", "registry_id": registry_id_hex, "state_bytes_b64": _b64(new_state.serialize()), "state_cert_b64": _b64(bytes(new_cert)), }, auth_registry_id=registry_id_hex, admin_cred=admin_cred) admin_data["role_epochs"] = role_epochs admin_data["last_known_version"] = new_version admin_data["last_known_state_hash_b64"] = _b64(new_state.state_hash()) store.save_admin(userid, registry_id_hex, admin_data) def get_registry(userid: str, server: str, registry_id_hex: str) -> dict: admin_data = store.load_admin(userid, registry_id_hex) _bbs_issuer, _bbs_pk, admin_cred = store.reconstruct_admin(admin_data) return _mgmt_single(userid, server, { "cmd": "get_registry", "registry_id": registry_id_hex, }, auth_registry_id=registry_id_hex, admin_cred=admin_cred) def list_own_registries(userid: str) -> list[dict]: result = [] for rid in store.list_admin_registries(userid): data = store.load_admin(userid, rid) result.append({ "registry_id": rid, "server": data.get("server", "?"), "roles": data.get("roles", []), }) return result def grant_p2p( userid: str, server: str, registry_id_hex: str, role_name: str, recipient_pk_hex: str, recipient_grant_token_b64: str, peer: str, peer_transport_pk_hex: str, ) -> dict: admin_data = store.load_admin(userid, registry_id_hex) roles = admin_data.get("roles", []) if role_name not in roles: raise RuntimeError(f"role {role_name!r} not in registry (have: {roles})") bbs_issuer, bbs_pk, admin_cred = store.reconstruct_admin(admin_data) role_rid = zkac.role_id(role_name) role_epochs = admin_data.get("role_epochs", {}) if not isinstance(role_epochs, dict) or role_name not in role_epochs: raise RuntimeError( f"missing epoch metadata for role {role_name!r}; " "refresh local admin metadata before granting" ) epoch = int(role_epochs[role_name]) req = zkac.prepare_blind_request() blind_sig = bbs_issuer.issue_blind(req.commitment_with_proof(), role_rid, epoch) payload = json.dumps({ "registry_id": registry_id_hex, "role_name": role_name, "epoch": epoch, "issuer_pk_b64": _b64(bbs_pk.to_bytes()), "blind_sig_b64": _b64(blind_sig), "member_secret_b64": _b64(req.member_secret()), "prover_blind_b64": _b64(req.prover_blind()), }).encode() recipient_pk = bytes.fromhex(recipient_pk_hex) eph_kp = zkac.IssuanceKeypair() ciphertext = eph_kp.encrypt(recipient_pk, payload) reg = _mgmt_single( userid, server, {"cmd": "get_registry", "registry_id": registry_id_hex}, auth_registry_id=registry_id_hex, admin_cred=admin_cred, ) host, port = _parse_server(peer) peer_transport_pk = zkac.PublicKey.from_bytes(bytes.fromhex(peer_transport_pk_hex)) sock = _connect(host, port, connect_timeout_s=DEFAULT_CONNECT_TIMEOUT_S) try: session = client_handshake_anon(sock, _local_node(userid), peer_transport_pk) framed = FramedSession(sock, session) transcript_hash = bytes(session.transcript_hash()) admin_proof = admin_cred.present(transcript_hash) resp = _ok(json.loads(framed.recv())) if resp.get("op") != "ready_for_grant": raise RuntimeError("peer did not accept grant session") framed.send(json.dumps({ "op": "p2p_grant", "grant_token_b64": recipient_grant_token_b64, "registry_id": registry_id_hex, "registry_state_bytes_b64": reg["state_bytes_b64"], "registry_state_cert_b64": reg["state_cert_b64"], "role_name": role_name, "eph_pk_b64": _b64(eph_kp.public_key_bytes()), "ciphertext_b64": _b64(ciphertext), "admin_proof_b64": _b64(admin_proof), }).encode()) ack = _ok(json.loads(framed.recv())) finally: sock.close() return {"status": ack.get("status", "ok"), "peer": peer} def receive_p2p(userid: str, host: str, port: int, *, timeout_s: float = 60.0) -> dict: ident = store.load_identity(userid) receiver_kp = zkac.IssuanceKeypair.from_secret(ident["issuance_sk"]) expected_grant_token_b64 = _b64(ident["grant_token"]) listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM) listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) listener.bind((host, port)) listener.listen(1) listener.settimeout(timeout_s) deadline = time.monotonic() + timeout_s try: while True: remaining = deadline - time.monotonic() if remaining <= 0: raise RuntimeError("timed out waiting for authenticated grant sender") listener.settimeout(remaining) conn, _addr = listener.accept() try: conn.settimeout(min(remaining, 30.0)) session = server_handshake_anon(conn, _local_node(userid)) framed = FramedSession(conn, session) framed.send(json.dumps({"ok": True, "op": "ready_for_grant"}).encode()) msg = _ok(json.loads(framed.recv())) if msg.get("op") != "p2p_grant": raise RuntimeError("unexpected p2p message") if msg.get("grant_token_b64") != expected_grant_token_b64: raise RuntimeError("grant pairing token mismatch") registry_id_hex = msg["registry_id"] expected_role_name = msg.get("role_name") if not isinstance(expected_role_name, str) or not expected_role_name: raise RuntimeError("grant message missing required role_name") state_bytes = _unb64(msg["registry_state_bytes_b64"]) state_cert = _unb64(msg["registry_state_cert_b64"]) mgr = zkac.RegistryManager() restored_registry_id = mgr.restore(state_bytes, state_cert).hex() if restored_registry_id != registry_id_hex: raise RuntimeError("registry snapshot does not match announced registry_id") if not mgr.verify_admin( bytes.fromhex(registry_id_hex), _unb64(msg["admin_proof_b64"]), bytes(session.transcript_hash()), ): raise RuntimeError("sender admin proof failed") payload = json.loads( receiver_kp.decrypt(_unb64(msg["eph_pk_b64"]), _unb64(msg["ciphertext_b64"])) ) if payload["registry_id"] != registry_id_hex: raise RuntimeError("grant payload registry_id does not match authenticated registry") if expected_role_name and payload["role_name"] != expected_role_name: raise RuntimeError("grant payload role does not match announced role") cred_data = { "blind_sig_b64": payload["blind_sig_b64"], "member_secret_b64": payload["member_secret_b64"], "prover_blind_b64": payload["prover_blind_b64"], "role_name": payload["role_name"], "epoch": payload["epoch"], "issuer_pk_b64": payload["issuer_pk_b64"], } cred = store.reconstruct_credential(cred_data) nonce = bytes(session.transcript_hash()) cred_proof = cred.present(nonce) role_id = zkac.role_id(payload["role_name"]) if not mgr.verify_presentation( bytes.fromhex(registry_id_hex), role_id, cred_proof, nonce, ): raise RuntimeError("grant credential does not verify against certified registry state") store.save_credential(userid, registry_id_hex, payload["role_name"], cred_data) framed.send(json.dumps({"ok": True, "status": "stored"}).encode()) return {"registry_id": registry_id_hex, "role": payload["role_name"]} except (RuntimeError, ValueError, KeyError, json.JSONDecodeError): # Keep listening until timeout; this prevents first-connector DoS. continue finally: conn.close() finally: listener.close() def authenticate(userid: str, registry_id_hex: str, role_name: str, server: str | None = None) -> dict: admin_data = None try: admin_data = store.load_admin(userid, registry_id_hex) except FileNotFoundError: pass if server is None: if admin_data and admin_data.get("server"): server = admin_data["server"] else: raise RuntimeError("server address required (--server host:port)") cred_data = store.load_credential_data(userid, registry_id_hex, role_name) cred = store.reconstruct_credential(cred_data) server_pk = _resolve_server_pk(userid, server) node = zkac.Node(zkac.Keypair()) host, port = _parse_server(server) sock = _connect(host, port, connect_timeout_s=DEFAULT_CONNECT_TIMEOUT_S) try: session = client_handshake_anon(sock, node, server_pk) framed = FramedSession(sock, session) transcript_hash = bytes(session.transcript_hash()) auth_proof = cred.present(transcript_hash) role_rid = zkac.role_id(role_name) framed.send(json.dumps({ "op": "auth", "registry_id": registry_id_hex, "role_id": role_rid.hex(), "bbs_auth_b64": _b64(auth_proof), }).encode()) return json.loads(framed.recv()) finally: sock.close()