431 lines
16 KiB
Python
431 lines
16 KiB
Python
import zkac
|
|
import pytest
|
|
|
|
|
|
def make_admin():
    """Build a fresh BBS+ issuer and blind-issue it an ``__admin__`` credential.

    The issuer signs a blind request for the admin role id and the resulting
    signature is finalized against the issuer's own public key.

    Returns:
        A ``(issuer, public_key, credential)`` tuple.
    """
    admin_issuer = zkac.BbsIssuer()
    admin_pk = admin_issuer.public_key()
    role = zkac.admin_role_id()
    # Same value is passed to issue_blind and finalize (presumably the
    # credential epoch, mirroring the role entries' epoch field — confirm).
    epoch = 0

    blind_req = zkac.prepare_blind_request()
    blind_sig = admin_issuer.issue_blind(blind_req.commitment_with_proof(), role, epoch)
    credential = zkac.Credential.finalize(
        blind_sig,
        blind_req.member_secret(),
        blind_req.prover_blind(),
        role,
        epoch,
        admin_pk,
    )
    return admin_issuer, admin_pk, credential
|
|
|
|
|
|
def create_registry(mgr, issuer, admin_cred, roles=None):
    """Register a new version-1 registry state with ``mgr``.

    Args:
        mgr: Manager whose ``create`` receives the serialized state and cert.
        issuer: Issuer whose public key is passed to ``RegistryState.build``.
        admin_cred: Credential used to certify the state.
        roles: Optional ``(name, role_pk, epoch)`` tuples; each name is mapped
            through ``zkac.role_id``. ``None`` or empty yields no role entries.

    Returns:
        ``(registry_id, issuance_keypair, state)``: the id returned by
        ``mgr.create``, the freshly generated issuance keypair, and the state.
    """
    admin_pk = issuer.public_key()
    keypair = zkac.IssuanceKeypair()
    keypair_pk = keypair.public_key_bytes()

    entries = [
        (zkac.role_id(role_name), role_pk, epoch)
        for role_name, role_pk, epoch in (roles or ())
    ]

    registry_state = zkac.RegistryState.build(
        admin_pk, keypair_pk, 1, b"\x00" * 32, entries
    )
    serialized = registry_state.serialize()
    certificate = registry_state.certify(admin_cred)
    registry_id = mgr.create(serialized, certificate)
    return registry_id, keypair, registry_state
|
|
|
|
|
|
class TestRegistryState:
    """Build, serialize, and certify ``RegistryState`` objects without a manager."""

    def test_build_and_serialize_roundtrip(self):
        """A deserialized state reports the same version and registry id."""
        bbs = zkac.BbsIssuer()
        admin_pk = bbs.public_key()
        original = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 1, b"\x00" * 32, [])

        blob = original.serialize()
        restored = zkac.RegistryState.deserialize(blob)

        assert restored.version() == 1
        assert restored.registry_id() == original.registry_id()

    def test_registry_id_matches(self):
        """The state's registry id equals ``zkac.registry_id`` of the admin key."""
        bbs = zkac.BbsIssuer()
        admin_pk = bbs.public_key()
        built = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 1, b"\x00" * 32, [])

        want = zkac.registry_id(admin_pk)
        assert built.registry_id() == want

    def test_certify_and_verify(self):
        """A cert made with the admin credential verifies against the state bytes."""
        _issuer, admin_pk, credential = make_admin()
        built = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 1, b"\x00" * 32, [])

        blob = built.serialize()
        certificate = built.certify(credential)

        assert zkac.RegistryState.verify_cert(admin_pk, certificate, blob)

    def test_tampered_state_rejected(self):
        """Flipping one byte of the serialized state invalidates the cert."""
        _issuer, admin_pk, credential = make_admin()
        built = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 1, b"\x00" * 32, [])
        blob = built.serialize()
        certificate = built.certify(credential)

        # Invert every bit of the byte at offset 32.
        corrupted = bytearray(blob)
        corrupted[32] ^= 0xFF

        assert not zkac.RegistryState.verify_cert(admin_pk, certificate, bytes(corrupted))

    def test_wrong_admin_key_rejected(self):
        """Verification under an unrelated issuer's public key fails."""
        _issuer, admin_pk, credential = make_admin()
        stranger = zkac.BbsIssuer()

        built = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 1, b"\x00" * 32, [])
        blob = built.serialize()
        certificate = built.certify(credential)

        assert not zkac.RegistryState.verify_cert(stranger.public_key(), certificate, blob)

    def test_state_certs_are_unlinkable(self):
        """Certs for two states from one credential differ and each verifies."""
        _issuer, admin_pk, credential = make_admin()
        first = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 1, b"\x00" * 32, [])
        second = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 2, b"\x00" * 32, [])

        first_cert = first.certify(credential)
        second_cert = second.certify(credential)
        # NOTE(review): the two states differ (version 1 vs 2), so this
        # inequality alone does not demonstrate unlinkability — consider also
        # certifying the same state twice.
        assert first_cert != second_cert

        assert zkac.RegistryState.verify_cert(admin_pk, first_cert, first.serialize())
        assert zkac.RegistryState.verify_cert(admin_pk, second_cert, second.serialize())
