# 316 lines, 10 KiB, Python
import zkac
|
|
import pytest
|
|
|
|
|
|
class TestKeypairAndPublicKey:
    """Basic checks on ``zkac.Keypair``, ``zkac.PublicKey`` and a module constant."""

    def test_keypair_generates(self):
        """A new keypair exposes its public half as a ``zkac.PublicKey``."""
        public = zkac.Keypair().public_key()
        assert isinstance(public, zkac.PublicKey)

    def test_pubkey_serialization(self):
        """``to_bytes`` gives 32 bytes and ``from_bytes`` rebuilds an equal key."""
        original = zkac.Keypair().public_key()
        encoded = original.to_bytes()
        assert len(encoded) == 32
        restored = zkac.PublicKey.from_bytes(encoded)
        assert original == restored

    def test_pubkey_repr(self):
        """``repr`` starts with ``PublicKey(`` and has a fixed total length."""
        text = repr(zkac.Keypair().public_key())
        assert text.startswith("PublicKey(")
        # Wrapper text plus 64 payload characters (presumably the hex form of
        # the 32-byte key -- the test itself only pins the length).
        expected_length = len("PublicKey()") + 64
        assert len(text) == expected_length

    def test_different_keypairs_different_pubkeys(self):
        """Two independently created keypairs have unequal public keys."""
        first, second = (zkac.Keypair().public_key() for _ in range(2))
        assert first != second

    def test_invalid_pubkey_bytes(self):
        """``from_bytes`` raises ``ValueError`` for each rejected input.

        The inputs are a 31-byte all-zero string and a 32-byte all-0xff string.
        """
        for rejected in (b"\x00" * 31, b"\xff" * 32):
            with pytest.raises(ValueError):
                zkac.PublicKey.from_bytes(rejected)

    def test_max_proof_constant(self):
        """``MAX_BBS_AUTH_PROOF_BYTES`` is exported and positive."""
        limit = zkac.MAX_BBS_AUTH_PROOF_BYTES
        assert limit > 0
