# ZKAC/cli/zkac_cli/store.py
# 2026-05-06 16:35:09 +02:00
#
# 293 lines
# 9.7 KiB
# Python

"""Persistent storage per local user id under ~/.zkac/<userid>/."""
from __future__ import annotations
import base64
import json
import os
import secrets
from pathlib import Path
import zkac
from .paths import ensure_user, user_dir, zkac_home
def _b64(data: bytes) -> str:
    """Encode *data* with the standard Base64 alphabet and return it as text."""
    encoded = base64.b64encode(data)
    return encoded.decode()
def _unb64(s: str) -> bytes:
    """Decode the standard Base64 text *s* back into raw bytes."""
    decoded = base64.b64decode(s)
    return decoded
def _ud(userid: str) -> Path:
    """Return the storage directory for *userid* as reported by ``user_dir``."""
    directory = user_dir(userid)
    return directory
def _chmod_if_possible(path: Path, mode: int):
    """Apply *mode* to *path* on a best-effort basis.

    Any ``OSError`` raised by ``os.chmod`` is ignored, so callers never fail
    merely because the permission change could not be applied.
    """
    try:
        os.chmod(path, mode)
    except OSError:
        # Deliberate: tightening permissions is optional hardening here.
        return
def _ensure_private_dir(path: Path):
    """Create *path* (and any missing parents), then restrict it to mode 0o700.

    An already existing directory is accepted; the permission change is
    best-effort via ``_chmod_if_possible``.
    """
    owner_only = 0o700
    path.mkdir(parents=True, exist_ok=True)
    _chmod_if_possible(path, owner_only)
def _write_private_json(path: Path, payload: dict):
    """Write *payload* to *path* as indented JSON readable by the owner only.

    The file is opened through ``os.open`` with mode ``0o600`` so that a newly
    created file never exists with wider, umask-derived permissions, not even
    for the short interval between creation and a later ``chmod``.

    Args:
        path: Destination file; it is created or truncated.
        payload: JSON-serialisable mapping to store.

    Raises:
        TypeError, ValueError: If *payload* cannot be serialised. This happens
            before the file is opened, so an existing file is left untouched.
        OSError: If the file cannot be opened or written.
    """
    text = json.dumps(payload, indent=2)
    # O_BINARY exists only on Windows; it stops the C runtime from applying a
    # second newline translation underneath Python's text layer.
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC | getattr(os, "O_BINARY", 0)
    fd = os.open(path, flags, 0o600)
    with os.fdopen(fd, "w") as handle:
        # os.open's mode only applies when the file is created; tighten a
        # pre-existing file before any content is written into it.
        _chmod_if_possible(path, 0o600)
        handle.write(text)
# ── User identity ────────────────────────────────────────────────────
def create_user(userid: str) -> Path:
    """Create the on-disk identity for *userid* and return its directory.

    Generates a fresh issuance keypair, transport keypair and a 32-byte grant
    token, stores them Base64-encoded in ``identity.json``, and creates the
    ``admin``, ``credentials`` and ``servers`` sub-directories.

    Raises:
        FileExistsError: If ``identity.json`` already exists for this user.
    """
    home = ensure_user(userid)
    _chmod_if_possible(home, 0o700)

    identity_path = home / "identity.json"
    if identity_path.exists():
        raise FileExistsError(f"user {userid!r} already exists at {home}")

    issuance_kp = zkac.IssuanceKeypair()
    transport_kp = zkac.Keypair()
    raw_fields = (
        ("issuance_secret_b64", issuance_kp.secret_bytes()),
        ("issuance_public_b64", issuance_kp.public_key_bytes()),
        ("transport_secret_b64", transport_kp.secret_key_bytes()),
        ("transport_public_b64", transport_kp.public_key().to_bytes()),
        ("grant_token_b64", secrets.token_bytes(32)),
    )
    identity = {key: _b64(value) for key, value in raw_fields}
    _write_private_json(identity_path, identity)

    for name in ("admin", "credentials", "servers"):
        _ensure_private_dir(home / name)
    return home
