84 lines
2.4 KiB
Python
84 lines
2.4 KiB
Python
import socket
|
|
import threading
|
|
|
|
import zkac
|
|
from zkac.udp import (
|
|
FramedSession,
|
|
client_handshake,
|
|
read_datagram,
|
|
server_handshake,
|
|
write_datagram,
|
|
)
|
|
|
|
|
|
def _make_credential():
|
|
issuer = zkac.BbsIssuer()
|
|
pk = issuer.public_key()
|
|
rid = zkac.role_id("admin")
|
|
req = zkac.prepare_blind_request()
|
|
sig = issuer.issue_blind(req.commitment_with_proof(), rid, 1)
|
|
cred = zkac.Credential.finalize(
|
|
sig, req.member_secret(), req.prover_blind(), rid, 1, pk
|
|
)
|
|
return pk, rid, cred
|
|
|
|
|
|
class TestFraming:
|
|
def test_write_read_connected(self):
|
|
a, b = socket.socketpair(socket.AF_UNIX, socket.SOCK_DGRAM)
|
|
try:
|
|
payload = b"udp-framed"
|
|
write_datagram(a, payload)
|
|
assert read_datagram(b) == payload
|
|
finally:
|
|
a.close()
|
|
b.close()
|
|
|
|
|
|
class TestHandshakeOverUdp:
|
|
def test_full_handshake_localhost(self):
|
|
pk, rid, cred = _make_credential()
|
|
reg = zkac.RoleRegistry()
|
|
reg.register_role(rid, pk, 1)
|
|
|
|
server_kp = zkac.Keypair()
|
|
server_pk = server_kp.public_key()
|
|
client_kp = zkac.Keypair()
|
|
|
|
ready = threading.Event()
|
|
port_holder: list[int] = []
|
|
err: list[BaseException] = []
|
|
|
|
def run_server():
|
|
srv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
|
try:
|
|
srv.bind(("127.0.0.1", 0))
|
|
port_holder.append(srv.getsockname()[1])
|
|
ready.set()
|
|
node = zkac.Node(server_kp)
|
|
session, verified, _addr = server_handshake(srv, node, reg)
|
|
assert verified == rid
|
|
framed = FramedSession(srv, session)
|
|
framed.send(b"pong: " + framed.recv())
|
|
except BaseException as e:
|
|
err.append(e)
|
|
finally:
|
|
srv.close()
|
|
|
|
t = threading.Thread(target=run_server, daemon=True)
|
|
t.start()
|
|
ready.wait()
|
|
cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
try:
|
|
sess = client_handshake(
|
|
cli, ("127.0.0.1", port_holder[0]), zkac.Node(client_kp), server_pk, cred
|
|
)
|
|
cf = FramedSession(cli, sess)
|
|
cf.send(b"ping")
|
|
assert cf.recv() == b"pong: ping"
|
|
finally:
|
|
cli.close()
|
|
t.join(timeout=5.0)
|
|
assert not err, err
|