"""Tests for the ``zkac.udp`` helpers: datagram framing and the UDP handshake."""

import socket
import threading

import zkac
from zkac.udp import (
    FramedSession,
    client_handshake,
    read_datagram,
    server_handshake,
    write_datagram,
)

# Upper bound (seconds) for every blocking wait in this module, so that a
# failure on one side of a socket exchange fails the test instead of hanging it.
_TIMEOUT = 5.0


def _make_credential():
    """Issue a credential for the ``"admin"`` role from a fresh issuer.

    Returns:
        A ``(public_key, role_id, credential)`` tuple, where the credential
        is finalized from a blind-issuance request against the new issuer.
    """
    issuer = zkac.BbsIssuer()
    pk = issuer.public_key()
    rid = zkac.role_id("admin")
    req = zkac.prepare_blind_request()
    # NOTE(review): the trailing ``1`` is passed identically to issue_blind,
    # Credential.finalize and (in the handshake test) register_role; its
    # meaning is defined by zkac -- confirm against the library docs.
    sig = issuer.issue_blind(req.commitment_with_proof(), rid, 1)
    cred = zkac.Credential.finalize(
        sig, req.member_secret(), req.prover_blind(), rid, 1, pk
    )
    return pk, rid, cred


class TestFraming:
    """Round-trip checks for ``write_datagram`` / ``read_datagram``."""

    def test_write_read_connected(self):
        """A payload written on one end of a socket pair is read back intact."""
        a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
        # Socket objects are context managers; both ends are closed on exit.
        with a, b:
            payload = b"udp-framed"
            write_datagram(a, payload)
            assert read_datagram(b) == payload


class TestHandshakeOverUdp:
    """End-to-end handshake between a server thread and a client on localhost."""

    def test_full_handshake_localhost(self):
        """Handshake over 127.0.0.1, then exchange one ping/pong via FramedSession."""
        pk, rid, cred = _make_credential()
        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)

        server_kp = zkac.Keypair()
        server_pk = server_kp.public_key()
        client_kp = zkac.Keypair()

        ready = threading.Event()
        port_holder: list[int] = []
        err: list[BaseException] = []

        def run_server():
            """Bind an ephemeral port, run the server handshake, echo one message."""
            try:
                # Creating the socket inside the try means a failure here is
                # recorded in ``err`` and still releases the waiting client.
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as srv:
                    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    # Do not block forever if the client never sends anything.
                    srv.settimeout(_TIMEOUT)
                    srv.bind(("127.0.0.1", 0))
                    port_holder.append(srv.getsockname()[1])
                    ready.set()

                    node = zkac.Node(server_kp)
                    session, verified, _addr = server_handshake(srv, node, reg)
                    assert verified == rid
                    framed = FramedSession(srv, session)
                    framed.send(b"pong: " + framed.recv())
            except BaseException as e:
                # Collected here and re-checked on the main thread, because an
                # exception raised in a worker thread does not fail the test.
                err.append(e)
            finally:
                # Always wake the main thread, even when setup failed before
                # the port was published.
                ready.set()

        t = threading.Thread(target=run_server, daemon=True)
        t.start()
        try:
            assert ready.wait(timeout=_TIMEOUT), "server thread never became ready"
            # If the server failed during setup, surface that error rather than
            # an IndexError on the empty port_holder.
            assert not err, err

            with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as cli:
                # Do not block forever if the server dies mid-handshake.
                cli.settimeout(_TIMEOUT)
                sess = client_handshake(
                    cli,
                    ("127.0.0.1", port_holder[0]),
                    zkac.Node(client_kp),
                    server_pk,
                    cred,
                )
                cf = FramedSession(cli, sess)
                cf.send(b"ping")
                assert cf.recv() == b"pong: ping"
        finally:
            t.join(timeout=_TIMEOUT)

        assert not t.is_alive(), "server thread did not finish"
        assert not err, err