"""Persistent storage per local user id under ~/.zkac//.""" from __future__ import annotations import base64 import hashlib import json import os import secrets from pathlib import Path import zkac from .paths import ensure_user, user_dir, zkac_home def _b64(data: bytes) -> str: return base64.b64encode(data).decode() def _unb64(s: str) -> bytes: return base64.b64decode(s) def _ud(userid: str) -> Path: return user_dir(userid) def _chmod_if_possible(path: Path, mode: int): try: os.chmod(path, mode) except OSError: pass def _ensure_private_dir(path: Path): path.mkdir(parents=True, exist_ok=True) _chmod_if_possible(path, 0o700) def _write_private_json(path: Path, payload: dict): path.write_text(json.dumps(payload, indent=2)) _chmod_if_possible(path, 0o600) # ── User identity ──────────────────────────────────────────────────── def create_user(userid: str) -> Path: d = ensure_user(userid) _chmod_if_possible(d, 0o700) p = d / "identity.json" if p.exists(): raise FileExistsError(f"user {userid!r} already exists at {d}") issuance_kp = zkac.IssuanceKeypair() transport_kp = zkac.Keypair() identity = { "issuance_secret_b64": _b64(issuance_kp.secret_bytes()), "issuance_public_b64": _b64(issuance_kp.public_key_bytes()), "transport_secret_b64": _b64(transport_kp.secret_key_bytes()), "transport_public_b64": _b64(transport_kp.public_key().to_bytes()), "grant_token_b64": _b64(secrets.token_bytes(32)), } _write_private_json(p, identity) for sub in ("admin", "credentials", "servers"): _ensure_private_dir(d / sub) return d def load_identity(userid: str) -> dict: p = _ud(userid) / "identity.json" data = json.loads(p.read_text()) return { "issuance_sk": _unb64(data["issuance_secret_b64"]), "issuance_pk": _unb64(data["issuance_public_b64"]), "transport_sk": _unb64(data["transport_secret_b64"]), "transport_pk": _unb64(data["transport_public_b64"]), "grant_token": _unb64(data["grant_token_b64"]), } def export_contact_bundle(userid: str, peer: str | None = None) -> str: """One-string public contact bundle for out-of-band sharing.""" ident = load_identity(userid) payload = { "v": 3, "issuance_pk_hex": ident["issuance_pk"].hex(), "transport_pk_hex": ident["transport_pk"].hex(), "grant_token_b64": _b64(ident["grant_token"]), } if peer: payload["peer"] = peer raw = json.dumps(payload, separators=(",", ":"), sort_keys=True).encode("utf-8") return base64.urlsafe_b64encode(raw).decode().rstrip("=") def parse_contact_bundle(bundle: str) -> dict: """Parse and validate a contact bundle into key hex fields.""" try: s = bundle.strip() padding = "=" * (-len(s) % 4) raw = base64.urlsafe_b64decode((s + padding).encode()) data = json.loads(raw.decode("utf-8")) except Exception as exc: raise ValueError("invalid contact bundle encoding") from exc version = data.get("v") if version != 3: raise ValueError("unsupported contact bundle version (requires v3)") issuance_hex = data.get("issuance_pk_hex", "") transport_hex = data.get("transport_pk_hex", "") if not isinstance(issuance_hex, str) or not isinstance(transport_hex, str): raise ValueError("invalid contact bundle fields") try: issuance = bytes.fromhex(issuance_hex) transport = bytes.fromhex(transport_hex) except ValueError as exc: raise ValueError("contact bundle keys must be hex") from exc if len(issuance) != 32: raise ValueError("issuance public key must be 32 bytes") if len(transport) != 32: raise ValueError("transport public key must be 32 bytes") parsed = { "issuance_pk_hex": issuance_hex, "transport_pk_hex": transport_hex, } tok = data.get("grant_token_b64", "") if not isinstance(tok, str): raise ValueError("invalid contact bundle grant token field") try: token = _unb64(tok) except Exception as exc: raise ValueError("invalid contact bundle grant token encoding") from exc if len(token) != 32: raise ValueError("grant token must decode to 32 bytes") parsed["grant_token_b64"] = tok peer = data.get("peer") if peer is not None and not isinstance(peer, str): raise ValueError("invalid contact bundle peer field") if isinstance(peer, str) and peer.strip(): parsed["peer"] = peer.strip() return parsed def list_users() -> list[str]: home = zkac_home() if not home.exists(): return [] return sorted( d.name for d in home.iterdir() if d.is_dir() and (d / "identity.json").exists() ) # ── Server pins (per user) ─────────────────────────────────────────── def _server_key(server: str) -> str: digest = hashlib.sha256(server.encode("utf-8")).hexdigest() return f"sha256_{digest}" def pin_server(userid: str, server: str, server_pk_b64: str): d = _ud(userid) / "servers" _ensure_private_dir(d) _write_private_json( d / f"{_server_key(server)}.json", {"server": server, "server_public_key_b64": server_pk_b64}, ) def load_server_pin(userid: str, server: str) -> dict | None: d = _ud(userid) / "servers" new_path = d / f"{_server_key(server)}.json" if new_path.exists(): return json.loads(new_path.read_text()) return None def known_servers(userid: str) -> list[str]: """Unique server addresses from pinned registries + pin files.""" seen: list[str] = [] d = _ud(userid) / "servers" if d.exists(): for p in d.glob("*.json"): try: data = json.loads(p.read_text()) except Exception: continue server = data.get("server") if isinstance(server, str) and server and server not in seen: seen.append(server) for rid in list_admin_registries(userid): try: reg = load_admin(userid, rid) srv = reg.get("server") if srv and srv not in seen: seen.append(srv) except FileNotFoundError: continue return seen # ── Admin bundles (per registry, per user) ──────────────────────────── def new_admin_material() -> dict: bbs_issuer = zkac.BbsIssuer() bbs_pk = bbs_issuer.public_key() admin_rid = zkac.admin_role_id() req = zkac.prepare_blind_request() sig = bbs_issuer.issue_blind(req.commitment_with_proof(), admin_rid, 0) return { "bbs_issuer_secret_b64": _b64(bbs_issuer.secret_key_bytes()), "bbs_issuer_public_b64": _b64(bbs_pk.to_bytes()), "admin_blind_sig_b64": _b64(sig), "admin_member_secret_b64": _b64(req.member_secret()), "admin_prover_blind_b64": _b64(req.prover_blind()), } def reconstruct_admin(data: dict) -> tuple: bbs_issuer = zkac.BbsIssuer.from_secret_key(_unb64(data["bbs_issuer_secret_b64"])) bbs_pk = bbs_issuer.public_key() admin_cred = zkac.Credential.finalize( _unb64(data["admin_blind_sig_b64"]), _unb64(data["admin_member_secret_b64"]), _unb64(data["admin_prover_blind_b64"]), zkac.admin_role_id(), 0, bbs_pk, ) return bbs_issuer, bbs_pk, admin_cred def save_admin(userid: str, registry_id_hex: str, info: dict): d = _ud(userid) / "admin" _ensure_private_dir(d) _write_private_json(d / f"{registry_id_hex}.json", info) def load_admin(userid: str, registry_id_hex: str) -> dict: return json.loads((_ud(userid) / "admin" / f"{registry_id_hex}.json").read_text()) def list_admin_registries(userid: str) -> list[str]: d = _ud(userid) / "admin" if not d.exists(): return [] return sorted(p.stem for p in d.glob("*.json")) # ── Credentials ─────────────────────────────────────────────────────── def save_credential(userid: str, registry_id_hex: str, role_name: str, cred_data: dict): d = _ud(userid) / "credentials" _ensure_private_dir(d) _write_private_json(d / f"{registry_id_hex}_{role_name}.json", cred_data) def load_credential_data(userid: str, registry_id_hex: str, role_name: str) -> dict: return json.loads( (_ud(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json").read_text() ) def has_credential(userid: str, registry_id_hex: str, role_name: str) -> bool: return (_ud(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json").exists() def reconstruct_credential(cred_data: dict) -> zkac.Credential: pk = zkac.BbsPublicKey.from_bytes(_unb64(cred_data["issuer_pk_b64"])) return zkac.Credential.finalize( _unb64(cred_data["blind_sig_b64"]), _unb64(cred_data["member_secret_b64"]), _unb64(cred_data["prover_blind_b64"]), zkac.role_id(cred_data["role_name"]), cred_data["epoch"], pk, ) def list_credentials(userid: str) -> list[tuple[str, str]]: d = _ud(userid) / "credentials" if not d.exists(): return [] result = [] for p in sorted(d.glob("*.json")): parts = p.stem.rsplit("_", 1) if len(parts) == 2: result.append((parts[0], parts[1])) return result