def load_identity(userid: str) -> dict:
    """Load the stored identity of *userid* and return its fields as raw bytes.

    If the stored file lacks the transport keypair or the grant token, the
    missing values are generated and the file is rewritten before returning.

    Returns:
        A dict with the keys ``issuance_sk``, ``issuance_pk``,
        ``transport_sk``, ``transport_pk`` and ``grant_token``.
    """
    identity_path = _ud(userid) / "identity.json"
    stored = json.loads(identity_path.read_text())
    dirty = False

    has_transport = (
        "transport_secret_b64" in stored and "transport_public_b64" in stored
    )
    if not has_transport:
        # Both halves are regenerated together so they always match.
        fresh_kp = zkac.Keypair()
        stored["transport_secret_b64"] = _b64(fresh_kp.secret_key_bytes())
        stored["transport_public_b64"] = _b64(fresh_kp.public_key().to_bytes())
        dirty = True

    if "grant_token_b64" not in stored:
        stored["grant_token_b64"] = _b64(secrets.token_bytes(32))
        dirty = True

    if dirty:
        _write_private_json(identity_path, stored)

    field_map = (
        ("issuance_sk", "issuance_secret_b64"),
        ("issuance_pk", "issuance_public_b64"),
        ("transport_sk", "transport_secret_b64"),
        ("transport_pk", "transport_public_b64"),
        ("grant_token", "grant_token_b64"),
    )
    return {out_key: _unb64(stored[src_key]) for out_key, src_key in field_map}
def export_contact_bundle(userid: str, peer: str | None = None) -> str:
    """One-string public contact bundle for out-of-band sharing.

    The bundle is compact, key-sorted JSON (version 3) holding the issuance
    and transport public keys as hex plus the Base64 grant token, encoded as
    URL-safe Base64 with the trailing ``=`` padding removed. A truthy *peer*
    is included under the ``peer`` key.
    """
    ident = load_identity(userid)
    payload = dict(
        v=3,
        issuance_pk_hex=ident["issuance_pk"].hex(),
        transport_pk_hex=ident["transport_pk"].hex(),
        grant_token_b64=_b64(ident["grant_token"]),
    )
    if peer:
        payload["peer"] = peer

    compact = json.dumps(payload, separators=(",", ":"), sort_keys=True)
    encoded = base64.urlsafe_b64encode(compact.encode("utf-8")).decode()
    return encoded.rstrip("=")
def parse_contact_bundle(bundle: str) -> dict:
    """Parse and validate a contact bundle into key hex fields.

    A bundle is URL-safe Base64 (padding optional) wrapping a JSON object
    with ``v == 3``, two 32-byte public keys in hex and a 32-byte grant token
    in standard Base64. The input is treated as untrusted: every structural
    problem is reported as ``ValueError``.

    Args:
        bundle: The bundle string; surrounding whitespace is ignored.

    Returns:
        A dict with ``issuance_pk_hex`` and ``transport_pk_hex`` (canonical
        lowercase hex), ``grant_token_b64``, and ``peer`` (stripped) when the
        bundle carries a non-blank peer string.

    Raises:
        ValueError: If the bundle cannot be decoded, is not a JSON object, has
            an unsupported version, or any field is malformed.
    """
    try:
        text = bundle.strip()
        padded = text + "=" * (-len(text) % 4)
        raw = base64.urlsafe_b64decode(padded.encode())
        data = json.loads(raw.decode("utf-8"))
    except Exception as exc:
        # Deliberately broad: whatever goes wrong while decoding untrusted
        # input (bad Base64, bad UTF-8, malformed JSON, non-string argument)
        # is surfaced as a single ValueError.
        raise ValueError("invalid contact bundle encoding") from exc

    # Valid JSON is not necessarily an object; without this check a list,
    # number or null would fail later with AttributeError on ``.get``.
    if not isinstance(data, dict):
        raise ValueError("contact bundle must be a JSON object")

    if data.get("v") != 3:
        raise ValueError("unsupported contact bundle version (requires v3)")

    issuance_hex = data.get("issuance_pk_hex", "")
    transport_hex = data.get("transport_pk_hex", "")
    if not isinstance(issuance_hex, str) or not isinstance(transport_hex, str):
        raise ValueError("invalid contact bundle fields")
    try:
        issuance = bytes.fromhex(issuance_hex)
        transport = bytes.fromhex(transport_hex)
    except ValueError as exc:
        raise ValueError("contact bundle keys must be hex") from exc
    if len(issuance) != 32:
        raise ValueError("issuance public key must be 32 bytes")
    if len(transport) != 32:
        raise ValueError("transport public key must be 32 bytes")

    # bytes.fromhex tolerates uppercase digits and embedded whitespace, so
    # re-encode to hand callers one canonical spelling per key.
    parsed = {
        "issuance_pk_hex": issuance.hex(),
        "transport_pk_hex": transport.hex(),
    }

    tok = data.get("grant_token_b64", "")
    if not isinstance(tok, str):
        raise ValueError("invalid contact bundle grant token field")
    try:
        # validate=True rejects characters outside the Base64 alphabet
        # instead of silently discarding them.
        token = base64.b64decode(tok, validate=True)
    except ValueError as exc:
        raise ValueError("invalid contact bundle grant token encoding") from exc
    if len(token) != 32:
        raise ValueError("grant token must decode to 32 bytes")
    parsed["grant_token_b64"] = tok

    peer = data.get("peer")
    if peer is not None and not isinstance(peer, str):
        raise ValueError("invalid contact bundle peer field")
    if isinstance(peer, str) and peer.strip():
        parsed["peer"] = peer.strip()
    return parsed