|
|
|
|
|
|
class TestRegistryManager:
    """Create, update, restore, and look up registries in ``RegistryManager``."""

    @staticmethod
    def _apply_analyst_update(mgr, rid, pk, admin_cred):
        """Push a version-2 state adding an "analyst" role, chained to the stored state."""
        stored_bytes, _ = mgr.get(rid)
        stored_state = zkac.RegistryState.deserialize(stored_bytes)
        prev_hash = stored_state.state_hash()

        analyst_issuer = zkac.BbsIssuer()
        next_state = zkac.RegistryState.build(
            pk,
            b"\x00" * 32,
            2,
            prev_hash,
            [(zkac.role_id("analyst"), analyst_issuer.public_key(), 1)],
        )
        next_cert = next_state.certify(admin_cred)
        mgr.update(rid, next_state.serialize(), next_cert)

    def test_create_and_get(self):
        """A created registry is reported present and returns non-empty state and cert."""
        manager = zkac.RegistryManager()
        admin_issuer, _pk, credential = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, credential)

        assert manager.has_registry(rid)
        stored_state, stored_cert = manager.get(rid)
        assert len(stored_state) > 0
        assert len(stored_cert) > 0

    def test_duplicate_rejected(self):
        """Creating a second registry under the same admin key raises ValueError."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, credential = make_admin()
        create_registry(manager, admin_issuer, credential)

        # Same issuer key -> same registry_id -> duplicate
        clone = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 1, b"\x00" * 32, [])
        clone_cert = clone.certify(credential)
        with pytest.raises(ValueError, match="already exists"):
            manager.create(clone.serialize(), clone_cert)

    def test_update_with_hash_chain(self):
        """An update at version 2 carrying the stored state's hash is accepted."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, credential = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, credential)

        self._apply_analyst_update(manager, rid, admin_pk, credential)

        latest_bytes, _ = manager.get(rid)
        latest = zkac.RegistryState.deserialize(latest_bytes)
        assert latest.version() == 2

    def test_restore_snapshot_after_update(self):
        """A snapshot taken after an update restores into a fresh manager unchanged."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, credential = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, credential)

        self._apply_analyst_update(manager, rid, admin_pk, credential)

        snapshot_state, snapshot_cert = manager.get(rid)
        fresh_manager = zkac.RegistryManager()
        restored_rid = fresh_manager.restore(snapshot_state, snapshot_cert)
        assert rid == restored_rid

        restored_state, restored_cert = fresh_manager.get(rid)
        assert restored_state == snapshot_state
        assert restored_cert == snapshot_cert
        assert zkac.RegistryState.deserialize(restored_state).version() == 2

    def test_wrong_version_rejected(self):
        """An update to version 3 on a version-1 registry is refused."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, credential = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, credential)

        stored_bytes, _ = manager.get(rid)
        prev_hash = zkac.RegistryState.deserialize(stored_bytes).state_hash()

        # The hash is correct; only the version number (3) is wrong.
        skipped = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 3, prev_hash, [])
        skipped_cert = skipped.certify(credential)
        with pytest.raises(ValueError, match="version"):
            manager.update(rid, skipped.serialize(), skipped_cert)

    def test_wrong_prev_hash_rejected(self):
        """An update whose previous-state hash is all 0xff bytes is refused."""
        manager = zkac.RegistryManager()
        admin_issuer, admin_pk, credential = make_admin()
        rid, _, _ = create_registry(manager, admin_issuer, credential)

        unchained = zkac.RegistryState.build(admin_pk, b"\x00" * 32, 2, b"\xff" * 32, [])
        unchained_cert = unchained.certify(credential)
        with pytest.raises(ValueError, match="prev_state_hash"):
            manager.update(rid, unchained.serialize(), unchained_cert)

    def test_nonexistent_registry_errors(self):
        """Looking up an unknown id raises; ``has_registry`` reports False."""
        manager = zkac.RegistryManager()
        unknown_id = b"\xff" * 32
        with pytest.raises(ValueError, match="not found"):
            manager.get(unknown_id)
        assert not manager.has_registry(unknown_id)
