add cli revoke role

This commit is contained in:
everbarry 2026-05-06 17:17:22 +02:00
parent 3a3bd30e03
commit 6ce6bf5675
2 changed files with 116 additions and 29 deletions

View File

@ -240,6 +240,51 @@ def _ok(resp: dict) -> dict:
return resp return resp
def _validated_admin_update_base(
userid: str,
server: str,
registry_id_hex: str,
) -> tuple[dict, zkac.BbsPublicKey, zkac.Credential, bytes, int]:
admin_data = store.load_admin(userid, registry_id_hex)
_bbs_issuer, bbs_pk, admin_cred = store.reconstruct_admin(admin_data)
cur = _mgmt_single(
userid,
server,
{"cmd": "get_registry", "registry_id": registry_id_hex},
auth_registry_id=registry_id_hex,
admin_cred=admin_cred,
)
old_state = zkac.RegistryState.deserialize(_unb64(cur["state_bytes_b64"]))
prev_hash = bytes(old_state.state_hash())
server_version = old_state.version()
local_version = admin_data.get("last_known_version")
local_hash_b64 = admin_data.get("last_known_state_hash_b64")
if local_version != server_version:
raise RuntimeError(
"local admin metadata is stale versus server state version; "
"refetch/synchronize admin metadata before updating"
)
if not isinstance(local_hash_b64, str) or _unb64(local_hash_b64) != prev_hash:
raise RuntimeError(
"local admin metadata state hash mismatch; refusing update to avoid accidental role/state clobber"
)
return admin_data, bbs_pk, admin_cred, prev_hash, server_version + 1
def _normalize_role_epochs(all_roles: list[str], raw_role_epochs: object) -> dict[str, int]:
role_epochs: dict[str, int] = {}
if isinstance(raw_role_epochs, dict):
for role in all_roles:
value = raw_role_epochs.get(role, 1)
role_epochs[role] = max(int(value), 1)
else:
for role in all_roles:
role_epochs[role] = 1
return role_epochs
# ── Public operations ──────────────────────────────────────────────── # ── Public operations ────────────────────────────────────────────────
def create_registry(userid: str, server: str, role_names: list[str]) -> str: def create_registry(userid: str, server: str, role_names: list[str]) -> str:
@ -274,36 +319,18 @@ def create_registry(userid: str, server: str, role_names: list[str]) -> str:
def update_registry(userid: str, server: str, registry_id_hex: str, add_roles: list[str]): def update_registry(userid: str, server: str, registry_id_hex: str, add_roles: list[str]):
admin_data = store.load_admin(userid, registry_id_hex) admin_data, bbs_pk, admin_cred, prev_hash, new_version = _validated_admin_update_base(
bbs_issuer, bbs_pk, admin_cred = store.reconstruct_admin(admin_data) userid, server, registry_id_hex,
)
identity = store.load_identity(userid) identity = store.load_identity(userid)
cur = _mgmt_single(userid, server, {
"cmd": "get_registry", "registry_id": registry_id_hex,
}, auth_registry_id=registry_id_hex, admin_cred=admin_cred)
old_state = zkac.RegistryState.deserialize(_unb64(cur["state_bytes_b64"]))
prev_hash = old_state.state_hash()
server_version = old_state.version()
local_version = admin_data.get("last_known_version")
local_hash_b64 = admin_data.get("last_known_state_hash_b64")
if local_version != server_version:
raise RuntimeError(
"local admin metadata is stale versus server state version; "
"refetch/synchronize admin metadata before updating"
)
if not isinstance(local_hash_b64, str) or _unb64(local_hash_b64) != bytes(prev_hash):
raise RuntimeError(
"local admin metadata state hash mismatch; refusing update to avoid accidental role/state clobber"
)
new_version = old_state.version() + 1
old_roles = admin_data.get("roles", []) old_roles = admin_data.get("roles", [])
all_roles = list(old_roles) + [r for r in add_roles if r not in old_roles] all_roles = list(old_roles) + [r for r in add_roles if r not in old_roles]
role_entries = [(zkac.role_id(name), bbs_pk, 1) for name in all_roles] role_epochs = _normalize_role_epochs(all_roles, admin_data.get("role_epochs", {}))
role_entries = [(zkac.role_id(name), bbs_pk, role_epochs[name]) for name in all_roles]
new_state = zkac.RegistryState.build( new_state = zkac.RegistryState.build(
bbs_pk, identity["issuance_pk"], new_version, bytes(prev_hash), role_entries, bbs_pk, identity["issuance_pk"], new_version, prev_hash, role_entries,
) )
new_cert = new_state.certify(admin_cred) new_cert = new_state.certify(admin_cred)
@ -315,12 +342,45 @@ def update_registry(userid: str, server: str, registry_id_hex: str, add_roles: l
}, auth_registry_id=registry_id_hex, admin_cred=admin_cred) }, auth_registry_id=registry_id_hex, admin_cred=admin_cred)
admin_data["roles"] = all_roles admin_data["roles"] = all_roles
role_epochs = admin_data.get("role_epochs", {})
if not isinstance(role_epochs, dict):
role_epochs = {}
for name in all_roles: for name in all_roles:
if name not in role_epochs: role_epochs.setdefault(name, 1)
role_epochs[name] = 1 admin_data["role_epochs"] = role_epochs
admin_data["last_known_version"] = new_version
admin_data["last_known_state_hash_b64"] = _b64(new_state.state_hash())
store.save_admin(userid, registry_id_hex, admin_data)
def revoke_registry(userid: str, server: str, registry_id_hex: str, role_name: str | None = None):
admin_data, bbs_pk, admin_cred, prev_hash, new_version = _validated_admin_update_base(
userid, server, registry_id_hex,
)
identity = store.load_identity(userid)
roles = admin_data.get("roles", [])
if not roles:
raise RuntimeError("registry has no roles to revoke")
role_epochs = _normalize_role_epochs(roles, admin_data.get("role_epochs", {}))
if role_name is None:
for role in roles:
role_epochs[role] += 1
else:
if role_name not in roles:
raise RuntimeError(f"role {role_name!r} not in registry (have: {roles})")
role_epochs[role_name] += 1
role_entries = [(zkac.role_id(name), bbs_pk, role_epochs[name]) for name in roles]
new_state = zkac.RegistryState.build(
bbs_pk, identity["issuance_pk"], new_version, prev_hash, role_entries,
)
new_cert = new_state.certify(admin_cred)
_mgmt_single(userid, server, {
"cmd": "update_registry",
"registry_id": registry_id_hex,
"state_bytes_b64": _b64(new_state.serialize()),
"state_cert_b64": _b64(bytes(new_cert)),
}, auth_registry_id=registry_id_hex, admin_cred=admin_cred)
admin_data["role_epochs"] = role_epochs admin_data["role_epochs"] = role_epochs
admin_data["last_known_version"] = new_version admin_data["last_known_version"] = new_version
admin_data["last_known_state_hash_b64"] = _b64(new_state.state_hash()) admin_data["last_known_state_hash_b64"] = _b64(new_state.state_hash())

