ZKAC/tests/test_managed_registry.py
2026-04-19 23:19:24 +02:00

431 lines
16 KiB
Python

import zkac
import pytest
def make_admin():
    """Create a BBS+ issuer together with a self-issued ``__admin__`` credential.

    The issuer blind-signs a request for the role returned by
    ``zkac.admin_role_id()`` and the credential is finalized against the
    issuer's own public key.

    Returns:
        A ``(issuer, public_key, credential)`` tuple.
    """
    admin_issuer = zkac.BbsIssuer()
    admin_pk = admin_issuer.public_key()
    admin_role = zkac.admin_role_id()

    blind_req = zkac.prepare_blind_request()
    blind_sig = admin_issuer.issue_blind(
        blind_req.commitment_with_proof(), admin_role, 0
    )
    admin_cred = zkac.Credential.finalize(
        blind_sig,
        blind_req.member_secret(),
        blind_req.prover_blind(),
        admin_role,
        0,
        admin_pk,
    )
    return admin_issuer, admin_pk, admin_cred
def create_registry(mgr, issuer, admin_cred, roles=None):
    """Build, certify and register a version-1 registry state in ``mgr``.

    Args:
        mgr: Registry manager whose ``create`` receives the new state.
        issuer: Issuer whose public key is passed to ``RegistryState.build``.
        admin_cred: Credential used to certify the state.
        roles: Optional iterable of ``(name, role_public_key, epoch)`` tuples.
            Each name is converted with ``zkac.role_id``. ``None`` or an
            empty value yields a registry with no role entries.

    Returns:
        A ``(registry_id, issuance_keypair, state)`` tuple.
    """
    admin_pk = issuer.public_key()
    issuance_kp = zkac.IssuanceKeypair()
    issuance_pk = issuance_kp.public_key_bytes()

    role_entries = [
        (zkac.role_id(name), role_pk, epoch)
        for name, role_pk, epoch in (roles or ())
    ]

    # Initial state: version 1 with an all-zero previous-state hash.
    state = zkac.RegistryState.build(
        admin_pk, issuance_pk, 1, b"\x00" * 32, role_entries
    )
    # Arguments evaluate left to right: serialize first, then certify.
    rid = mgr.create(state.serialize(), state.certify(admin_cred))
    return rid, issuance_kp, state