|
|
|
|
|
|
class TestManagedHandshake:
    """Handshakes in which the server verifies credentials via a ``RegistryManager``."""

    @staticmethod
    def _managed_handshake(server, client, server_pk, cred, rid, mgr):
        """Run connect / accept / prove / complete / verify between two nodes.

        Returns:
            ``(client_session, server_session, registry_id, role_id)``, where the
            last two are what ``server.verify_auth_managed`` returned.
        """
        pending, init_msg = client.connect()
        server_session, response_msg = server.accept(init_msg)
        identity_proof = server.prove_identity(server_session)
        client_session, auth_packet = client.complete_connect_managed(
            pending, response_msg, identity_proof, server_pk, cred, rid
        )
        reg_id, role_id = server.verify_auth_managed(server_session, auth_packet, mgr)
        return client_session, server_session, reg_id, role_id

    def test_full_managed_handshake(self):
        """An analyst credential authenticates and the sessions exchange data."""
        manager = zkac.RegistryManager()
        admin_issuer, _admin_pk, admin_cred = make_admin()

        analyst_issuer = zkac.BbsIssuer()
        analyst_pk = analyst_issuer.public_key()
        analyst_rid = zkac.role_id("analyst")

        rid, _, _ = create_registry(
            manager,
            admin_issuer,
            admin_cred,
            roles=[("analyst", analyst_pk, 1)],
        )

        # Blind-issue the analyst credential to a user.
        blind_req = zkac.prepare_blind_request()
        blind_sig = analyst_issuer.issue_blind(
            blind_req.commitment_with_proof(), analyst_rid, 1
        )
        user_cred = zkac.Credential.finalize(
            blind_sig,
            blind_req.member_secret(),
            blind_req.prover_blind(),
            analyst_rid,
            1,
            analyst_pk,
        )

        # Handshake between a server node and a client with a fresh keypair.
        server_kp = zkac.Keypair()
        server_pk = server_kp.public_key()
        server = zkac.Node(server_kp)
        client = zkac.Node(zkac.Keypair())

        client_session, server_session, seen_registry, seen_role = self._managed_handshake(
            server, client, server_pk, user_cred, rid, manager
        )
        assert seen_registry == rid
        assert seen_role == analyst_rid

        # The established sessions can carry a payload client -> server.
        packet = client_session.encrypt(b"query")
        assert server_session.decrypt(packet) == b"query"

    def test_multiple_registries(self):
        """One manager serves two registries; each credential maps to its own."""
        manager = zkac.RegistryManager()

        # Registry A with an "ops" role.
        issuer_a, _pk_a, cred_a = make_admin()
        ops_issuer = zkac.BbsIssuer()
        rid_a, _, _ = create_registry(
            manager,
            issuer_a,
            cred_a,
            roles=[("ops", ops_issuer.public_key(), 1)],
        )

        # Registry B with a "dev" role.
        issuer_b, _pk_b, cred_b = make_admin()
        dev_issuer = zkac.BbsIssuer()
        rid_b, _, _ = create_registry(
            manager,
            issuer_b,
            cred_b,
            roles=[("dev", dev_issuer.public_key(), 1)],
        )

        assert rid_a != rid_b

        # One credential per registry.
        ops_rid = zkac.role_id("ops")
        ops_req = zkac.prepare_blind_request()
        ops_sig = ops_issuer.issue_blind(ops_req.commitment_with_proof(), ops_rid, 1)
        ops_cred = zkac.Credential.finalize(
            ops_sig,
            ops_req.member_secret(),
            ops_req.prover_blind(),
            ops_rid,
            1,
            ops_issuer.public_key(),
        )

        dev_rid = zkac.role_id("dev")
        dev_req = zkac.prepare_blind_request()
        dev_sig = dev_issuer.issue_blind(dev_req.commitment_with_proof(), dev_rid, 1)
        dev_cred = zkac.Credential.finalize(
            dev_sig,
            dev_req.member_secret(),
            dev_req.prover_blind(),
            dev_rid,
            1,
            dev_issuer.public_key(),
        )

        # Authenticate against registry A with the ops credential. The secret
        # key bytes are captured first so a second server node can be rebuilt
        # from the same key below.
        server_kp = zkac.Keypair()
        server_sk = server_kp.secret_key_bytes()
        server_pk = server_kp.public_key()
        server = zkac.Node(server_kp)
        client = zkac.Node(zkac.Keypair())

        _, _, seen_registry, seen_role = self._managed_handshake(
            server, client, server_pk, ops_cred, rid_a, manager
        )
        assert seen_registry == rid_a
        assert seen_role == ops_rid

        # Authenticate against registry B with the dev credential.
        second_server = zkac.Node(zkac.Keypair.from_secret_key(server_sk))
        second_client = zkac.Node(zkac.Keypair())

        _, _, seen_registry_b, seen_role_b = self._managed_handshake(
            second_server, second_client, server_pk, dev_cred, rid_b, manager
        )
        assert seen_registry_b == rid_b
        assert seen_role_b == dev_rid