def list_users() -> list[str]:
    """Return the sorted names of all user directories holding an identity.

    A directory under the ``zkac_home()`` root counts as a user when it
    contains an ``identity.json`` entry. A missing root yields an empty list.
    """
    root = zkac_home()
    if not root.exists():
        return []

    names = []
    for entry in root.iterdir():
        if not entry.is_dir():
            continue
        if (entry / "identity.json").exists():
            names.append(entry.name)
    names.sort()
    return names
# ── Server pins (per user) ───────────────────────────────────────────
def _server_key(server: str) -> str:
    """Map a server address to a file-name stem by turning every ``:`` into ``_``."""
    return "_".join(server.split(":"))
def pin_server(userid: str, server: str, server_pk_b64: str):
    """Store *server_pk_b64* as the pinned public key of *server* for *userid*.

    The pin is written to ``servers/<server key>.json`` inside the user's
    directory, replacing any previous pin for the same server.
    """
    servers_dir = _ud(userid) / "servers"
    _ensure_private_dir(servers_dir)
    pin_path = servers_dir / f"{_server_key(server)}.json"
    record = {"server_public_key_b64": server_pk_b64}
    _write_private_json(pin_path, record)
def load_server_pin(userid: str, server: str) -> dict | None:
    """Return the stored pin record for *server*, or ``None`` if none exists."""
    pin_path = _ud(userid) / "servers" / f"{_server_key(server)}.json"
    if pin_path.exists():
        return json.loads(pin_path.read_text())
    return None
def known_servers(userid: str) -> list[str]:
    """Unique server addresses from pinned registries + pin files.

    Addresses recovered from pin file names come first, followed by the
    ``server`` entry of each stored admin registry; duplicates are dropped
    while first-seen order is kept.

    NOTE(review): pin file stems are mapped back by turning every ``_`` into
    ``:``, which inverts ``_server_key`` only for addresses that contain no
    underscore of their own.
    """
    addresses: list[str] = []

    def remember(address: str) -> None:
        if address not in addresses:
            addresses.append(address)

    pins_dir = _ud(userid) / "servers"
    if pins_dir.exists():
        for pin_file in pins_dir.glob("*.json"):
            remember(pin_file.stem.replace("_", ":"))

    for registry_id in list_admin_registries(userid):
        try:
            registry = load_admin(userid, registry_id)
        except FileNotFoundError:
            # The registry file vanished between listing and loading.
            continue
        server = registry.get("server")
        if server:
            remember(server)
    return addresses
# ── Admin bundles (per registry, per user) ────────────────────────────
def new_admin_material() -> dict:
    """Create a new BBS issuer and a blind-issued admin credential for it.

    Returns:
        A dict of Base64 strings: the issuer secret and public key, the blind
        signature over the admin role (epoch 0), and the member secret and
        prover blind of the request that was signed.
    """
    issuer = zkac.BbsIssuer()
    issuer_pk = issuer.public_key()
    admin_role = zkac.admin_role_id()
    request = zkac.prepare_blind_request()
    blind_sig = issuer.issue_blind(request.commitment_with_proof(), admin_role, 0)

    material = {}
    material["bbs_issuer_secret_b64"] = _b64(issuer.secret_key_bytes())
    material["bbs_issuer_public_b64"] = _b64(issuer_pk.to_bytes())
    material["admin_blind_sig_b64"] = _b64(blind_sig)
    material["admin_member_secret_b64"] = _b64(request.member_secret())
    material["admin_prover_blind_b64"] = _b64(request.prover_blind())
    return material