|
|
|
|
|
|
class TestBbsCredentials:
    """Tests for role ids, blind credential issuance and registry verification."""

    def _issue_credential(self, issuer, pk, rid, epoch=1):
        """Run the blind-issuance round trip and return the finalized credential.

        Args:
            issuer: The ``zkac.BbsIssuer`` that signs the blind request.
            pk: That issuer's public key, passed on to ``Credential.finalize``.
            rid: Role id the credential is issued for.
            epoch: Epoch used for both issuance and finalization.

        Returns:
            The object returned by ``zkac.Credential.finalize``.

        The leading underscore keeps pytest from collecting this as a test.
        """
        req = zkac.prepare_blind_request()
        blind_sig = issuer.issue_blind(req.commitment_with_proof(), rid, epoch)
        return zkac.Credential.finalize(
            blind_sig, req.member_secret(), req.prover_blind(), rid, epoch, pk
        )

    def test_role_id_deterministic(self):
        """Equal names map to equal 32-byte ids; different names differ."""
        rid1 = zkac.role_id("admin")
        rid2 = zkac.role_id("admin")
        rid3 = zkac.role_id("reader")
        assert rid1 == rid2
        assert rid1 != rid3
        assert len(rid1) == 32

    def test_full_blind_credential_flow(self):
        """Issue, finalize, present and verify a credential end to end."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("admin")

        # Spelled out step by step rather than via the helper: the issuance
        # sequence itself is what this test documents.
        req = zkac.prepare_blind_request()
        blind_sig = issuer.issue_blind(req.commitment_with_proof(), rid, 1)
        cred = zkac.Credential.finalize(
            blind_sig, req.member_secret(), req.prover_blind(), rid, 1, pk
        )

        nonce = b"session-nonce"
        proof = cred.present(nonce)
        assert len(proof) > 0

        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        assert reg.verify_presentation(rid, proof, nonce)

    def test_wrong_role_rejected(self):
        """A proof for one role does not verify against another registered role."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("admin")
        cred = self._issue_credential(issuer, pk, rid)

        nonce = b"nonce"
        proof = cred.present(nonce)

        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        # Register the other role under the same issuer key and epoch so the
        # role id is the only thing that differs.
        wrong_rid = zkac.role_id("reader")
        reg.register_role(wrong_rid, issuer.public_key(), 1)
        assert not reg.verify_presentation(wrong_rid, proof, nonce)

    def test_epoch_revocation(self):
        """An epoch-1 credential stops verifying once the role moves to epoch 2."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("ops")
        cred = self._issue_credential(issuer, pk, rid)

        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        nonce = b"n1"
        assert reg.verify_presentation(rid, cred.present(nonce), nonce)

        reg.set_epoch(rid, 2)
        nonce2 = b"n2"
        assert not reg.verify_presentation(rid, cred.present(nonce2), nonce2)

    def test_issuer_key_serialization(self):
        """Issuer public and secret keys survive a bytes round trip."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        pk_bytes = pk.to_bytes()
        pk2 = zkac.BbsPublicKey.from_bytes(pk_bytes)
        assert pk.to_bytes() == pk2.to_bytes()

        # An issuer rebuilt from the secret key bytes must expose the same
        # public key bytes as the original.
        sk_bytes = issuer.secret_key_bytes()
        issuer2 = zkac.BbsIssuer.from_secret_key(sk_bytes)
        assert issuer2.public_key().to_bytes() == pk_bytes

    def test_presentations_unlinkable(self):
        """Two presentations under different nonces differ and both verify."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("admin")
        cred = self._issue_credential(issuer, pk, rid)

        p1 = cred.present(b"nonce1")
        p2 = cred.present(b"nonce2")
        assert p1 != p2

        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        assert reg.verify_presentation(rid, p1, b"nonce1")
        assert reg.verify_presentation(rid, p2, b"nonce2")

    def test_multiple_members(self):
        """Three separately issued credentials for one role all verify."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("editors")

        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        nonce = b"shared-nonce"

        for _ in range(3):
            cred = self._issue_credential(issuer, pk, rid)
            proof = cred.present(nonce)
            assert reg.verify_presentation(rid, proof, nonce)

    def test_credential_role_and_epoch(self):
        """A finalized credential reports the role id and epoch it was issued for."""
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("admin")
        cred = self._issue_credential(issuer, pk, rid, epoch=42)
        assert cred.role_id() == rid
        assert cred.epoch() == 42
|
|
|
|
|
|
class TestSchnorrSignature:
    """Signing and verification checks for ``zkac.Keypair`` / ``zkac.PublicKey``."""

    def test_sign_verify(self):
        """A 64-byte signature verifies under the signer's own public key."""
        signer = zkac.Keypair()
        verifier = signer.public_key()
        signature = signer.sign(b"hello")
        assert len(signature) == 64
        assert verifier.verify(b"hello", signature)

    def test_wrong_message(self):
        """A signature does not verify against a different message."""
        signer = zkac.Keypair()
        verifier = signer.public_key()
        signature = signer.sign(b"correct")
        accepted = verifier.verify(b"wrong", signature)
        assert not accepted

    def test_wrong_key(self):
        """A signature does not verify under an unrelated public key."""
        signer = zkac.Keypair()
        stranger = zkac.Keypair()
        signature = signer.sign(b"msg")
        accepted = stranger.public_key().verify(b"msg", signature)
        assert not accepted

    def test_deterministic(self):
        """Signing the same message twice with one keypair gives equal output."""
        signer = zkac.Keypair()
        first = signer.sign(b"same")
        second = signer.sign(b"same")
        assert first == second