|
|
|
|
|
|
class TestIssuanceRelay:
    """Encrypted credential issuance relayed through the manager's queues."""

    def test_e2e_encrypted_issuance(self):
        """Relay a blind issuance request through the manager and back.

        The user encrypts a commitment for the admin and queues it; the admin
        drains the queue, decrypts, issues a blind signature, and posts the
        encrypted response, which is then fetched by request id.
        """
        mgr = zkac.RegistryManager()
        _admin_issuer, admin_pk, admin_cred = make_admin()

        relay_kp = zkac.IssuanceKeypair()
        relay_pk = relay_kp.public_key_bytes()

        analyst_issuer = zkac.BbsIssuer()
        analyst_rid = zkac.role_id("analyst")

        registry_state = zkac.RegistryState.build(
            admin_pk,
            relay_pk,
            1,
            b"\x00" * 32,
            [(analyst_rid, analyst_issuer.public_key(), 1)],
        )
        serialized = registry_state.serialize()
        certificate = registry_state.certify(admin_cred)
        rid = mgr.create(serialized, certificate)

        # --- User: build a blind request and encrypt its commitment ---
        blind_req = zkac.prepare_blind_request()
        commitment = blind_req.commitment_with_proof()
        eph_pk, sealed_commitment = zkac.encrypt_for_admin(relay_pk, commitment)

        # User -> server: queue the encrypted request under a fixed id.
        request_id = b"\x01" * 32
        mgr.queue_issuance_request(rid, request_id, analyst_rid, eph_pk, sealed_commitment)

        # --- Admin: drain the queue; exactly one request is expected ---
        queued = mgr.take_pending_requests(rid)
        assert len(queued) == 1
        queued_id, _queued_role, queued_eph, queued_blob = queued[0]
        assert queued_id == request_id

        # Admin recovers the commitment exactly as the user produced it.
        opened_commitment = relay_kp.decrypt(queued_eph, queued_blob)
        assert opened_commitment == commitment

        # Admin issues over the decrypted commitment.
        blind_sig = analyst_issuer.issue_blind(opened_commitment, analyst_rid, 1)

        # Admin encrypts the signature and posts it for the requester.
        sealed_sig = relay_kp.encrypt(queued_eph, blind_sig)
        mgr.grant_credential(rid, queued_id, sealed_sig)

        # --- User: fetch the response by request id ---
        reply = mgr.take_granted_credential(rid, request_id)
        assert reply is not None

        # encrypt_for_admin hands back only the ephemeral public key and the
        # ciphertext, so this test holds no user-side secret to decrypt with.
        # It therefore checks the admin-side round trip instead: the issuance
        # keypair decrypts, against eph_pk, the response it encrypted above.
        # NOTE(review): a real user would need to retain the ephemeral secret
        # (or use an IssuanceKeypair / manual DH) — that path is not covered.
        opened_sig = relay_kp.decrypt(eph_pk, reply)
        assert opened_sig == blind_sig

    def test_server_cannot_substitute_commitment(self):
        """Decryption fails under a different keypair or with altered ciphertext."""
        admin_kp = zkac.IssuanceKeypair()
        admin_pk = admin_kp.public_key_bytes()

        message = b"real commitment data"
        eph_pk, sealed = zkac.encrypt_for_admin(admin_pk, message)

        # A party holding an unrelated keypair cannot open the ciphertext.
        outsider_kp = zkac.IssuanceKeypair()
        with pytest.raises(ValueError):
            outsider_kp.decrypt(eph_pk, sealed)

        # Inverting the final ciphertext byte makes the rightful key fail too.
        corrupted = bytearray(sealed)
        corrupted[-1] ^= 0xFF
        with pytest.raises(ValueError):
            admin_kp.decrypt(eph_pk, bytes(corrupted))

    def test_issuance_keypair_serialization(self):
        """A keypair rebuilt from its secret bytes has the same public key."""
        original = zkac.IssuanceKeypair()
        secret = original.secret_bytes()
        public = original.public_key_bytes()

        rebuilt = zkac.IssuanceKeypair.from_secret(secret)
        assert rebuilt.public_key_bytes() == public
