ZKAC/cli/zkac_cli/store.py
everbarry d01a6ebf85 v0.3
2026-04-17 14:10:52 +02:00

182 lines
5.7 KiB
Python

"""Persistent storage for ZKAC identities, admin bundles, credentials, and server pins.
Layout (single identity, no userid):
    ~/.zkac/identity.json                  issuance keypair (long-term client secret)
    ~/.zkac/admin/<registry_id>.json       BBS+ issuer + admin credential per owned registry
    ~/.zkac/credentials/<rid>_<role>.json  BBS+ credentials granted to this identity
    ~/.zkac/servers/<host_port>.json       pinned server public keys
"""
from __future__ import annotations
import base64
import json
from pathlib import Path
import zkac
from .paths import zkac_home
def _b64(data: bytes) -> str:
return base64.b64encode(data).decode()
def _unb64(s: str) -> bytes:
return base64.b64decode(s)
def _home() -> Path:
    """Return the ZKAC home directory, creating it (and any parents) if missing."""
    root = zkac_home()
    root.mkdir(parents=True, exist_ok=True)
    return root
# ── Identity (single, no userid) ─────────────────────────────────────
def create_identity() -> Path:
    """Generate a fresh issuance keypair and persist it as ``identity.json``.

    Also creates the ``admin``, ``credentials`` and ``servers`` subdirectories
    of the ZKAC home directory.

    Returns:
        The ZKAC home directory (not the path of the identity file).

    Raises:
        FileExistsError: if an identity file is already present.
    """
    d = _home()
    p = d / "identity.json"
    # Cheap fail-fast check so no key material is generated needlessly.
    if p.exists():
        raise FileExistsError(f"identity already exists at {p}")
    issuance_kp = zkac.IssuanceKeypair()
    identity = {
        "issuance_secret_b64": _b64(issuance_kp.secret_bytes()),
        "issuance_public_b64": _b64(issuance_kp.public_key_bytes()),
    }
    payload = json.dumps(identity, indent=2)
    # The file holds the long-term secret key. Create it exclusively (so a
    # concurrent run can never overwrite an identity created after the check
    # above) and with owner-only permissions *before* any secret bytes land
    # on disk.
    try:
        p.touch(mode=0o600, exist_ok=False)
    except FileExistsError:
        raise FileExistsError(f"identity already exists at {p}") from None
    p.write_text(payload)
    for sub in ("admin", "credentials", "servers"):
        (d / sub).mkdir(exist_ok=True)
    return d
def load_identity() -> dict:
    """Read ``identity.json`` and return the issuance keypair as raw bytes.

    The result maps ``"issuance_sk"`` and ``"issuance_pk"`` to the decoded
    secret and public key respectively.
    """
    identity_file = _home() / "identity.json"
    stored = json.loads(identity_file.read_text())
    secret = _unb64(stored["issuance_secret_b64"])
    public = _unb64(stored["issuance_public_b64"])
    return {"issuance_sk": secret, "issuance_pk": public}
def identity_exists() -> bool:
    """Report whether ``identity.json`` is present in the ZKAC home directory."""
    identity_file = _home() / "identity.json"
    return identity_file.exists()
# ── Server pins ───────────────────────────────────────────────────────
def _server_key(server: str) -> str:
return server.replace(":", "_")
def pin_server(server: str, server_pk_b64: str):
    """Record ``server_pk_b64`` as the pinned public key for ``server``.

    The pin is written to ``servers/<server key>.json``; an existing pin file
    for the same server is overwritten.
    """
    pins_dir = _home() / "servers"
    pins_dir.mkdir(exist_ok=True)
    pin_file = pins_dir / f"{_server_key(server)}.json"
    payload = json.dumps({"server_public_key_b64": server_pk_b64}, indent=2)
    pin_file.write_text(payload)
def load_server_pin(server: str) -> dict | None:
    """Return the parsed pin file for ``server``, or ``None`` if no pin file exists."""
    pin_file = _home() / "servers" / f"{_server_key(server)}.json"
    return json.loads(pin_file.read_text()) if pin_file.exists() else None
def list_pinned_servers() -> list[str]:
    """List every pinned server address in sorted order.

    Addresses are recovered from the pin file names by mapping ``_`` back to
    ``:``, so an address that originally contained ``_`` comes back with ``:``
    in its place.
    """
    pins_dir = _home() / "servers"
    if pins_dir.exists():
        servers = [pin.stem.replace("_", ":") for pin in pins_dir.glob("*.json")]
        servers.sort()
        return servers
    return []
# ── Admin bundles (per registry) ──────────────────────────────────────
def new_admin_material() -> dict:
    """Create a new BBS+ issuer and blind-issue an admin credential from it.

    Returns a dict of Base64 strings: the issuer secret and public key, the
    blind signature, and the member secret and prover blind taken from the
    blind request.
    """
    issuer = zkac.BbsIssuer()
    issuer_pk = issuer.public_key()
    role = zkac.admin_role_id()
    blind_req = zkac.prepare_blind_request()
    # The trailing 0 is presumably the epoch -- confirm against the zkac API.
    blind_sig = issuer.issue_blind(blind_req.commitment_with_proof(), role, 0)

    material = {}
    material["bbs_issuer_secret_b64"] = _b64(issuer.secret_key_bytes())
    material["bbs_issuer_public_b64"] = _b64(issuer_pk.to_bytes())
    material["admin_blind_sig_b64"] = _b64(blind_sig)
    material["admin_member_secret_b64"] = _b64(blind_req.member_secret())
    material["admin_prover_blind_b64"] = _b64(blind_req.prover_blind())
    return material