class TestRegistryState:
    """Build, serialize and certify ``RegistryState`` values without a manager."""

    def test_build_and_serialize_roundtrip(self):
        """Serializing then deserializing keeps the version and registry id."""
        admin_issuer = zkac.BbsIssuer()
        admin_pk = admin_issuer.public_key()
        zeros = b"\x00" * 32
        original = zkac.RegistryState.build(admin_pk, zeros, 1, zeros, [])

        encoded = original.serialize()
        restored = zkac.RegistryState.deserialize(encoded)

        assert restored.version() == 1
        assert restored.registry_id() == original.registry_id()

    def test_registry_id_matches(self):
        """The state's registry id equals ``zkac.registry_id`` of the admin key."""
        admin_issuer = zkac.BbsIssuer()
        admin_pk = admin_issuer.public_key()
        zeros = b"\x00" * 32
        state = zkac.RegistryState.build(admin_pk, zeros, 1, zeros, [])

        want = zkac.registry_id(admin_pk)
        assert state.registry_id() == want

    def test_certify_and_verify(self):
        """A certificate made by the admin credential verifies against the state."""
        _, admin_pk, admin_cred = make_admin()
        zeros = b"\x00" * 32
        state = zkac.RegistryState.build(admin_pk, zeros, 1, zeros, [])

        encoded = state.serialize()
        cert = state.certify(admin_cred)

        assert zkac.RegistryState.verify_cert(admin_pk, cert, encoded)

    def test_tampered_state_rejected(self):
        """Flipping one byte of the serialized state invalidates the certificate."""
        _, admin_pk, admin_cred = make_admin()
        zeros = b"\x00" * 32
        state = zkac.RegistryState.build(admin_pk, zeros, 1, zeros, [])
        encoded = state.serialize()
        cert = state.certify(admin_cred)

        corrupted = bytearray(encoded)
        corrupted[32] ^= 0xFF  # invert every bit of the byte at offset 32

        assert not zkac.RegistryState.verify_cert(admin_pk, cert, bytes(corrupted))

    def test_wrong_admin_key_rejected(self):
        """Verification fails when checked against an unrelated issuer's key."""
        _, admin_pk, admin_cred = make_admin()
        stranger = zkac.BbsIssuer()
        zeros = b"\x00" * 32
        state = zkac.RegistryState.build(admin_pk, zeros, 1, zeros, [])
        encoded = state.serialize()
        cert = state.certify(admin_cred)

        assert not zkac.RegistryState.verify_cert(stranger.public_key(), cert, encoded)

    def test_state_certs_are_unlinkable(self):
        """Certificates for two states differ and each verifies for its own state."""
        # NOTE(review): the two states differ in version (1 vs 2), so the
        # inequality below would hold even for deterministic certificates.
        # Confirm whether certifying the *same* state twice was intended.
        _, admin_pk, admin_cred = make_admin()
        zeros = b"\x00" * 32
        first = zkac.RegistryState.build(admin_pk, zeros, 1, zeros, [])
        second = zkac.RegistryState.build(admin_pk, zeros, 2, zeros, [])

        first_cert = first.certify(admin_cred)
        second_cert = second.certify(admin_cred)

        assert first_cert != second_cert
        assert zkac.RegistryState.verify_cert(admin_pk, first_cert, first.serialize())
        assert zkac.RegistryState.verify_cert(admin_pk, second_cert, second.serialize())