|
|
|
|
|
|
class TestAdminAuth:
    """Admin verification through ``RegistryManager.verify_admin``."""

    def test_verify_admin_bbs_proof(self):
        """Admin can prove __admin__ role to the server via BBS+."""
        mgr = zkac.RegistryManager()
        issuer, _pk, admin_cred = make_admin()
        rid, _, _ = create_registry(mgr, issuer, admin_cred)

        # The same nonce is used to create the presentation and to verify it.
        nonce = b"session-transcript-hash"
        proof = admin_cred.present(nonce)

        assert mgr.verify_admin(rid, proof, nonce)

    def test_non_admin_rejected(self):
        """A credential for a different role cannot pass admin verification."""
        mgr = zkac.RegistryManager()
        issuer, _pk, admin_cred = make_admin()

        role_issuer = zkac.BbsIssuer()
        rid, _, _ = create_registry(
            mgr,
            issuer,
            admin_cred,
            roles=[("user", role_issuer.public_key(), 1)],
        )

        # Issue a non-admin credential from the separate role issuer.
        user_rid = zkac.role_id("user")
        req = zkac.prepare_blind_request()
        sig = role_issuer.issue_blind(req.commitment_with_proof(), user_rid, 1)
        user_cred = zkac.Credential.finalize(
            sig,
            req.member_secret(),
            req.prover_blind(),
            user_rid,
            1,
            role_issuer.public_key(),
        )

        nonce = b"nonce"
        proof = user_cred.present(nonce)
        assert not mgr.verify_admin(rid, proof, nonce)
|