From 6ce6bf56754f9b19c95cb6f81e4600a621c73f33 Mon Sep 17 00:00:00 2001 From: everbarry Date: Wed, 6 May 2026 17:17:22 +0200 Subject: [PATCH] add cli revoke role --- cli/zkac_cli/client.py | 118 +++++++++++++++++++++++++++++++---------- cli/zkac_cli/main.py | 27 ++++++++++ 2 files changed, 116 insertions(+), 29 deletions(-) diff --git a/cli/zkac_cli/client.py b/cli/zkac_cli/client.py index 7bee4bd..153d0ce 100644 --- a/cli/zkac_cli/client.py +++ b/cli/zkac_cli/client.py @@ -240,6 +240,51 @@ def _ok(resp: dict) -> dict: return resp +def _validated_admin_update_base( + userid: str, + server: str, + registry_id_hex: str, +) -> tuple[dict, zkac.BbsPublicKey, zkac.Credential, bytes, int]: + admin_data = store.load_admin(userid, registry_id_hex) + _bbs_issuer, bbs_pk, admin_cred = store.reconstruct_admin(admin_data) + cur = _mgmt_single( + userid, + server, + {"cmd": "get_registry", "registry_id": registry_id_hex}, + auth_registry_id=registry_id_hex, + admin_cred=admin_cred, + ) + + old_state = zkac.RegistryState.deserialize(_unb64(cur["state_bytes_b64"])) + prev_hash = bytes(old_state.state_hash()) + server_version = old_state.version() + local_version = admin_data.get("last_known_version") + local_hash_b64 = admin_data.get("last_known_state_hash_b64") + if local_version != server_version: + raise RuntimeError( + "local admin metadata is stale versus server state version; " + "refetch/synchronize admin metadata before updating" + ) + if not isinstance(local_hash_b64, str) or _unb64(local_hash_b64) != prev_hash: + raise RuntimeError( + "local admin metadata state hash mismatch; refusing update to avoid accidental role/state clobber" + ) + + return admin_data, bbs_pk, admin_cred, prev_hash, server_version + 1 + + +def _normalize_role_epochs(all_roles: list[str], raw_role_epochs: object) -> dict[str, int]: + role_epochs: dict[str, int] = {} + if isinstance(raw_role_epochs, dict): + for role in all_roles: + value = raw_role_epochs.get(role, 1) + role_epochs[role] = max(int(value), 1) + else: + for role in all_roles: + role_epochs[role] = 1 + return role_epochs + + # ── Public operations ──────────────────────────────────────────────── def create_registry(userid: str, server: str, role_names: list[str]) -> str: @@ -274,36 +319,18 @@ def create_registry(userid: str, server: str, role_names: list[str]) -> str: def update_registry(userid: str, server: str, registry_id_hex: str, add_roles: list[str]): - admin_data = store.load_admin(userid, registry_id_hex) - bbs_issuer, bbs_pk, admin_cred = store.reconstruct_admin(admin_data) + admin_data, bbs_pk, admin_cred, prev_hash, new_version = _validated_admin_update_base( + userid, server, registry_id_hex, + ) identity = store.load_identity(userid) - cur = _mgmt_single(userid, server, { - "cmd": "get_registry", "registry_id": registry_id_hex, - }, auth_registry_id=registry_id_hex, admin_cred=admin_cred) - - old_state = zkac.RegistryState.deserialize(_unb64(cur["state_bytes_b64"])) - prev_hash = old_state.state_hash() - server_version = old_state.version() - local_version = admin_data.get("last_known_version") - local_hash_b64 = admin_data.get("last_known_state_hash_b64") - if local_version != server_version: - raise RuntimeError( - "local admin metadata is stale versus server state version; " - "refetch/synchronize admin metadata before updating" - ) - if not isinstance(local_hash_b64, str) or _unb64(local_hash_b64) != bytes(prev_hash): - raise RuntimeError( - "local admin metadata state hash mismatch; refusing update to avoid accidental role/state clobber" - ) - new_version = old_state.version() + 1 - old_roles = admin_data.get("roles", []) all_roles = list(old_roles) + [r for r in add_roles if r not in old_roles] - role_entries = [(zkac.role_id(name), bbs_pk, 1) for name in all_roles] + role_epochs = _normalize_role_epochs(all_roles, admin_data.get("role_epochs", {})) + role_entries = [(zkac.role_id(name), bbs_pk, role_epochs[name]) for name in all_roles] new_state = zkac.RegistryState.build( - bbs_pk, identity["issuance_pk"], new_version, bytes(prev_hash), role_entries, + bbs_pk, identity["issuance_pk"], new_version, prev_hash, role_entries, ) new_cert = new_state.certify(admin_cred) @@ -315,12 +342,45 @@ def update_registry(userid: str, server: str, registry_id_hex: str, add_roles: l }, auth_registry_id=registry_id_hex, admin_cred=admin_cred) admin_data["roles"] = all_roles - role_epochs = admin_data.get("role_epochs", {}) - if not isinstance(role_epochs, dict): - role_epochs = {} for name in all_roles: - if name not in role_epochs: - role_epochs[name] = 1 + role_epochs.setdefault(name, 1) + admin_data["role_epochs"] = role_epochs + admin_data["last_known_version"] = new_version + admin_data["last_known_state_hash_b64"] = _b64(new_state.state_hash()) + store.save_admin(userid, registry_id_hex, admin_data) + + +def revoke_registry(userid: str, server: str, registry_id_hex: str, role_name: str | None = None): + admin_data, bbs_pk, admin_cred, prev_hash, new_version = _validated_admin_update_base( + userid, server, registry_id_hex, + ) + identity = store.load_identity(userid) + roles = admin_data.get("roles", []) + if not roles: + raise RuntimeError("registry has no roles to revoke") + role_epochs = _normalize_role_epochs(roles, admin_data.get("role_epochs", {})) + + if role_name is None: + for role in roles: + role_epochs[role] += 1 + else: + if role_name not in roles: + raise RuntimeError(f"role {role_name!r} not in registry (have: {roles})") + role_epochs[role_name] += 1 + + role_entries = [(zkac.role_id(name), bbs_pk, role_epochs[name]) for name in roles] + new_state = zkac.RegistryState.build( + bbs_pk, identity["issuance_pk"], new_version, prev_hash, role_entries, + ) + new_cert = new_state.certify(admin_cred) + + _mgmt_single(userid, server, { + "cmd": "update_registry", + "registry_id": registry_id_hex, + "state_bytes_b64": _b64(new_state.serialize()), + "state_cert_b64": _b64(bytes(new_cert)), + }, auth_registry_id=registry_id_hex, admin_cred=admin_cred) + admin_data["role_epochs"] = role_epochs admin_data["last_known_version"] = new_version admin_data["last_known_state_hash_b64"] = _b64(new_state.state_hash()) diff --git a/cli/zkac_cli/main.py b/cli/zkac_cli/main.py index 98cf76e..5131aa7 100644 --- a/cli/zkac_cli/main.py +++ b/cli/zkac_cli/main.py @@ -115,6 +115,25 @@ def _cmd_registry_list(args): print(f" {r['registry_id']} @ {r['server']} roles={r['roles']}") +def _cmd_registry_revoke(args): + if args.all and args.role: + raise RuntimeError("use either --role or --all, not both") + if not args.all and not args.role: + raise RuntimeError("missing target: pass --role or --all") + client.revoke_registry( + args.userid, + args.server, + args.registry, + role_name=None if args.all else args.role, + ) + if args.all: + print(f"registry revoked: {args.registry[:16]}…") + print(" bumped epoch for all roles") + else: + print(f"registry revoked: {args.registry[:16]}…") + print(f" bumped epoch for role: {args.role}") + + # ── grant ───────────────────────────────────────────────────────────── def _cmd_grant(args): @@ -263,6 +282,14 @@ def main(): c.add_argument("userid") c.set_defaults(func=_cmd_registry_list) + c = reg_sub.add_parser("revoke", help="revoke credentials by bumping role epoch(s)") + c.add_argument("userid") + c.add_argument("server", help="host:port") + c.add_argument("--registry", required=True) + c.add_argument("--role", default=None, help="revoke a single role by bumping its epoch") + c.add_argument("--all", action="store_true", help="revoke all roles by bumping all epochs") + c.set_defaults(func=_cmd_registry_revoke) + # grant c = sub.add_parser("grant", help="issue a credential to a recipient (admin)") c.add_argument("userid")