class TestRegistryManager:
    """Create, update, restore and look up registries in a ``RegistryManager``."""

    def test_create_and_get(self):
        """A created registry is reported as present and returns non-empty data."""
        manager = zkac.RegistryManager()
        admin_issuer, _, admin_cred = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, admin_cred)

        assert manager.has_registry(rid)
        stored_state, stored_cert = manager.get(rid)
        assert len(stored_state) > 0
        assert len(stored_cert) > 0

    def test_duplicate_rejected(self):
        """Creating a second registry from the same admin key raises."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, admin_cred = make_admin()
        create_registry(manager, admin_issuer, admin_cred)

        # Same issuer key -> same registry_id -> duplicate
        zeros = b"\x00" * 32
        again = zkac.RegistryState.build(admin_pk, zeros, 1, zeros, [])
        again_cert = again.certify(admin_cred)
        with pytest.raises(ValueError, match="already exists"):
            manager.create(again.serialize(), again_cert)

    def test_update_with_hash_chain(self):
        """A version-2 state chained to the stored state's hash is accepted."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, admin_cred = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, admin_cred)

        current_bytes, _ = manager.get(rid)
        current = zkac.RegistryState.deserialize(current_bytes)
        parent_hash = current.state_hash()

        analyst_issuer = zkac.BbsIssuer()
        analyst_entry = (zkac.role_id("analyst"), analyst_issuer.public_key(), 1)
        successor = zkac.RegistryState.build(
            admin_pk, b"\x00" * 32, 2, parent_hash, [analyst_entry]
        )
        successor_cert = successor.certify(admin_cred)
        manager.update(rid, successor.serialize(), successor_cert)

        latest_bytes, _ = manager.get(rid)
        latest = zkac.RegistryState.deserialize(latest_bytes)
        assert latest.version() == 2

    def test_restore_snapshot_after_update(self):
        """A snapshot taken after an update restores into a fresh manager."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, admin_cred = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, admin_cred)

        current_bytes, _ = manager.get(rid)
        current = zkac.RegistryState.deserialize(current_bytes)
        parent_hash = current.state_hash()

        analyst_issuer = zkac.BbsIssuer()
        analyst_entry = (zkac.role_id("analyst"), analyst_issuer.public_key(), 1)
        successor = zkac.RegistryState.build(
            admin_pk, b"\x00" * 32, 2, parent_hash, [analyst_entry]
        )
        successor_cert = successor.certify(admin_cred)
        manager.update(rid, successor.serialize(), successor_cert)

        snapshot_state, snapshot_cert = manager.get(rid)

        fresh_manager = zkac.RegistryManager()
        restored_rid = fresh_manager.restore(snapshot_state, snapshot_cert)
        assert rid == restored_rid

        restored_state, restored_cert = fresh_manager.get(rid)
        assert restored_state == snapshot_state
        assert restored_cert == snapshot_cert
        assert zkac.RegistryState.deserialize(restored_state).version() == 2

    def test_wrong_version_rejected(self):
        """Skipping from version 1 to version 3 is refused."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, admin_cred = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, admin_cred)

        current_bytes, _ = manager.get(rid)
        parent_hash = zkac.RegistryState.deserialize(current_bytes).state_hash()

        skipped = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 3, parent_hash, [])
        skipped_cert = skipped.certify(admin_cred)
        with pytest.raises(ValueError, match="version"):
            manager.update(rid, skipped.serialize(), skipped_cert)

    def test_wrong_prev_hash_rejected(self):
        """An update whose previous-state hash does not match is refused."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, admin_cred = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, admin_cred)

        bogus_parent = b"\xff" * 32
        unchained = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 2, bogus_parent, [])
        unchained_cert = unchained.certify(admin_cred)
        with pytest.raises(ValueError, match="prev_state_hash"):
            manager.update(rid, unchained.serialize(), unchained_cert)

    def test_nonexistent_registry_errors(self):
        """Looking up an unknown id raises, and ``has_registry`` is false."""
        manager = zkac.RegistryManager()
        unknown_rid = b"\xff" * 32
        with pytest.raises(ValueError, match="not found"):
            manager.get(unknown_rid)
        assert not manager.has_registry(unknown_rid)
class TestManagedHandshake:
    """End-to-end handshakes authenticated against a ``RegistryManager``."""

    @staticmethod
    def _issue_credential(role_issuer, role_rid, epoch):
        """Blind-issue and finalize a credential for ``role_rid``.

        Args:
            role_issuer: Issuer that signs the blind request; its public key
                is also used to finalize the credential.
            role_rid: Role id the credential is issued for.
            epoch: Value passed as the third argument of ``issue_blind`` and
                forwarded to ``Credential.finalize``.

        Returns:
            The finalized credential.
        """
        req = zkac.prepare_blind_request()
        sig = role_issuer.issue_blind(req.commitment_with_proof(), role_rid, epoch)
        return zkac.Credential.finalize(
            sig,
            req.member_secret(),
            req.prover_blind(),
            role_rid,
            epoch,
            role_issuer.public_key(),
        )

    @staticmethod
    def _handshake(server, server_pk, cred, rid, mgr):
        """Run one managed handshake between a new client node and ``server``.

        Args:
            server: Server-side ``zkac.Node``.
            server_pk: Public key the client expects the server to prove.
            cred: Credential the client authenticates with.
            rid: Registry id the client authenticates against.
            mgr: Registry manager the server verifies the auth packet with.

        Returns:
            ``(client_session, server_session, registry_id, role_id)`` where
            the last two come from ``server.verify_auth_managed``.
        """
        client = zkac.Node(zkac.Keypair())
        pending, init_msg = client.connect()
        server_session, response_msg = server.accept(init_msg)
        identity_proof = server.prove_identity(server_session)
        client_session, auth_packet = client.complete_connect_managed(
            pending, response_msg, identity_proof, server_pk, cred, rid
        )
        reg_id, role_id = server.verify_auth_managed(server_session, auth_packet, mgr)
        return client_session, server_session, reg_id, role_id

    def test_full_managed_handshake(self):
        """A user with an ``analyst`` credential authenticates and exchanges data."""
        mgr = zkac.RegistryManager()
        admin_issuer, _, admin_cred = make_admin()
        role_issuer = zkac.BbsIssuer()
        role_pk = role_issuer.public_key()
        analyst_rid = zkac.role_id("analyst")
        rid, _, _ = create_registry(
            mgr, admin_issuer, admin_cred,
            roles=[("analyst", role_pk, 1)],
        )

        # Issue a credential to a user
        user_cred = self._issue_credential(role_issuer, analyst_rid, 1)

        # Handshake
        server_kp = zkac.Keypair()
        server_pk = server_kp.public_key()
        server = zkac.Node(server_kp)
        client_session, server_session, verified_reg_id, verified_role_id = (
            self._handshake(server, server_pk, user_cred, rid, mgr)
        )
        assert verified_reg_id == rid
        assert verified_role_id == analyst_rid

        # Data exchange works
        pkt = client_session.encrypt(b"query")
        assert server_session.decrypt(pkt) == b"query"

    def test_multiple_registries(self):
        """One manager serves two registries, each with its own role."""
        mgr = zkac.RegistryManager()

        # Registry A
        issuer_a, _, cred_a = make_admin()
        role_issuer_a = zkac.BbsIssuer()
        rid_a, _, _ = create_registry(
            mgr, issuer_a, cred_a,
            roles=[("ops", role_issuer_a.public_key(), 1)],
        )

        # Registry B
        issuer_b, _, cred_b = make_admin()
        role_issuer_b = zkac.BbsIssuer()
        rid_b, _, _ = create_registry(
            mgr, issuer_b, cred_b,
            roles=[("dev", role_issuer_b.public_key(), 1)],
        )
        assert rid_a != rid_b

        # Issue credentials in each
        ops_rid = zkac.role_id("ops")
        ops_cred = self._issue_credential(role_issuer_a, ops_rid, 1)
        dev_rid = zkac.role_id("dev")
        dev_cred = self._issue_credential(role_issuer_b, dev_rid, 1)

        # Auth against registry A with ops credential
        server_kp = zkac.Keypair()
        server_sk = server_kp.secret_key_bytes()
        server_pk = server_kp.public_key()
        server = zkac.Node(server_kp)
        _, _, reg_id, role_id = self._handshake(server, server_pk, ops_cred, rid_a, mgr)
        assert reg_id == rid_a
        assert role_id == ops_rid

        # Auth against registry B with dev credential.  The second node is
        # rebuilt from the saved secret key so the same server_pk still applies.
        server2 = zkac.Node(zkac.Keypair.from_secret_key(server_sk))
        _, _, reg_id2, role_id2 = self._handshake(server2, server_pk, dev_cred, rid_b, mgr)
        assert reg_id2 == rid_b
        assert role_id2 == dev_rid
class TestIssuanceRelay:
    """Encrypted credential issuance relayed through the registry manager."""

    def test_e2e_encrypted_issuance(self):
        """Full flow: user requests credential through server, admin issues."""
        manager = zkac.RegistryManager()
        _, admin_pk, admin_cred = make_admin()
        admin_box = zkac.IssuanceKeypair()
        admin_box_pk = admin_box.public_key_bytes()
        analyst_issuer = zkac.BbsIssuer()
        analyst_rid = zkac.role_id("analyst")

        state = zkac.RegistryState.build(
            admin_pk,
            admin_box_pk,
            1,
            b"\x00" * 32,
            [(analyst_rid, analyst_issuer.public_key(), 1)],
        )
        encoded_state = state.serialize()
        state_cert = state.certify(admin_cred)
        rid = manager.create(encoded_state, state_cert)

        # --- User side: prepare blind request and encrypt ---
        blind_req = zkac.prepare_blind_request()
        commitment = blind_req.commitment_with_proof()
        user_eph_pk, sealed_commitment = zkac.encrypt_for_admin(admin_box_pk, commitment)

        # User submits to server
        request_id = b"\x01" * 32
        manager.queue_issuance_request(
            rid, request_id, analyst_rid, user_eph_pk, sealed_commitment
        )

        # --- Admin side: fetch, decrypt, issue, encrypt response ---
        queued = manager.take_pending_requests(rid)
        assert len(queued) == 1
        queued_id, _role, queued_eph, queued_blob = queued[0]
        assert queued_id == request_id

        # Admin decrypts the commitment
        opened_commitment = admin_box.decrypt(queued_eph, queued_blob)
        assert opened_commitment == commitment

        # Admin issues
        blind_sig = analyst_issuer.issue_blind(opened_commitment, analyst_rid, 1)

        # Admin encrypts response
        sealed_sig = admin_box.encrypt(queued_eph, blind_sig)
        manager.grant_credential(rid, queued_id, sealed_sig)

        # --- User side: fetch and decrypt ---
        sealed_response = manager.take_granted_credential(rid, request_id)
        assert sealed_response is not None

        # encrypt_for_admin hands back only the ephemeral public key, so the
        # user's ephemeral secret is not available to this test.  A real
        # client would keep that secret (or perform the DH manually) and
        # decrypt with it; here the round trip is checked from the admin
        # side by decrypting with the issuance keypair instead.
        opened_sig = admin_box.decrypt(user_eph_pk, sealed_response)
        assert opened_sig == blind_sig

    def test_server_cannot_substitute_commitment(self):
        """Server cannot forge valid commitments because it can't decrypt."""
        admin_box = zkac.IssuanceKeypair()
        admin_box_pk = admin_box.public_key_bytes()
        message = b"real commitment data"
        eph_pk, sealed = zkac.encrypt_for_admin(admin_box_pk, message)

        # Server tries to decrypt with a different key - fails
        intruder_box = zkac.IssuanceKeypair()
        with pytest.raises(ValueError):
            intruder_box.decrypt(eph_pk, sealed)

        # Tampering with ciphertext - fails
        corrupted = bytearray(sealed)
        corrupted[-1] ^= 0xFF
        with pytest.raises(ValueError):
            admin_box.decrypt(eph_pk, bytes(corrupted))

    def test_issuance_keypair_serialization(self):
        """A keypair rebuilt from its secret bytes has the same public key."""
        original = zkac.IssuanceKeypair()
        secret = original.secret_bytes()
        public = original.public_key_bytes()

        rebuilt = zkac.IssuanceKeypair.from_secret(secret)
        assert rebuilt.public_key_bytes() == public
