ZKAC/demo/client_managed.py
everbarry 15998edb51 .3
2026-04-16 01:02:55 +02:00

92 lines
3.3 KiB
Python

#!/usr/bin/env python3
"""
ZKAC client for client-managed registries.
1. Issues a credential locally (admin issues to self — in production this
would go through the server's E2E-encrypted issuance relay).
2. Verifies the registry state certificate.
3. Authenticates via managed-registry handshake.
4. Sends a JSON request over the encrypted session.
Usage:
python client_managed.py --role analyst
python client_managed.py --role operator
"""
from __future__ import annotations
import argparse
import base64
import json
import socket
from pathlib import Path
import zkac
from zkac.tcp import FramedSession, client_handshake_managed
def main() -> None:
    """Run the managed-registry demo client end to end.

    Steps, in order:
      1. Parse the command line (``--creds-dir``, ``--role``, ``--host``,
         ``--port``).
      2. Load ``managed_admin.json``, ``managed_registry.json`` and
         ``transport.json`` from the credentials directory.
      3. Rebuild the admin issuer from its stored secret key and issue a
         credential for the chosen role to this client.
      4. Check that the stored registry ID matches the one derived from the
         issuer public key, and verify the registry state certificate.
      5. Connect to the server, perform the managed-registry handshake, send
         ``{"path": "/api"}`` over the framed session and print the reply.

    Raises:
        AssertionError: If the registry ID does not match or the state
            certificate does not verify.
    """
    ap = argparse.ArgumentParser(description="ZKAC managed-registry client")
    ap.add_argument("--creds-dir", type=Path,
                    default=Path(__file__).resolve().parent / "creds")
    ap.add_argument("--role", default="analyst", choices=["analyst", "operator"])
    ap.add_argument("--host", default="127.0.0.1")
    ap.add_argument("--port", type=int, default=9877)
    args = ap.parse_args()

    creds_dir: Path = args.creds_dir

    def load_json(name: str):
        """Parse the UTF-8 JSON file *name* inside the credentials directory."""
        return json.loads((creds_dir / name).read_text(encoding="utf-8"))

    admin_data = load_json("managed_admin.json")
    reg_data = load_json("managed_registry.json")
    transport_data = load_json("transport.json")

    # Reconstruct admin issuer to issue a credential for the chosen role.
    # In production, this would be done via the E2E issuance relay.
    admin_issuer = zkac.BbsIssuer.from_secret_key(
        base64.b64decode(admin_data["admin_issuer_secret_b64"])
    )
    admin_pk = admin_issuer.public_key()
    role_rid = zkac.role_id(args.role)
    req = zkac.prepare_blind_request()
    blind_sig = admin_issuer.issue_blind(req.commitment_with_proof(), role_rid, 1)
    cred = zkac.Credential.finalize(
        blind_sig, req.member_secret(), req.prover_blind(), role_rid, 1, admin_pk
    )

    # Verify registry state certificate before trusting it.
    state_bytes = base64.b64decode(reg_data["state_bytes_b64"])
    state_cert = base64.b64decode(reg_data["state_cert_b64"])
    registry_id = bytes.fromhex(reg_data["registry_id_hex"])
    issuer_pk_for_verify = zkac.BbsPublicKey.from_bytes(
        base64.b64decode(reg_data["admin_issuer_public_key_b64"])
    )
    expected_rid = zkac.registry_id(issuer_pk_for_verify)

    # These are trust checks, so they must not be `assert` statements:
    # `python -O` strips asserts and would silently skip the verification.
    # AssertionError is still raised so the failure type is unchanged.
    if expected_rid != registry_id:
        raise AssertionError("registry_id mismatch")
    if not zkac.RegistryState.verify_cert(issuer_pk_for_verify, state_cert, state_bytes):
        raise AssertionError("state certificate verification failed")
    print(f"Registry state verified (registry_id={registry_id.hex()[:16]}…)")

    # Connect and authenticate
    server_pk = zkac.PublicKey.from_bytes(
        base64.b64decode(transport_data["server_public_key_b64"])
    )
    node = zkac.Node(zkac.Keypair())

    # The socket is a context manager: it is closed on exit from the block,
    # whether the handshake/request succeeds or raises.
    with socket.create_connection((args.host, args.port)) as sock:
        session = client_handshake_managed(
            sock, node, server_pk, cred, registry_id
        )
        framed = FramedSession(sock, session)
        framed.send(json.dumps({"path": "/api"}).encode())
        reply = json.loads(framed.recv().decode())
        print(json.dumps(reply, indent=2))
# Run the client only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()