# ZKAC/tests/test_zkac_tcp.py
# 2026-04-15 11:32:01 +02:00
#
# 114 lines
# 3.3 KiB
# Python

import socket
import threading
import pytest
import zkac
from zkac.tcp import (
FramedSession,
MAX_TCP_FRAME_BYTES,
client_handshake,
read_frame,
server_handshake,
write_frame,
)
def _make_credential():
    """Build a fresh issuer and a finalized credential for the "admin" role.

    Returns:
        A 4-tuple ``(issuer, public_key, role_id, credential)`` so callers
        can pick whichever pieces a test needs.
    """
    bbs_issuer = zkac.BbsIssuer()
    issuer_pk = bbs_issuer.public_key()
    admin_role = zkac.role_id("admin")
    blind_request = zkac.prepare_blind_request()

    # The same trailing integer (1) is passed to issuance and finalization.
    blind_sig = bbs_issuer.issue_blind(
        blind_request.commitment_with_proof(), admin_role, 1
    )
    credential = zkac.Credential.finalize(
        blind_sig,
        blind_request.member_secret(),
        blind_request.prover_blind(),
        admin_role,
        1,
        issuer_pk,
    )
    return bbs_issuer, issuer_pk, admin_role, credential
class TestFraming:
    """Exercise ``write_frame`` / ``read_frame`` over a local socket pair."""

    def test_read_write_roundtrip(self):
        """A payload written on one end is read back unchanged on the other."""
        writer, reader = socket.socketpair()
        # Context managers unwind right-to-left: writer closes first, then
        # reader, regardless of how the body exits.
        with reader, writer:
            data = b"hello" * 400
            write_frame(writer, data)
            received = read_frame(reader)
            assert received == data

    def test_oversized_length_rejected(self):
        """``read_frame`` raises ``ValueError`` for a length above the cap."""
        writer, reader = socket.socketpair()
        with reader, writer:
            # Send only a raw 4-byte little-endian value one above the limit
            # (presumably where read_frame expects the length prefix).
            too_big = MAX_TCP_FRAME_BYTES + 1
            header = too_big.to_bytes(4, "little")
            writer.sendall(header)
            with pytest.raises(ValueError, match="exceeds maximum"):
                read_frame(reader)
class TestHandshakeOverTcp:
    """End-to-end handshake between a client and a server over a socket pair."""

    def test_full_handshake_matching_keys(self):
        """Handshake succeeds and the client decrypts the server's packet.

        The server side runs in a background thread. Any exception raised
        there (including a failed ``assert``) is recorded and re-raised in
        the main thread, because an exception that escapes a worker thread
        does not by itself fail the test.
        """
        _, pk, rid, cred = _make_credential()
        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        server_kp = zkac.Keypair()
        server_pk = server_kp.public_key()

        # Create the sockets last so a failure in the setup above cannot
        # leave them open.
        client_sock, server_sock = socket.socketpair()
        server_errors = []

        def run_server():
            try:
                srv = zkac.Node(server_kp)
                s, verified = server_handshake(server_sock, srv, reg)
                assert verified == rid
                pkt = s.encrypt(b"admin command")
                write_frame(server_sock, pkt)
            except Exception as exc:
                # Not swallowed: re-raised from the main thread after join.
                server_errors.append(exc)
            finally:
                server_sock.close()

        t = threading.Thread(target=run_server)
        t.start()
        try:
            cli = zkac.Node(zkac.Keypair())
            session = client_handshake(client_sock, cli, server_pk, cred)
            wire = read_frame(client_sock)
            assert session.decrypt(wire) == b"admin command"
        finally:
            client_sock.close()
            t.join(timeout=5)
            assert not t.is_alive(), "server thread did not finish within 5 s"
            if server_errors:
                # Raised inside ``finally`` so a client-side failure caused
                # by the server error is chained as context, not shown alone.
                raise server_errors[0]
class TestFramedSession:
    """Exercise ``FramedSession`` send/recv on top of a completed handshake."""

    def test_framed_encrypt_roundtrip(self):
        """A message sent by the server's session is received by the client's.

        The server side runs in a background thread. Any exception raised
        there is recorded and re-raised in the main thread, because an
        exception that escapes a worker thread does not by itself fail the
        test.
        """
        _, pk, rid, cred = _make_credential()
        reg = zkac.RoleRegistry()
        reg.register_role(rid, pk, 1)
        server_kp = zkac.Keypair()
        server_pk = server_kp.public_key()

        # Create the sockets last so a failure in the setup above cannot
        # leave them open.
        client_sock, server_sock = socket.socketpair()
        server_errors = []

        def run_server():
            try:
                srv = zkac.Node(server_kp)
                session, _ = server_handshake(server_sock, srv, reg)
                framed = FramedSession(server_sock, session)
                framed.send(b"reply")
            except Exception as exc:
                # Not swallowed: re-raised from the main thread after join.
                server_errors.append(exc)
            finally:
                server_sock.close()

        t = threading.Thread(target=run_server)
        t.start()
        try:
            cli = zkac.Node(zkac.Keypair())
            session = client_handshake(client_sock, cli, server_pk, cred)
            framed = FramedSession(client_sock, session)
            assert framed.recv() == b"reply"
        finally:
            client_sock.close()
            t.join(timeout=5)
            assert not t.is_alive(), "server thread did not finish within 5 s"
            if server_errors:
                # Raised inside ``finally`` so a client-side failure caused
                # by the server error is chained as context, not shown alone.
                raise server_errors[0]