import pytest
import zkac


# ---------------------------------------------------------------------------
# Shared helpers
#
# The issuance and registry boilerplate is needed by almost every credential
# and handshake test, so it lives here once instead of being repeated inline.
# ---------------------------------------------------------------------------


def _issue_credential(issuer, issuer_pk, rid, epoch=1):
    """Issue a credential for ``rid`` at ``epoch`` through a blind request.

    Runs the three calls every credential test needs: prepare a blind
    request, have ``issuer`` sign its commitment, and finalize the result
    into a ``zkac.Credential`` bound to ``issuer_pk``.

    Args:
        issuer: The ``zkac.BbsIssuer`` that signs the request.
        issuer_pk: Public key of ``issuer`` (passed in so callers can reuse
            the exact key object they register with a registry).
        rid: Role identifier as returned by ``zkac.role_id``.
        epoch: Epoch passed to both issuance and finalization.

    Returns:
        The finalized ``zkac.Credential``.
    """
    request = zkac.prepare_blind_request()
    blind_sig = issuer.issue_blind(request.commitment_with_proof(), rid, epoch)
    return zkac.Credential.finalize(
        blind_sig,
        request.member_secret(),
        request.prover_blind(),
        rid,
        epoch,
        issuer_pk,
    )


def _new_issuer_and_credential(role_name="admin", epoch=1):
    """Create a fresh issuer and one credential for ``role_name``.

    Returns:
        A ``(issuer, issuer_pk, rid, credential)`` tuple.
    """
    issuer = zkac.BbsIssuer()
    issuer_pk = issuer.public_key()
    rid = zkac.role_id(role_name)
    credential = _issue_credential(issuer, issuer_pk, rid, epoch)
    return issuer, issuer_pk, rid, credential


def _registry_with_role(rid, issuer_pk, epoch=1):
    """Return a new ``zkac.RoleRegistry`` with a single role registered."""
    registry = zkac.RoleRegistry()
    registry.register_role(rid, issuer_pk, epoch)
    return registry


class TestKeypairAndPublicKey:
    """Keypair generation and public/secret key (de)serialization."""

    def test_keypair_generates(self):
        """A new keypair exposes a ``zkac.PublicKey``."""
        kp = zkac.Keypair()
        pk = kp.public_key()
        assert isinstance(pk, zkac.PublicKey)

    def test_pubkey_serialization(self):
        """A public key serializes to 32 bytes and round-trips to an equal key."""
        kp = zkac.Keypair()
        pk = kp.public_key()
        raw = pk.to_bytes()
        assert len(raw) == 32
        pk2 = zkac.PublicKey.from_bytes(raw)
        assert pk == pk2

    def test_pubkey_repr(self):
        """``repr`` is ``PublicKey(...)`` wrapping exactly 64 characters."""
        kp = zkac.Keypair()
        r = repr(kp.public_key())
        assert r.startswith("PublicKey(")
        # 64 characters of payload; presumably the hex form of the 32 key bytes.
        assert len(r) == len("PublicKey()") + 64

    def test_secret_key_roundtrip(self):
        """Rebuilding a keypair from its 32 secret bytes yields the same public key."""
        kp = zkac.Keypair()
        sk = kp.secret_key_bytes()
        assert len(sk) == 32
        kp2 = zkac.Keypair.from_secret_key(sk)
        assert kp.public_key().to_bytes() == kp2.public_key().to_bytes()

    def test_different_keypairs_different_pubkeys(self):
        """Two independently generated keypairs have different public keys."""
        pk1 = zkac.Keypair().public_key()
        pk2 = zkac.Keypair().public_key()
        assert pk1 != pk2

    def test_invalid_pubkey_bytes(self):
        """A 31-byte input and 32 bytes of 0xFF are both rejected with ValueError."""
        with pytest.raises(ValueError):
            zkac.PublicKey.from_bytes(b"\x00" * 31)
        with pytest.raises(ValueError):
            zkac.PublicKey.from_bytes(b"\xff" * 32)

    def test_max_proof_constant(self):
        """``MAX_BBS_AUTH_PROOF_BYTES`` is exported and positive."""
        assert zkac.MAX_BBS_AUTH_PROOF_BYTES > 0