|
|
|
|
|
|
class TestNodeHandshake:
    """Tests driving two ``zkac.Node`` objects through connect, accept and auth."""

    def _make_credential(self):
        """Issue an epoch-1 credential for the "admin" role from a new issuer.

        Returns:
            The tuple ``(issuer, pk, rid, cred)``.
        """
        issuer = zkac.BbsIssuer()
        pk = issuer.public_key()
        rid = zkac.role_id("admin")
        req = zkac.prepare_blind_request()
        sig = issuer.issue_blind(req.commitment_with_proof(), rid, 1)
        cred = zkac.Credential.finalize(
            sig, req.member_secret(), req.prover_blind(), rid, 1, pk
        )
        return issuer, pk, rid, cred

    def _make_nodes(self):
        """Create a client node and a server node, each from a fresh keypair.

        Returns:
            The tuple ``(client, server, server_pk)``, where ``server_pk`` is
            the public key of the keypair the server node was built from.
        """
        client = zkac.Node(zkac.Keypair())
        server_kp = zkac.Keypair()
        server_pk = server_kp.public_key()
        server = zkac.Node(server_kp)
        return client, server, server_pk

    def _open_handshake(self, client, server):
        """Run the message exchange that precedes ``complete_connect``.

        Calls ``client.connect()``, feeds the init message to
        ``server.accept()``, then asks the server for its identity proof.

        Returns:
            The tuple ``(pending, server_session, response_msg, identity_proof)``.
        """
        pending, init_msg = client.connect()
        server_session, response_msg = server.accept(init_msg)
        identity_proof = server.prove_identity(server_session)
        return pending, server_session, response_msg, identity_proof

    def test_full_handshake(self):
        """A complete handshake yields the role id and two working sessions."""
        _, pk, rid, cred = self._make_credential()

        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)

        client, server, server_pk = self._make_nodes()
        pending, server_session, response_msg, identity_proof = (
            self._open_handshake(client, server)
        )
        client_session, auth_packet = client.complete_connect(
            pending, response_msg, identity_proof, server_pk, cred
        )

        verified_rid = server.verify_auth(server_session, auth_packet, reg)
        assert verified_rid == rid

        # Traffic must round-trip in both directions.
        pkt = client_session.encrypt(b"admin command")
        assert server_session.decrypt(pkt) == b"admin command"

        pkt = server_session.encrypt(b"response")
        assert client_session.decrypt(pkt) == b"response"

    def test_wrong_server_key_rejected(self):
        """``complete_connect`` raises when given a key that is not the server's."""
        _, _, _, cred = self._make_credential()

        client, server, _ = self._make_nodes()
        wrong_pk = zkac.Keypair().public_key()

        pending, _, response_msg, identity_proof = self._open_handshake(client, server)

        with pytest.raises(ValueError, match="identity verification failed"):
            client.complete_connect(
                pending, response_msg, identity_proof, wrong_pk, cred
            )

    def test_replay_rejected(self):
        """Decrypting the same packet a second time raises a replay error."""
        _, pk, rid, cred = self._make_credential()
        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)

        client, server, server_pk = self._make_nodes()
        pending, server_session, response_msg, identity_proof = (
            self._open_handshake(client, server)
        )
        client_session, auth_packet = client.complete_connect(
            pending, response_msg, identity_proof, server_pk, cred
        )
        server.verify_auth(server_session, auth_packet, reg)

        pkt = client_session.encrypt(b"once")
        server_session.decrypt(pkt)
        with pytest.raises(ValueError, match="replay"):
            server_session.decrypt(pkt)

    def test_tampered_packet_rejected(self):
        """A packet whose last byte was flipped fails to decrypt."""
        _, pk, rid, cred = self._make_credential()
        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)

        client, server, server_pk = self._make_nodes()
        pending, server_session, response_msg, identity_proof = (
            self._open_handshake(client, server)
        )
        client_session, auth_packet = client.complete_connect(
            pending, response_msg, identity_proof, server_pk, cred
        )
        server.verify_auth(server_session, auth_packet, reg)

        # Copy into a bytearray so a single byte can be altered in place.
        pkt = bytearray(client_session.encrypt(b"secret"))
        pkt[-1] ^= 0xFF
        with pytest.raises(ValueError, match="decryption"):
            server_session.decrypt(bytes(pkt))

    def test_pending_connect_consumed(self):
        """Reusing a pending-connect object for a second completion raises."""
        _, _, _, cred = self._make_credential()

        client, server, server_pk = self._make_nodes()
        pending, _, response_msg, identity_proof = self._open_handshake(client, server)
        client.complete_connect(
            pending, response_msg, identity_proof, server_pk, cred
        )

        with pytest.raises(ValueError, match="consumed"):
            client.complete_connect(
                pending, response_msg, identity_proof, server_pk, cred
            )
|