92 lines
3.3 KiB
Python
92 lines
3.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
ZKAC client for client-managed registries.
|
|
|
|
1. Issues a credential locally (admin issues to self — in production this
|
|
would go through the server's E2E-encrypted issuance relay).
|
|
2. Verifies the registry state certificate.
|
|
3. Authenticates via managed-registry handshake.
|
|
4. Sends a JSON request over the encrypted session.
|
|
|
|
Usage:
|
|
python client_managed.py --role analyst
|
|
python client_managed.py --role operator
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import socket
|
|
from pathlib import Path
|
|
|
|
import zkac
|
|
from zkac.tcp import FramedSession, client_handshake_managed
|
|
|
|
|
|
def main() -> None:
|
|
ap = argparse.ArgumentParser(description="ZKAC managed-registry client")
|
|
ap.add_argument("--creds-dir", type=Path,
|
|
default=Path(__file__).resolve().parent / "creds")
|
|
ap.add_argument("--role", default="analyst", choices=["analyst", "operator"])
|
|
ap.add_argument("--host", default="127.0.0.1")
|
|
ap.add_argument("--port", type=int, default=9877)
|
|
args = ap.parse_args()
|
|
|
|
creds_dir: Path = args.creds_dir
|
|
admin_data = json.loads((creds_dir / "managed_admin.json").read_text(encoding="utf-8"))
|
|
reg_data = json.loads((creds_dir / "managed_registry.json").read_text(encoding="utf-8"))
|
|
transport_data = json.loads((creds_dir / "transport.json").read_text(encoding="utf-8"))
|
|
|
|
# Reconstruct admin issuer to issue a credential for the chosen role.
|
|
# In production, this would be done via the E2E issuance relay.
|
|
admin_issuer = zkac.BbsIssuer.from_secret_key(
|
|
base64.b64decode(admin_data["admin_issuer_secret_b64"])
|
|
)
|
|
admin_pk = admin_issuer.public_key()
|
|
|
|
role_rid = zkac.role_id(args.role)
|
|
req = zkac.prepare_blind_request()
|
|
blind_sig = admin_issuer.issue_blind(req.commitment_with_proof(), role_rid, 1)
|
|
cred = zkac.Credential.finalize(
|
|
blind_sig, req.member_secret(), req.prover_blind(), role_rid, 1, admin_pk
|
|
)
|
|
|
|
# Verify registry state certificate before trusting it
|
|
state_bytes = base64.b64decode(reg_data["state_bytes_b64"])
|
|
state_cert = base64.b64decode(reg_data["state_cert_b64"])
|
|
registry_id = bytes.fromhex(reg_data["registry_id_hex"])
|
|
|
|
issuer_pk_for_verify = zkac.BbsPublicKey.from_bytes(
|
|
base64.b64decode(reg_data["admin_issuer_public_key_b64"])
|
|
)
|
|
expected_rid = zkac.registry_id(issuer_pk_for_verify)
|
|
assert expected_rid == registry_id, "registry_id mismatch"
|
|
assert zkac.RegistryState.verify_cert(issuer_pk_for_verify, state_cert, state_bytes), \
|
|
"state certificate verification failed"
|
|
print(f"Registry state verified (registry_id={registry_id.hex()[:16]}…)")
|
|
|
|
# Connect and authenticate
|
|
server_pk = zkac.PublicKey.from_bytes(
|
|
base64.b64decode(transport_data["server_public_key_b64"])
|
|
)
|
|
node = zkac.Node(zkac.Keypair())
|
|
|
|
sock = socket.create_connection((args.host, args.port))
|
|
try:
|
|
session = client_handshake_managed(
|
|
sock, node, server_pk, cred, registry_id
|
|
)
|
|
framed = FramedSession(sock, session)
|
|
|
|
framed.send(json.dumps({"path": "/api"}).encode())
|
|
reply = json.loads(framed.recv().decode())
|
|
print(json.dumps(reply, indent=2))
|
|
finally:
|
|
sock.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|