class TestBbsCredentials:
    """Blind credential issuance, presentation, and registry verification."""

    def test_role_id_deterministic(self):
        """``role_id`` is stable per name, differs across names, and is 32 long."""
        rid1 = zkac.role_id("admin")
        rid2 = zkac.role_id("admin")
        rid3 = zkac.role_id("reader")
        assert rid1 == rid2
        assert rid1 != rid3
        assert len(rid1) == 32

    def test_full_blind_credential_flow(self):
        """Walk every issuance step explicitly, then present and verify.

        This test deliberately does not use the shared helpers so that the
        complete flow is spelled out in one place.
        """
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("admin")

        req = zkac.prepare_blind_request()
        blind_sig = issuer.issue_blind(req.commitment_with_proof(), rid, 1)
        cred = zkac.Credential.finalize(
            blind_sig, req.member_secret(), req.prover_blind(), rid, 1, pk
        )

        nonce = b"session-nonce"
        proof = cred.present(nonce)
        assert len(proof) > 0

        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        assert reg.verify_presentation(rid, proof, nonce)

    def test_wrong_role_rejected(self):
        """A proof for one role does not verify under another registered role."""
        issuer, pk, rid, cred = _new_issuer_and_credential("admin")
        nonce = b"nonce"
        proof = cred.present(nonce)

        reg = _registry_with_role(rid, pk)
        # Register the other role under the same issuer key so that only the
        # role identifier differs between the two registry entries.
        wrong_rid = zkac.role_id("reader")
        reg.register_role(wrong_rid, issuer.public_key(), 1)

        assert not reg.verify_presentation(wrong_rid, proof, nonce)

    def test_epoch_revocation(self):
        """An epoch-1 credential stops verifying once the registry moves to epoch 2."""
        _, pk, rid, cred = _new_issuer_and_credential("ops")
        reg = _registry_with_role(rid, pk)

        nonce = b"n1"
        assert reg.verify_presentation(rid, cred.present(nonce), nonce)

        reg.set_epoch(rid, 2)
        nonce2 = b"n2"
        assert not reg.verify_presentation(rid, cred.present(nonce2), nonce2)

    def test_issuer_key_serialization(self):
        """Issuer public and secret keys both round-trip through bytes."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        pk_bytes = pk.to_bytes()
        pk2 = zkac.BbsPublicKey.from_bytes(pk_bytes)
        assert pk.to_bytes() == pk2.to_bytes()

        sk_bytes = issuer.secret_key_bytes()
        issuer2 = zkac.BbsIssuer.from_secret_key(sk_bytes)
        assert issuer2.public_key().to_bytes() == pk_bytes

    def test_presentations_unlinkable(self):
        """Two presentations of one credential differ yet both verify."""
        _, pk, rid, cred = _new_issuer_and_credential("admin")

        p1 = cred.present(b"nonce1")
        p2 = cred.present(b"nonce2")
        assert p1 != p2

        reg = _registry_with_role(rid, pk)
        assert reg.verify_presentation(rid, p1, b"nonce1")
        assert reg.verify_presentation(rid, p2, b"nonce2")

    def test_multiple_members(self):
        """Three credentials from one issuer all verify for the same role."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("editors")
        reg = _registry_with_role(rid, pk)

        nonce = b"shared-nonce"
        for _ in range(3):
            cred = _issue_credential(issuer, pk, rid)
            proof = cred.present(nonce)
            assert reg.verify_presentation(rid, proof, nonce)

    def test_credential_role_and_epoch(self):
        """A credential reports the role id and epoch it was issued for."""
        _, _, rid, cred = _new_issuer_and_credential("admin", epoch=42)
        assert cred.role_id() == rid
        assert cred.epoch() == 42


class TestSchnorrSignature:
    """Signing with a ``Keypair`` and verifying with its ``PublicKey``."""

    def test_sign_verify(self):
        """A signature is 64 bytes and verifies for the signed message."""
        kp = zkac.Keypair()
        pk = kp.public_key()
        sig = kp.sign(b"hello")
        assert len(sig) == 64
        assert pk.verify(b"hello", sig)

    def test_wrong_message(self):
        """Verification fails when the message differs from the one signed."""
        kp = zkac.Keypair()
        pk = kp.public_key()
        sig = kp.sign(b"correct")
        assert not pk.verify(b"wrong", sig)

    def test_wrong_key(self):
        """Verification fails under a different keypair's public key."""
        kp1 = zkac.Keypair()
        kp2 = zkac.Keypair()
        sig = kp1.sign(b"msg")
        assert not kp2.public_key().verify(b"msg", sig)

    def test_deterministic(self):
        """Signing the same message twice with one keypair gives equal signatures."""
        kp = zkac.Keypair()
        assert kp.sign(b"same") == kp.sign(b"same")