class TestAdminAuth:
    """Admin verification through ``RegistryManager.verify_admin``."""

    def test_verify_admin_bbs_proof(self):
        """Admin can prove __admin__ role to the server via BBS+."""
        mgr = zkac.RegistryManager()
        issuer, _, admin_cred = make_admin()
        rid, _, _ = create_registry(mgr, issuer, admin_cred)

        # The same nonce must be given to both present() and verify_admin().
        nonce = b"session-transcript-hash"
        proof = admin_cred.present(nonce)
        assert mgr.verify_admin(rid, proof, nonce)

    def test_non_admin_rejected(self):
        """A credential for a different role cannot pass admin verification."""
        mgr = zkac.RegistryManager()
        issuer, _, admin_cred = make_admin()
        role_issuer = zkac.BbsIssuer()
        rid, _, _ = create_registry(
            mgr, issuer, admin_cred,
            roles=[("user", role_issuer.public_key(), 1)],
        )

        # Issue a non-admin credential
        user_rid = zkac.role_id("user")
        req = zkac.prepare_blind_request()
        sig = role_issuer.issue_blind(req.commitment_with_proof(), user_rid, 1)
        user_cred = zkac.Credential.finalize(
            sig, req.member_secret(), req.prover_blind(),
            user_rid, 1, role_issuer.public_key()
        )

        nonce = b"nonce"
        proof = user_cred.present(nonce)
        assert not mgr.verify_admin(rid, proof, nonce)