"""Tests for the zkac.tcp framing, handshake and FramedSession helpers."""

import socket
import threading

import pytest

import zkac
from zkac.tcp import (
    FramedSession,
    MAX_TCP_FRAME_BYTES,
    client_handshake,
    read_frame,
    server_handshake,
    write_frame,
)


def _make_credential():
    """Issue a blind-signed credential for the ``"admin"`` role.

    Returns:
        A ``(issuer, public_key, role_id, credential)`` tuple.
    """
    issuer = zkac.BbsIssuer()
    pk = issuer.public_key()
    rid = zkac.role_id("admin")
    req = zkac.prepare_blind_request()
    # NOTE(review): the literal 1 is passed to issue_blind, finalize and (in
    # the tests) register_role; presumably these must agree -- confirm
    # against the zkac API.
    sig = issuer.issue_blind(req.commitment_with_proof(), rid, 1)
    cred = zkac.Credential.finalize(
        sig, req.member_secret(), req.prover_blind(), rid, 1, pk
    )
    return issuer, pk, rid, cred


def _start_server(target):
    """Run *target* on a background thread and record any exception it raises.

    An exception (including a failed ``assert``) raised on a worker thread
    does not fail the pytest test on its own, so it is captured here and
    re-raised on the main thread by ``_finish_server``.

    Returns:
        A ``(thread, errors)`` tuple; ``errors`` is a list that receives the
        exception raised by *target*, if any.
    """
    errors = []

    def runner():
        try:
            target()
        except Exception as exc:  # re-raised on the main thread
            errors.append(exc)

    # Daemon thread: a server stuck in a blocking read must not keep the
    # interpreter alive after the test has already reported a failure.
    thread = threading.Thread(target=runner, daemon=True)
    thread.start()
    return thread, errors


def _finish_server(thread, errors):
    """Join the server thread and surface any failure it recorded.

    Raises:
        AssertionError: if the thread is still running after 5 seconds.
        Exception: the first exception captured from the server thread.
    """
    thread.join(timeout=5)
    assert not thread.is_alive()
    if errors:
        raise errors[0]


class TestFraming:
    """Round-trip and limit checks for ``write_frame`` / ``read_frame``."""

    def test_read_write_roundtrip(self):
        """A payload written with write_frame is returned by read_frame."""
        a, b = socket.socketpair()
        with a, b:
            payload = b"hello" * 400
            write_frame(a, payload)
            assert read_frame(b) == payload

    def test_oversized_length_rejected(self):
        """A length header above MAX_TCP_FRAME_BYTES raises ValueError."""
        a, b = socket.socketpair()
        with a, b:
            # Send only a 4-byte little-endian length header announcing a
            # frame one byte over the limit; no payload follows.
            a.sendall((MAX_TCP_FRAME_BYTES + 1).to_bytes(4, "little"))
            with pytest.raises(ValueError, match="exceeds maximum"):
                read_frame(b)


class TestHandshakeOverTcp:
    """Client/server handshake over a connected socket pair."""

    def test_full_handshake_matching_keys(self):
        """The server verifies the role and the client decrypts its packet."""
        _, pk, rid, cred = _make_credential()
        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        server_kp = zkac.Keypair()
        server_pk = server_kp.public_key()
        # Sockets are created last so a failure in the setup above cannot
        # leak them.
        client_sock, server_sock = socket.socketpair()

        def run_server():
            with server_sock:
                srv = zkac.Node(server_kp)
                s, verified = server_handshake(server_sock, srv, reg)
                assert verified == rid
                pkt = s.encrypt(b"admin command")
                write_frame(server_sock, pkt)

        thread, errors = _start_server(run_server)
        try:
            with client_sock:
                cli = zkac.Node(zkac.Keypair())
                session = client_handshake(client_sock, cli, server_pk, cred)
                wire = read_frame(client_sock)
                assert session.decrypt(wire) == b"admin command"
        finally:
            # Runs even when the client side failed, so a server-side error
            # (often the root cause) is reported rather than swallowed.
            _finish_server(thread, errors)


class TestFramedSession:
    """FramedSession send/recv on top of an established session."""

    def test_framed_encrypt_roundtrip(self):
        """A message sent by the server's FramedSession reaches the client."""
        _, pk, rid, cred = _make_credential()
        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        server_kp = zkac.Keypair()
        server_pk = server_kp.public_key()
        # Sockets are created last so a failure in the setup above cannot
        # leak them.
        client_sock, server_sock = socket.socketpair()

        def run_server():
            with server_sock:
                srv = zkac.Node(server_kp)
                session, _ = server_handshake(server_sock, srv, reg)
                framed = FramedSession(server_sock, session)
                framed.send(b"reply")

        thread, errors = _start_server(run_server)
        try:
            with client_sock:
                cli = zkac.Node(zkac.Keypair())
                session = client_handshake(client_sock, cli, server_pk, cred)
                framed = FramedSession(client_sock, session)
                assert framed.recv() == b"reply"
        finally:
            # Runs even when the client side failed, so a server-side error
            # is reported rather than swallowed.
            _finish_server(thread, errors)