class TestNodeHandshake:
    """Client/server ``Node`` handshake and the resulting encrypted sessions."""

    def _make_credential(self):
        """Return ``(issuer, issuer_pk, rid, credential)`` for the "admin" role."""
        return _new_issuer_and_credential("admin")

    def _open_handshake(self):
        """Run the handshake up to, but not including, ``complete_connect``.

        Returns:
            ``(client, server, server_pk, server_session, connect_args)`` where
            ``connect_args`` is ``(pending, response_msg, identity_proof)``,
            i.e. the first three positional arguments of
            ``client.complete_connect`` in call order.
        """
        client = zkac.Node(zkac.Keypair())
        server_kp = zkac.Keypair()
        # Take the public key before handing the keypair to the node.
        server_pk = server_kp.public_key()
        server = zkac.Node(server_kp)

        pending, init_msg = client.connect()
        server_session, response_msg = server.accept(init_msg)
        identity_proof = server.prove_identity(server_session)

        connect_args = (pending, response_msg, identity_proof)
        return client, server, server_pk, server_session, connect_args

    def _authenticated_sessions(self):
        """Complete a full handshake, including server-side auth verification.

        Returns:
            ``(client_session, server_session, rid, verified_rid)`` where
            ``rid`` is the role the credential was issued for and
            ``verified_rid`` is what ``server.verify_auth`` returned.
        """
        _, issuer_pk, rid, cred = self._make_credential()
        reg = _registry_with_role(rid, issuer_pk)

        client, server, server_pk, server_session, connect_args = (
            self._open_handshake()
        )
        client_session, auth_packet = client.complete_connect(
            *connect_args, server_pk, cred
        )
        verified_rid = server.verify_auth(server_session, auth_packet, reg)
        return client_session, server_session, rid, verified_rid

    def test_full_handshake(self):
        """The server learns the credential's role and both directions decrypt."""
        client_session, server_session, rid, verified_rid = (
            self._authenticated_sessions()
        )
        assert verified_rid == rid

        pkt = client_session.encrypt(b"admin command")
        assert server_session.decrypt(pkt) == b"admin command"

        pkt = server_session.encrypt(b"response")
        assert client_session.decrypt(pkt) == b"response"

    def test_wrong_server_key_rejected(self):
        """``complete_connect`` raises when given a key that is not the server's."""
        _, _, _, cred = self._make_credential()
        # The server node and session are bound to names so they stay alive
        # for the duration of the test, as in a real exchange.
        client, _server, _server_pk, _server_session, connect_args = (
            self._open_handshake()
        )
        wrong_pk = zkac.Keypair().public_key()

        with pytest.raises(ValueError, match="identity verification failed"):
            client.complete_connect(*connect_args, wrong_pk, cred)

    def test_replay_rejected(self):
        """Decrypting the same packet a second time raises a replay error."""
        client_session, server_session, _, _ = self._authenticated_sessions()

        pkt = client_session.encrypt(b"once")
        server_session.decrypt(pkt)
        with pytest.raises(ValueError, match="replay"):
            server_session.decrypt(pkt)

    def test_tampered_packet_rejected(self):
        """Flipping the last byte of a packet makes decryption fail."""
        client_session, server_session, _, _ = self._authenticated_sessions()

        pkt = bytearray(client_session.encrypt(b"secret"))
        pkt[-1] ^= 0xFF
        with pytest.raises(ValueError, match="decryption"):
            server_session.decrypt(bytes(pkt))

    def test_pending_connect_consumed(self):
        """A pending connection cannot be completed twice."""
        _, _, _, cred = self._make_credential()
        client, _server, server_pk, _server_session, connect_args = (
            self._open_handshake()
        )

        client.complete_connect(*connect_args, server_pk, cred)
        # Same arguments again: the pending state was used up by the first call.
        with pytest.raises(ValueError, match="consumed"):
            client.complete_connect(*connect_args, server_pk, cred)