"""Persistent storage per local user id under ~/.zkac//.""" from __future__ import annotations import base64 import json from pathlib import Path import zkac from .paths import ensure_user, user_dir, zkac_home def _b64(data: bytes) -> str: return base64.b64encode(data).decode() def _unb64(s: str) -> bytes: return base64.b64decode(s) def _ud(userid: str) -> Path: return user_dir(userid) # ── User identity ──────────────────────────────────────────────────── def create_user(userid: str) -> Path: d = ensure_user(userid) p = d / "identity.json" if p.exists(): raise FileExistsError(f"user {userid!r} already exists at {d}") issuance_kp = zkac.IssuanceKeypair() identity = { "issuance_secret_b64": _b64(issuance_kp.secret_bytes()), "issuance_public_b64": _b64(issuance_kp.public_key_bytes()), } p.write_text(json.dumps(identity, indent=2)) for sub in ("admin", "credentials", "servers"): (d / sub).mkdir(exist_ok=True) return d def load_identity(userid: str) -> dict: data = json.loads((_ud(userid) / "identity.json").read_text()) return { "issuance_sk": _unb64(data["issuance_secret_b64"]), "issuance_pk": _unb64(data["issuance_public_b64"]), } def list_users() -> list[str]: home = zkac_home() if not home.exists(): return [] return sorted( d.name for d in home.iterdir() if d.is_dir() and (d / "identity.json").exists() ) # ── Server pins (per user) ─────────────────────────────────────────── def _server_key(server: str) -> str: return server.replace(":", "_") def pin_server(userid: str, server: str, server_pk_b64: str): d = _ud(userid) / "servers" d.mkdir(parents=True, exist_ok=True) (d / f"{_server_key(server)}.json").write_text( json.dumps({"server_public_key_b64": server_pk_b64}, indent=2) ) def load_server_pin(userid: str, server: str) -> dict | None: p = _ud(userid) / "servers" / f"{_server_key(server)}.json" if not p.exists(): return None return json.loads(p.read_text()) def known_servers(userid: str) -> list[str]: """Unique server addresses from pinned registries + pin files.""" seen: list[str] = [] d = _ud(userid) / "servers" if d.exists(): for p in d.glob("*.json"): host_port = p.stem.replace("_", ":") if host_port not in seen: seen.append(host_port) for rid in list_admin_registries(userid): try: reg = load_admin(userid, rid) srv = reg.get("server") if srv and srv not in seen: seen.append(srv) except FileNotFoundError: continue return seen # ── Admin bundles (per registry, per user) ──────────────────────────── def new_admin_material() -> dict: bbs_issuer = zkac.BbsIssuer() bbs_pk = bbs_issuer.public_key() admin_rid = zkac.admin_role_id() req = zkac.prepare_blind_request() sig = bbs_issuer.issue_blind(req.commitment_with_proof(), admin_rid, 0) return { "bbs_issuer_secret_b64": _b64(bbs_issuer.secret_key_bytes()), "bbs_issuer_public_b64": _b64(bbs_pk.to_bytes()), "admin_blind_sig_b64": _b64(sig), "admin_member_secret_b64": _b64(req.member_secret()), "admin_prover_blind_b64": _b64(req.prover_blind()), } def reconstruct_admin(data: dict) -> tuple: bbs_issuer = zkac.BbsIssuer.from_secret_key(_unb64(data["bbs_issuer_secret_b64"])) bbs_pk = bbs_issuer.public_key() admin_cred = zkac.Credential.finalize( _unb64(data["admin_blind_sig_b64"]), _unb64(data["admin_member_secret_b64"]), _unb64(data["admin_prover_blind_b64"]), zkac.admin_role_id(), 0, bbs_pk, ) return bbs_issuer, bbs_pk, admin_cred def save_admin(userid: str, registry_id_hex: str, info: dict): d = _ud(userid) / "admin" d.mkdir(parents=True, exist_ok=True) (d / f"{registry_id_hex}.json").write_text(json.dumps(info, indent=2)) def load_admin(userid: str, registry_id_hex: str) -> dict: return json.loads((_ud(userid) / "admin" / f"{registry_id_hex}.json").read_text()) def list_admin_registries(userid: str) -> list[str]: d = _ud(userid) / "admin" if not d.exists(): return [] return sorted(p.stem for p in d.glob("*.json")) # ── Credentials ─────────────────────────────────────────────────────── def save_credential(userid: str, registry_id_hex: str, role_name: str, cred_data: dict): d = _ud(userid) / "credentials" d.mkdir(parents=True, exist_ok=True) (d / f"{registry_id_hex}_{role_name}.json").write_text(json.dumps(cred_data, indent=2)) def load_credential_data(userid: str, registry_id_hex: str, role_name: str) -> dict: return json.loads( (_ud(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json").read_text() ) def has_credential(userid: str, registry_id_hex: str, role_name: str) -> bool: return (_ud(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json").exists() def reconstruct_credential(cred_data: dict) -> zkac.Credential: pk = zkac.BbsPublicKey.from_bytes(_unb64(cred_data["issuer_pk_b64"])) return zkac.Credential.finalize( _unb64(cred_data["blind_sig_b64"]), _unb64(cred_data["member_secret_b64"]), _unb64(cred_data["prover_blind_b64"]), zkac.role_id(cred_data["role_name"]), cred_data["epoch"], pk, ) def list_credentials(userid: str) -> list[tuple[str, str]]: d = _ud(userid) / "credentials" if not d.exists(): return [] result = [] for p in sorted(d.glob("*.json")): parts = p.stem.rsplit("_", 1) if len(parts) == 2: result.append((parts[0], parts[1])) return result