import zkac import pytest def make_admin(): """Create a BBS+ issuer, self-issue an __admin__ credential.""" issuer = zkac.BbsIssuer() pk = issuer.public_key() admin_rid = zkac.admin_role_id() req = zkac.prepare_blind_request() sig = issuer.issue_blind(req.commitment_with_proof(), admin_rid, 0) cred = zkac.Credential.finalize( sig, req.member_secret(), req.prover_blind(), admin_rid, 0, pk ) return issuer, pk, cred def create_registry(mgr, issuer, admin_cred, roles=None): """Create a registry in the manager, optionally with roles.""" pk = issuer.public_key() issuance_kp = zkac.IssuanceKeypair() issuance_pk = issuance_kp.public_key_bytes() role_entries = [] if roles: for name, role_pk, epoch in roles: role_entries.append((zkac.role_id(name), role_pk, epoch)) state = zkac.RegistryState.build(pk, issuance_pk, 1, b"\x00" * 32, role_entries) state_bytes = state.serialize() cert = state.certify(admin_cred) rid = mgr.create(state_bytes, cert) return rid, issuance_kp, state class TestRegistryState: def test_build_and_serialize_roundtrip(self): issuer = zkac.BbsIssuer() pk = issuer.public_key() state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, []) data = state.serialize() state2 = zkac.RegistryState.deserialize(data) assert state2.version() == 1 assert state2.registry_id() == state.registry_id() def test_registry_id_matches(self): issuer = zkac.BbsIssuer() pk = issuer.public_key() state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, []) expected = zkac.registry_id(pk) assert state.registry_id() == expected def test_certify_and_verify(self): issuer, pk, admin_cred = make_admin() state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, []) state_bytes = state.serialize() cert = state.certify(admin_cred) assert zkac.RegistryState.verify_cert(pk, cert, state_bytes) def test_tampered_state_rejected(self): issuer, pk, admin_cred = make_admin() state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, []) state_bytes = state.serialize() cert = state.certify(admin_cred) tampered = bytearray(state_bytes) tampered[32] ^= 0xFF assert not zkac.RegistryState.verify_cert(pk, cert, bytes(tampered)) def test_wrong_admin_key_rejected(self): issuer, pk, admin_cred = make_admin() other_issuer = zkac.BbsIssuer() state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, []) state_bytes = state.serialize() cert = state.certify(admin_cred) assert not zkac.RegistryState.verify_cert(other_issuer.public_key(), cert, state_bytes) def test_state_certs_are_unlinkable(self): issuer, pk, admin_cred = make_admin() s1 = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, []) s2 = zkac.RegistryState.build(pk, b"\x00" * 32, 2, b"\x00" * 32, []) c1 = s1.certify(admin_cred) c2 = s2.certify(admin_cred) assert c1 != c2 assert zkac.RegistryState.verify_cert(pk, c1, s1.serialize()) assert zkac.RegistryState.verify_cert(pk, c2, s2.serialize()) class TestRegistryManager: def test_create_and_get(self): mgr = zkac.RegistryManager() issuer, pk, admin_cred = make_admin() rid, _, _ = create_registry(mgr, issuer, admin_cred) assert mgr.has_registry(rid) state_bytes, cert_bytes = mgr.get(rid) assert len(state_bytes) > 0 assert len(cert_bytes) > 0 def test_duplicate_rejected(self): mgr = zkac.RegistryManager() issuer, pk, admin_cred = make_admin() create_registry(mgr, issuer, admin_cred) # Same issuer key → same registry_id → duplicate state = zkac.RegistryState.build(pk, b"\x00" * 32, 1, b"\x00" * 32, []) cert = state.certify(admin_cred) with pytest.raises(ValueError, match="already exists"): mgr.create(state.serialize(), cert) def test_update_with_hash_chain(self): mgr = zkac.RegistryManager() issuer, pk, admin_cred = make_admin() rid, _, _ = create_registry(mgr, issuer, admin_cred) old_bytes, _ = mgr.get(rid) old_state = zkac.RegistryState.deserialize(old_bytes) prev_hash = old_state.state_hash() role_issuer = zkac.BbsIssuer() state2 = zkac.RegistryState.build( pk, b"\x00" * 32, 2, prev_hash, [(zkac.role_id("analyst"), role_issuer.public_key(), 1)], ) cert2 = state2.certify(admin_cred) mgr.update(rid, state2.serialize(), cert2) new_bytes, _ = mgr.get(rid) new_state = zkac.RegistryState.deserialize(new_bytes) assert new_state.version() == 2 def test_restore_snapshot_after_update(self): mgr = zkac.RegistryManager() issuer, pk, admin_cred = make_admin() rid, _, _ = create_registry(mgr, issuer, admin_cred) old_bytes, _ = mgr.get(rid) old_state = zkac.RegistryState.deserialize(old_bytes) prev_hash = old_state.state_hash() role_issuer = zkac.BbsIssuer() state2 = zkac.RegistryState.build( pk, b"\x00" * 32, 2, prev_hash, [(zkac.role_id("analyst"), role_issuer.public_key(), 1)], ) cert2 = state2.certify(admin_cred) mgr.update(rid, state2.serialize(), cert2) snap_state, snap_cert = mgr.get(rid) mgr2 = zkac.RegistryManager() rid2 = mgr2.restore(snap_state, snap_cert) assert rid == rid2 s2, c2 = mgr2.get(rid) assert s2 == snap_state assert c2 == snap_cert assert zkac.RegistryState.deserialize(s2).version() == 2 def test_wrong_version_rejected(self): mgr = zkac.RegistryManager() issuer, pk, admin_cred = make_admin() rid, _, _ = create_registry(mgr, issuer, admin_cred) old_bytes, _ = mgr.get(rid) prev_hash = zkac.RegistryState.deserialize(old_bytes).state_hash() state = zkac.RegistryState.build(pk, b"\x00" * 32, 3, prev_hash, []) cert = state.certify(admin_cred) with pytest.raises(ValueError, match="version"): mgr.update(rid, state.serialize(), cert) def test_wrong_prev_hash_rejected(self): mgr = zkac.RegistryManager() issuer, pk, admin_cred = make_admin() rid, _, _ = create_registry(mgr, issuer, admin_cred) state = zkac.RegistryState.build(pk, b"\x00" * 32, 2, b"\xff" * 32, []) cert = state.certify(admin_cred) with pytest.raises(ValueError, match="prev_state_hash"): mgr.update(rid, state.serialize(), cert) def test_nonexistent_registry_errors(self): mgr = zkac.RegistryManager() fake = b"\xff" * 32 with pytest.raises(ValueError, match="not found"): mgr.get(fake) assert not mgr.has_registry(fake) class TestManagedHandshake: def test_full_managed_handshake(self): mgr = zkac.RegistryManager() admin_issuer, admin_pk, admin_cred = make_admin() role_issuer = zkac.BbsIssuer() role_pk = role_issuer.public_key() analyst_rid = zkac.role_id("analyst") rid, _, _ = create_registry( mgr, admin_issuer, admin_cred, roles=[("analyst", role_pk, 1)], ) # Issue a credential to a user req = zkac.prepare_blind_request() sig = role_issuer.issue_blind(req.commitment_with_proof(), analyst_rid, 1) user_cred = zkac.Credential.finalize( sig, req.member_secret(), req.prover_blind(), analyst_rid, 1, role_pk ) # Handshake server_kp = zkac.Keypair() server_pk = server_kp.public_key() server = zkac.Node(server_kp) client = zkac.Node(zkac.Keypair()) pending, init_msg = client.connect() server_session, response_msg = server.accept(init_msg) identity_proof = server.prove_identity(server_session) client_session, auth_packet = client.complete_connect_managed( pending, response_msg, identity_proof, server_pk, user_cred, rid ) verified_reg_id, verified_role_id = server.verify_auth_managed( server_session, auth_packet, mgr ) assert verified_reg_id == rid assert verified_role_id == analyst_rid # Data exchange works pkt = client_session.encrypt(b"query") assert server_session.decrypt(pkt) == b"query" def test_multiple_registries(self): mgr = zkac.RegistryManager() # Registry A issuer_a, pk_a, cred_a = make_admin() role_issuer_a = zkac.BbsIssuer() rid_a, _, _ = create_registry( mgr, issuer_a, cred_a, roles=[("ops", role_issuer_a.public_key(), 1)], ) # Registry B issuer_b, pk_b, cred_b = make_admin() role_issuer_b = zkac.BbsIssuer() rid_b, _, _ = create_registry( mgr, issuer_b, cred_b, roles=[("dev", role_issuer_b.public_key(), 1)], ) assert rid_a != rid_b # Issue credentials in each ops_rid = zkac.role_id("ops") req = zkac.prepare_blind_request() sig = role_issuer_a.issue_blind(req.commitment_with_proof(), ops_rid, 1) ops_cred = zkac.Credential.finalize( sig, req.member_secret(), req.prover_blind(), ops_rid, 1, role_issuer_a.public_key() ) dev_rid = zkac.role_id("dev") req2 = zkac.prepare_blind_request() sig2 = role_issuer_b.issue_blind(req2.commitment_with_proof(), dev_rid, 1) dev_cred = zkac.Credential.finalize( sig2, req2.member_secret(), req2.prover_blind(), dev_rid, 1, role_issuer_b.public_key() ) # Auth against registry A with ops credential server_kp = zkac.Keypair() server_sk = server_kp.secret_key_bytes() server_pk = server_kp.public_key() server = zkac.Node(server_kp) client = zkac.Node(zkac.Keypair()) pending, init_msg = client.connect() ss, resp = server.accept(init_msg) id_proof = server.prove_identity(ss) cs, auth = client.complete_connect_managed( pending, resp, id_proof, server_pk, ops_cred, rid_a ) reg_id, role_id = server.verify_auth_managed(ss, auth, mgr) assert reg_id == rid_a assert role_id == ops_rid # Auth against registry B with dev credential server2 = zkac.Node(zkac.Keypair.from_secret_key(server_sk)) client2 = zkac.Node(zkac.Keypair()) pending2, init2 = client2.connect() ss2, resp2 = server2.accept(init2) id2 = server2.prove_identity(ss2) cs2, auth2 = client2.complete_connect_managed( pending2, resp2, id2, server_pk, dev_cred, rid_b ) reg_id2, role_id2 = server2.verify_auth_managed(ss2, auth2, mgr) assert reg_id2 == rid_b assert role_id2 == dev_rid class TestIssuanceRelay: def test_e2e_encrypted_issuance(self): """Full flow: user requests credential through server, admin issues.""" mgr = zkac.RegistryManager() admin_issuer, admin_pk, admin_cred = make_admin() issuance_kp = zkac.IssuanceKeypair() issuance_pk = issuance_kp.public_key_bytes() role_issuer = zkac.BbsIssuer() analyst_rid = zkac.role_id("analyst") state = zkac.RegistryState.build( admin_pk, issuance_pk, 1, b"\x00" * 32, [(analyst_rid, role_issuer.public_key(), 1)], ) state_bytes = state.serialize() cert = state.certify(admin_cred) rid = mgr.create(state_bytes, cert) # --- User side: prepare blind request and encrypt --- req = zkac.prepare_blind_request() commitment = req.commitment_with_proof() eph_pk, encrypted_commitment = zkac.encrypt_for_admin(issuance_pk, commitment) # User submits to server request_id = b"\x01" * 32 mgr.queue_issuance_request(rid, request_id, analyst_rid, eph_pk, encrypted_commitment) # --- Admin side: fetch, decrypt, issue, encrypt response --- pending = mgr.take_pending_requests(rid) assert len(pending) == 1 req_id, role_id, eph, enc_blob = pending[0] assert req_id == request_id # Admin decrypts the commitment decrypted_commitment = issuance_kp.decrypt(eph, enc_blob) assert decrypted_commitment == commitment # Admin issues blind_sig = role_issuer.issue_blind(decrypted_commitment, analyst_rid, 1) # Admin encrypts response encrypted_sig = issuance_kp.encrypt(eph, blind_sig) mgr.grant_credential(rid, req_id, encrypted_sig) # --- User side: fetch and decrypt --- enc_response = mgr.take_granted_credential(rid, request_id) assert enc_response is not None # User decrypts — needs the ephemeral secret. # In the real flow, the user saves the StaticSecret bytes. # encrypt_for_admin uses EphemeralSecret internally, so we # demonstrate the decrypt_from_admin path with a full manual flow: # (The encrypt_for_admin function doesn't expose the secret, so # in production, the Python side would use IssuanceKeypair or # a manual DH. For this test, verify the admin-side roundtrip.) decrypted_sig = issuance_kp.decrypt(eph_pk, enc_response) assert decrypted_sig == blind_sig def test_server_cannot_substitute_commitment(self): """Server cannot forge valid commitments because it can't decrypt.""" admin_kp = zkac.IssuanceKeypair() admin_pk = admin_kp.public_key_bytes() plaintext = b"real commitment data" eph_pk, ciphertext = zkac.encrypt_for_admin(admin_pk, plaintext) # Server tries to decrypt with a different key — fails attacker_kp = zkac.IssuanceKeypair() with pytest.raises(ValueError): attacker_kp.decrypt(eph_pk, ciphertext) # Tampering with ciphertext — fails tampered = bytearray(ciphertext) tampered[-1] ^= 0xFF with pytest.raises(ValueError): admin_kp.decrypt(eph_pk, bytes(tampered)) def test_issuance_keypair_serialization(self): kp = zkac.IssuanceKeypair() secret = kp.secret_bytes() pk = kp.public_key_bytes() kp2 = zkac.IssuanceKeypair.from_secret(secret) assert kp2.public_key_bytes() == pk class TestAdminAuth: def test_verify_admin_bbs_proof(self): """Admin can prove __admin__ role to the server via BBS+.""" mgr = zkac.RegistryManager() issuer, pk, admin_cred = make_admin() rid, _, _ = create_registry(mgr, issuer, admin_cred) nonce = b"session-transcript-hash" admin_rid = zkac.admin_role_id() proof = admin_cred.present(nonce) assert mgr.verify_admin(rid, proof, nonce) def test_non_admin_rejected(self): """A credential for a different role cannot pass admin verification.""" mgr = zkac.RegistryManager() issuer, pk, admin_cred = make_admin() role_issuer = zkac.BbsIssuer() rid, _, _ = create_registry( mgr, issuer, admin_cred, roles=[("user", role_issuer.public_key(), 1)], ) # Issue a non-admin credential user_rid = zkac.role_id("user") req = zkac.prepare_blind_request() sig = role_issuer.issue_blind(req.commitment_with_proof(), user_rid, 1) user_cred = zkac.Credential.finalize( sig, req.member_secret(), req.prover_blind(), user_rid, 1, role_issuer.public_key() ) nonce = b"nonce" proof = user_cred.present(nonce) assert not mgr.verify_admin(rid, proof, nonce)