def reconstruct_admin(data: dict) -> tuple:
    """Rebuild the issuer, its public key, and the admin credential from ``data``.

    ``data`` must provide the ``bbs_issuer_secret_b64``, ``admin_blind_sig_b64``,
    ``admin_member_secret_b64`` and ``admin_prover_blind_b64`` entries.

    Returns:
        A ``(bbs_issuer, bbs_pk, admin_cred)`` tuple.
    """
    issuer_secret = _unb64(data["bbs_issuer_secret_b64"])
    issuer = zkac.BbsIssuer.from_secret_key(issuer_secret)
    issuer_pk = issuer.public_key()

    blind_sig = _unb64(data["admin_blind_sig_b64"])
    member_secret = _unb64(data["admin_member_secret_b64"])
    prover_blind = _unb64(data["admin_prover_blind_b64"])
    credential = zkac.Credential.finalize(
        blind_sig,
        member_secret,
        prover_blind,
        zkac.admin_role_id(),
        0,
        issuer_pk,
    )
    return issuer, issuer_pk, credential
def save_admin(registry_id_hex: str, info: dict):
    """Persist the admin bundle for a registry as ``admin/<registry_id_hex>.json``.

    Args:
        registry_id_hex: registry identifier, used as the file stem.
        info: JSON-serialisable admin bundle.

    An existing bundle for the same registry is overwritten.
    """
    d = _home() / "admin"
    d.mkdir(exist_ok=True)
    p = d / f"{registry_id_hex}.json"
    # Serialise first so a non-serialisable bundle fails before any file is
    # created or modified.
    payload = json.dumps(info, indent=2)
    # Admin bundles are expected to hold the issuer secret key
    # (``bbs_issuer_secret_b64``), so make the file owner-only before writing.
    # ``touch`` covers a new file; ``chmod`` tightens a pre-existing one.
    p.touch(mode=0o600)
    p.chmod(0o600)
    p.write_text(payload)
def load_admin(registry_id_hex: str) -> dict:
    """Load and parse the admin bundle stored at ``admin/<registry_id_hex>.json``."""
    bundle_file = _home() / "admin" / f"{registry_id_hex}.json"
    raw = bundle_file.read_text()
    return json.loads(raw)
def list_admin_registries() -> list[str]:
    """Return the sorted file stems of every ``*.json`` bundle under ``admin/``."""
    admin_dir = _home() / "admin"
    if admin_dir.exists():
        return sorted(bundle.stem for bundle in admin_dir.glob("*.json"))
    return []
# ── Credentials ───────────────────────────────────────────────────────
def save_credential(registry_id_hex: str, role_name: str, cred_data: dict):
    """Persist a credential as ``credentials/<registry_id_hex>_<role_name>.json``.

    Args:
        registry_id_hex: registry identifier (first part of the file stem).
        role_name: role the credential was granted for (second part of the stem).
        cred_data: JSON-serialisable credential record.

    An existing credential for the same registry and role is overwritten.
    """
    d = _home() / "credentials"
    d.mkdir(exist_ok=True)
    p = d / f"{registry_id_hex}_{role_name}.json"
    # Serialise first so a non-serialisable record fails before any file is
    # created or modified.
    payload = json.dumps(cred_data, indent=2)
    # Credential records are expected to hold the member secret
    # (``member_secret_b64``), so make the file owner-only before writing.
    # ``touch`` covers a new file; ``chmod`` tightens a pre-existing one.
    p.touch(mode=0o600)
    p.chmod(0o600)
    p.write_text(payload)
def load_credential_data(registry_id_hex: str, role_name: str) -> dict:
    """Load and parse ``credentials/<registry_id_hex>_<role_name>.json``."""
    cred_file = _home() / "credentials" / f"{registry_id_hex}_{role_name}.json"
    raw = cred_file.read_text()
    return json.loads(raw)
def has_credential(registry_id_hex: str, role_name: str) -> bool:
    """Report whether a credential file exists for this registry and role."""
    cred_file = _home() / "credentials" / f"{registry_id_hex}_{role_name}.json"
    return cred_file.exists()
def reconstruct_credential(cred_data: dict) -> zkac.Credential:
    """Rebuild a ``zkac.Credential`` from a stored credential record.

    ``cred_data`` must provide ``issuer_pk_b64``, ``blind_sig_b64``,
    ``member_secret_b64``, ``prover_blind_b64``, ``role_name`` and ``epoch``.
    """
    issuer_pk = zkac.BbsPublicKey.from_bytes(_unb64(cred_data["issuer_pk_b64"]))

    blind_sig = _unb64(cred_data["blind_sig_b64"])
    member_secret = _unb64(cred_data["member_secret_b64"])
    prover_blind = _unb64(cred_data["prover_blind_b64"])
    role = zkac.role_id(cred_data["role_name"])
    epoch = cred_data["epoch"]

    return zkac.Credential.finalize(
        blind_sig, member_secret, prover_blind, role, epoch, issuer_pk
    )
def list_credentials() -> list[tuple[str, str]]:
    """List stored credentials as ``(registry_id_hex, role_name)`` pairs.

    Pairs are returned in sorted file-path order. File stems have the form
    ``<registry_id_hex>_<role_name>``. The registry id is hexadecimal and so
    cannot contain ``_``; the stem is therefore split at the FIRST underscore,
    which keeps role names that themselves contain underscores (for example
    ``team_lead``) intact.

    Files whose stem contains no underscore are skipped. Returns an empty list
    when the ``credentials`` directory does not exist.
    """
    d = _home() / "credentials"
    if not d.exists():
        return []
    result = []
    for p in sorted(d.glob("*.json")):
        registry_id_hex, sep, role_name = p.stem.partition("_")
        if sep:  # empty separator means the stem had no underscore at all
            result.append((registry_id_hex, role_name))
    return result