def reconstruct_admin(data: dict) -> tuple:
    """Rebuild the issuer, its public key and the admin credential from *data*.

    *data* must carry the Base64 fields ``bbs_issuer_secret_b64``,
    ``admin_blind_sig_b64``, ``admin_member_secret_b64`` and
    ``admin_prover_blind_b64``.

    Returns:
        ``(issuer, issuer_public_key, admin_credential)``.
    """
    issuer_secret = _unb64(data["bbs_issuer_secret_b64"])
    issuer = zkac.BbsIssuer.from_secret_key(issuer_secret)
    issuer_pk = issuer.public_key()

    blind_sig = _unb64(data["admin_blind_sig_b64"])
    member_secret = _unb64(data["admin_member_secret_b64"])
    prover_blind = _unb64(data["admin_prover_blind_b64"])
    credential = zkac.Credential.finalize(
        blind_sig,
        member_secret,
        prover_blind,
        zkac.admin_role_id(),
        0,
        issuer_pk,
    )
    return issuer, issuer_pk, credential
def save_admin(userid: str, registry_id_hex: str, info: dict):
    """Store *info* as ``admin/<registry_id_hex>.json`` for *userid*."""
    admin_dir = _ud(userid) / "admin"
    _ensure_private_dir(admin_dir)
    target = admin_dir / f"{registry_id_hex}.json"
    _write_private_json(target, info)
def load_admin(userid: str, registry_id_hex: str) -> dict:
    """Load and return the stored admin record ``admin/<registry_id_hex>.json``."""
    record_path = _ud(userid) / "admin" / f"{registry_id_hex}.json"
    text = record_path.read_text()
    return json.loads(text)
def list_admin_registries(userid: str) -> list[str]:
    """Return the sorted stems of the ``*.json`` files in the user's ``admin`` directory.

    An absent ``admin`` directory yields an empty list.
    """
    admin_dir = _ud(userid) / "admin"
    if not admin_dir.exists():
        return []
    registry_ids = [record.stem for record in admin_dir.glob("*.json")]
    registry_ids.sort()
    return registry_ids
# ── Credentials ───────────────────────────────────────────────────────
def save_credential(userid: str, registry_id_hex: str, role_name: str, cred_data: dict):
    """Store *cred_data* as ``credentials/<registry_id_hex>_<role_name>.json``."""
    cred_dir = _ud(userid) / "credentials"
    _ensure_private_dir(cred_dir)
    target = cred_dir / f"{registry_id_hex}_{role_name}.json"
    _write_private_json(target, cred_data)
def load_credential_data(userid: str, registry_id_hex: str, role_name: str) -> dict:
    """Load and return the stored credential record for a registry and role."""
    cred_path = _ud(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json"
    text = cred_path.read_text()
    return json.loads(text)
def has_credential(userid: str, registry_id_hex: str, role_name: str) -> bool:
    """Report whether a credential file exists for the given registry and role."""
    cred_path = _ud(userid) / "credentials" / f"{registry_id_hex}_{role_name}.json"
    return cred_path.exists()
def reconstruct_credential(cred_data: dict) -> zkac.Credential:
    """Rebuild a ``zkac.Credential`` from a stored credential record.

    *cred_data* must carry ``issuer_pk_b64``, ``blind_sig_b64``,
    ``member_secret_b64`` and ``prover_blind_b64`` (all Base64) together with
    ``role_name`` and ``epoch``.
    """
    issuer_pk = zkac.BbsPublicKey.from_bytes(_unb64(cred_data["issuer_pk_b64"]))
    blind_sig = _unb64(cred_data["blind_sig_b64"])
    member_secret = _unb64(cred_data["member_secret_b64"])
    prover_blind = _unb64(cred_data["prover_blind_b64"])
    role = zkac.role_id(cred_data["role_name"])
    epoch = cred_data["epoch"]
    return zkac.Credential.finalize(
        blind_sig, member_secret, prover_blind, role, epoch, issuer_pk
    )
def list_credentials(userid: str) -> list[tuple[str, str]]:
    """List the stored credentials of *userid* as ``(registry_id_hex, role_name)``.

    Credential files are named ``<registry_id_hex>_<role_name>.json``. The
    stem is split at its FIRST underscore: a hex registry id cannot contain
    ``_`` whereas a role name may, so splitting at the last underscore would
    return ``("abc_team", "lead")`` for the role ``team_lead``.

    Returns:
        Pairs ordered by file path; files whose stem contains no underscore
        are skipped. An absent ``credentials`` directory yields ``[]``.
    """
    cred_dir = _ud(userid) / "credentials"
    if not cred_dir.exists():
        return []

    found = []
    for cred_file in sorted(cred_dir.glob("*.json")):
        registry_id_hex, sep, role_name = cred_file.stem.partition("_")
        if sep:
            found.append((registry_id_hex, role_name))
    return found