View File

@ -115,6 +115,25 @@ def _cmd_registry_list(args):
print(f" {r['registry_id']} @ {r['server']} roles={r['roles']}") print(f" {r['registry_id']} @ {r['server']} roles={r['roles']}")
def _cmd_registry_revoke(args):
if args.all and args.role:
raise RuntimeError("use either --role or --all, not both")
if not args.all and not args.role:
raise RuntimeError("missing target: pass --role <name> or --all")
client.revoke_registry(
args.userid,
args.server,
args.registry,
role_name=None if args.all else args.role,
)
if args.all:
print(f"registry revoked: {args.registry[:16]}")
print(" bumped epoch for all roles")
else:
print(f"registry revoked: {args.registry[:16]}")
print(f" bumped epoch for role: {args.role}")
# ── grant ───────────────────────────────────────────────────────────── # ── grant ─────────────────────────────────────────────────────────────
def _cmd_grant(args): def _cmd_grant(args):
@ -263,6 +282,14 @@ def main():
c.add_argument("userid") c.add_argument("userid")
c.set_defaults(func=_cmd_registry_list) c.set_defaults(func=_cmd_registry_list)
c = reg_sub.add_parser("revoke", help="revoke credentials by bumping role epoch(s)")
c.add_argument("userid")
c.add_argument("server", help="host:port")
c.add_argument("--registry", required=True)
c.add_argument("--role", default=None, help="revoke a single role by bumping its epoch")
c.add_argument("--all", action="store_true", help="revoke all roles by bumping all epochs")
c.set_defaults(func=_cmd_registry_revoke)
# grant # grant
c = sub.add_parser("grant", help="issue a credential to a recipient (admin)") c = sub.add_parser("grant", help="issue a credential to a recipient (admin)")
c.add_argument("userid") c.add_argument("userid")