Harden fs auth and storage for a trustless-server model: proof-only hello, opaque tagged bucket metadata, safer connection logging, and inbox UI without raw ids. Add demo/test_demo_privacy_guardrails.py and README notes. Stop tracking demo __pycache__ and fs_data artifacts. Co-authored-by: Cursor <cursoragent@cursor.com>
599 lines
21 KiB
Python
599 lines
21 KiB
Python
"""
|
|
ZKAC file-share client library (demo).
|
|
|
|
Reusable, UI-agnostic helpers used by ``file_share_tui.py`` and the smoke test
|
|
harness. The library performs:
|
|
|
|
* deterministic folder flattening and per-file content-key generation,
|
|
* per-role visibility bitmasks over the flattened index,
|
|
* opaque blob uploads to ``file_share_server.py``,
|
|
* role-credential authenticated download + decrypt against the same server.
|
|
|
|
Authenticated sessions piggy-back on the ZKAC TCP framing/handshake from
|
|
``zkac.tcp``; the ``zkac-node`` CLI is only used (separately) for identity,
|
|
registry and direct P2P credential grants.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import socket
|
|
import uuid
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Iterable, Iterator
|
|
|
|
import zkac
|
|
from zkac.tcp import FramedSession, client_handshake_anon
|
|
|
|
# Size of each plaintext piece read from a file before it is encrypted into one blob.
CHUNK_SIZE = 128 * 1024 # plaintext bytes per stored blob (well under MAX_TCP_FRAME_BYTES)
# Default for ``open_session(connect_timeout_s=...)``; passed to
# ``socket.create_connection`` as the connect timeout, in seconds.
DEFAULT_CONNECT_TIMEOUT_S = 8.0
|
|
|
|
|
|
# ── small helpers ────────────────────────────────────────────────────
|
|
|
|
def _b64(data: bytes) -> str:
    """Return ``data`` encoded as standard (padded) base64 ASCII text."""
    encoded = base64.b64encode(data)
    return str(encoded, "ascii")
|
|
|
|
|
|
def _unb64(s: str) -> bytes:
    """Decode standard-alphabet base64 text back into raw bytes."""
    # standard_b64decode(s) is the stdlib spelling of b64decode(s) with the
    # default alphabet and default (non-strict) validation.
    return base64.standard_b64decode(s)
|
|
|
|
|
|
def _parse_server(server: str) -> tuple[str, int]:
    """Split a ``host:port`` string into a ``(host, port)`` pair.

    The split happens on the *last* colon. An empty host (``":9000"``) means
    ``127.0.0.1``. A bracketed IPv6 literal (``"[::1]:9000"``) has its brackets
    removed so the host can be handed straight to ``socket.create_connection``.

    Raises:
        ValueError: if there is no colon, the port is not a decimal integer,
            or the port is outside ``1..65535``.
    """
    host, sep, port_s = server.rpartition(":")
    if not sep:
        raise ValueError(f"invalid server {server!r}, expected host:port")
    try:
        port = int(port_s, 10)
    except ValueError:
        # Re-raise with the offending input instead of int()'s generic message.
        raise ValueError(f"invalid port {port_s!r} in server {server!r}") from None
    if not 1 <= port <= 65535:
        raise ValueError(f"port out of range: {port}")
    # URL-style IPv6 literal: the brackets only delimit the address and are not
    # part of the hostname the socket layer expects.
    if len(host) >= 2 and host.startswith("[") and host.endswith("]"):
        host = host[1:-1]
    return (host or "127.0.0.1"), port
|
|
|
|
|
|
# ── data classes ─────────────────────────────────────────────────────
|
|
|
|
@dataclass
class FileChunk:
    """Locator for one encrypted chunk of a file."""

    # Identifier under which the chunk's ciphertext blob is uploaded/fetched.
    blob_id: str
    # Base64 of the ephemeral pubkey used by the encrypt step (HPKE-style).
    eph_pk_b64: str
|
|
|
|
|
|
@dataclass
class FileEntry:
    """Everything needed to fetch, decrypt and verify one shared file."""

    # Path of the file relative to the shared folder root.
    rel_path: str
    # Plaintext size in bytes.
    size: int
    # Base64 SHA-256 digest of the whole plaintext.
    sha256_b64: str
    # 32-byte X25519 secret -> derives the file's recipient keypair.
    content_secret_b64: str
    # Ordered chunk locators; ciphertext blobs are matched to these by position.
    chunks: list[FileChunk] = field(default_factory=list)

    def to_grant_dict(self) -> dict:
        """Serialise this entry into the plain-dict form embedded in role grants."""
        chunk_dicts = []
        for chunk in self.chunks:
            chunk_dicts.append({"blob_id": chunk.blob_id, "eph_pk_b64": chunk.eph_pk_b64})
        return dict(
            rel_path=self.rel_path,
            size=self.size,
            sha256_b64=self.sha256_b64,
            content_secret_b64=self.content_secret_b64,
            chunks=chunk_dicts,
        )

    @classmethod
    def from_grant_dict(cls, data: dict) -> "FileEntry":
        """Rebuild an entry from the dict produced by :meth:`to_grant_dict`.

        ``size`` is coerced with ``int``; a missing key raises ``KeyError``.
        """
        return cls(
            data["rel_path"],
            int(data["size"]),
            data["sha256_b64"],
            data["content_secret_b64"],
            [
                FileChunk(blob_id=raw["blob_id"], eph_pk_b64=raw["eph_pk_b64"])
                for raw in data["chunks"]
            ],
        )
|
|
|
|
|
|
@dataclass
class BucketManifest:
    """Admin-side bucket manifest, persisted locally on the bucket creator."""

    bucket_id: str
    server: str
    registry_id_hex: str
    # Files in flattened order; role masks index into this list by position.
    files: list[FileEntry]
    # role -> bitmask string ('0'/'1')
    role_masks: dict[str, str] = field(default_factory=dict)
    # local audit log only (no server identity binding)
    recipients: list[dict] = field(default_factory=list)

    def to_dict(self) -> dict:
        """Serialise the manifest to a JSON-compatible dict.

        Per-file dicts use the same shape as ``FileEntry.to_grant_dict``.
        ``role_masks`` and ``recipients`` are returned by reference, not copied.
        """
        serialised_files = [entry.to_grant_dict() for entry in self.files]
        return dict(
            bucket_id=self.bucket_id,
            server=self.server,
            registry_id_hex=self.registry_id_hex,
            files=serialised_files,
            role_masks=self.role_masks,
            recipients=self.recipients,
        )

    @classmethod
    def from_dict(cls, data: dict) -> "BucketManifest":
        """Rebuild a manifest from :meth:`to_dict` output.

        ``files``, ``role_masks`` and ``recipients`` are optional and default
        to empty; the two containers are shallow-copied from ``data``.
        """
        # Files are parsed first so a malformed entry fails before anything else.
        parsed_files = [FileEntry.from_grant_dict(raw) for raw in data.get("files", [])]
        return cls(
            data["bucket_id"],
            data["server"],
            data["registry_id_hex"],
            parsed_files,
            dict(data.get("role_masks", {})),
            list(data.get("recipients", [])),
        )
|
|
|
|
|
|
# ── folder flattening + chunking ─────────────────────────────────────
|
|
|
|
def flatten_folder(folder: Path) -> list[Path]:
    """List the regular files under ``folder`` in a deterministic order.

    Entries are ordered by the string form of their path relative to the
    resolved folder. Anything with a path component starting with ``.``
    (hidden file, or a file inside a hidden directory) is left out.

    Raises:
        NotADirectoryError: if ``folder`` does not resolve to a directory.
    """
    root = folder.resolve()
    if not root.is_dir():
        raise NotADirectoryError(root)

    def _is_hidden(candidate: Path) -> bool:
        return any(part.startswith(".") for part in candidate.relative_to(root).parts)

    visible = [
        candidate
        for candidate in root.rglob("*")
        if candidate.is_file() and not _is_hidden(candidate)
    ]
    # Relative paths are unique, so this single sort fully fixes the order.
    visible.sort(key=lambda candidate: str(candidate.relative_to(root)))
    return visible
|
|
|
|
|
|
def _read_chunks(path: Path, chunk_size: int = CHUNK_SIZE) -> Iterator[bytes]:
    """Yield the contents of ``path`` as successive ``read(chunk_size)`` results.

    The file is opened in binary mode and closed when the generator finishes.
    An empty file yields nothing.
    """
    with path.open("rb") as fh:
        # iter(callable, sentinel) stops at the first empty read, i.e. at EOF.
        yield from iter(lambda: fh.read(chunk_size), b"")
|
|
|
|
|
|
def _sha256_file(path: Path) -> bytes:
    """Return the raw 32-byte SHA-256 digest of the file at ``path``.

    The file is streamed in 64 KiB pieces so it is never held in memory whole.
    """
    digest = hashlib.sha256()
    read_size = 64 * 1024
    for piece in _read_chunks(path, read_size):
        digest.update(piece)
    return digest.digest()
|
|
|
|
|
|
def encrypt_file_to_blobs(path: Path) -> tuple[FileEntry, list[bytes]]:
    """Generate a per-file content keypair, encrypt each chunk to its public key.

    Each chunk uses ``zkac.encrypt_for_admin`` so the matching
    ``IssuanceKeypair.decrypt`` on the recipient side succeeds (both bind the
    ``user->admin`` HKDF label).

    The SHA-256 digest and the size recorded in the entry are computed from
    the very bytes that get encrypted, in one pass over the file. They can
    therefore never disagree with the ciphertext, even if the file is modified
    while this function runs.

    Returns the file entry (with ``rel_path`` left empty for the caller to
    fill in) plus the parallel list of ciphertext blobs. All blobs are held in
    memory at once.
    """
    content_kp = zkac.IssuanceKeypair()
    content_pk = bytes(content_kp.public_key_bytes())
    content_sk = bytes(content_kp.secret_bytes())

    hasher = hashlib.sha256()
    total_size = 0
    chunk_metas: list[FileChunk] = []
    blobs: list[bytes] = []
    for chunk in _read_chunks(path):
        # Hash and count exactly what is about to be encrypted.
        hasher.update(chunk)
        total_size += len(chunk)
        eph_pk, ciphertext = zkac.encrypt_for_admin(content_pk, chunk)
        chunk_metas.append(FileChunk(blob_id=uuid.uuid4().hex, eph_pk_b64=_b64(bytes(eph_pk))))
        blobs.append(bytes(ciphertext))

    entry = FileEntry(
        rel_path="",
        size=total_size,
        sha256_b64=_b64(hasher.digest()),
        content_secret_b64=_b64(content_sk),
        chunks=chunk_metas,
    )
    return entry, blobs
|
|
|
|
|
|
def decrypt_file_from_blobs(entry: FileEntry, blobs: Iterable[bytes]) -> bytes:
    """Reassemble + verify a file from its (already fetched) ciphertext blobs.

    ``blobs`` are paired with ``entry.chunks`` by position. Pairing stops at
    the shorter of the two, so a count mismatch is not reported on its own.

    Raises:
        RuntimeError: if the SHA-256 of the reassembled plaintext differs from
            ``entry.sha256_b64``.
    """
    keypair = zkac.IssuanceKeypair.from_secret(_unb64(entry.content_secret_b64))
    pieces = [
        bytes(keypair.decrypt(_unb64(meta.eph_pk_b64), blob))
        for meta, blob in zip(entry.chunks, blobs)
    ]
    plaintext = b"".join(pieces)
    actual_digest = hashlib.sha256(plaintext).digest()
    if actual_digest != _unb64(entry.sha256_b64):
        raise RuntimeError(f"hash mismatch for {entry.rel_path!r}")
    return plaintext
|
|
|
|
|
|
# ── role bitmasks ────────────────────────────────────────────────────
|
|
|
|
def normalize_mask(mask: str, file_count: int) -> str:
    """Return ``mask`` as a string of exactly ``file_count`` '0'/'1' characters.

    Characters other than '0' and '1' are dropped, and the remaining bits are
    right-padded with '0'. A mask is never truncated.

    Raises:
        ValueError: if more than ``file_count`` bits remain after filtering.
    """
    # NOTE(review): dropping unexpected characters shifts every later bit one
    # position left (e.g. "1x01" -> "101"); confirm callers only pass
    # separators they intend to be ignored.
    bits = [ch for ch in mask if ch == "0" or ch == "1"]
    if len(bits) > file_count:
        raise ValueError(f"mask {mask!r} longer than {file_count} files")
    padding = "0" * (file_count - len(bits))
    return "".join(bits) + padding
|
|
|
|
|
|
def files_visible_to_mask(files: list[FileEntry], mask: str) -> list[FileEntry]:
    """Return the files whose position in ``mask`` holds a '1'.

    Files and bits are paired by position; pairing stops at the shorter of the
    two, so files beyond the end of the mask are not included.
    """
    selected: list[FileEntry] = []
    for entry, bit in zip(files, mask):
        if bit != "1":
            continue
        selected.append(entry)
    return selected
|
|
|
|
|
|
def encode_grant_payload(
    bucket_id: str,
    role_name: str,
    server: str,
    registry_id_hex: str,
    visible_files: list[FileEntry],
) -> bytes:
    """Build the plaintext role-grant document as compact UTF-8 JSON.

    The document carries the bucket/role/server/registry identifiers and, for
    each visible file, its ``to_grant_dict()`` form.
    """
    file_dicts = [entry.to_grant_dict() for entry in visible_files]
    document = dict(
        bucket_id=bucket_id,
        role_name=role_name,
        server=server,
        registry_id_hex=registry_id_hex,
        files=file_dicts,
    )
    text = json.dumps(document, separators=(",", ":"))
    return text.encode("utf-8")
|
|
|
|
|
|
def encrypt_grant_to_recipient(payload: bytes, recipient_issuance_pk_hex: str) -> tuple[str, str]:
    """User->admin direction sealed box: ``zkac.encrypt_for_admin`` -> (eph_pk_b64, ciphertext_b64).

    Raises:
        ValueError: if the hex string is malformed or does not decode to
            exactly 32 bytes.
    """
    recipient_pk = bytes.fromhex(recipient_issuance_pk_hex)
    if len(recipient_pk) != 32:
        raise ValueError("recipient issuance public key must be 32 bytes")
    sealed = zkac.encrypt_for_admin(recipient_pk, payload)
    eph_pk, ciphertext = sealed
    eph_pk_b64 = _b64(bytes(eph_pk))
    ciphertext_b64 = _b64(bytes(ciphertext))
    return eph_pk_b64, ciphertext_b64
|
|
|
|
|
|
def decrypt_grant_for_recipient(
    eph_pk_b64: str,
    ciphertext_b64: str,
    issuance_secret_hex: str,
) -> dict:
    """Open a sealed role grant and return its parsed JSON payload.

    The 32-byte issuance secret (hex) is turned into an
    ``zkac.IssuanceKeypair`` whose ``decrypt`` is applied to the base64-decoded
    ephemeral key and ciphertext; the plaintext is parsed as UTF-8 JSON.

    Raises:
        ValueError: if the hex string is malformed or is not exactly 32 bytes.
    """
    secret_bytes = bytes.fromhex(issuance_secret_hex)
    if len(secret_bytes) != 32:
        raise ValueError("issuance secret must be 32 bytes")
    keypair = zkac.IssuanceKeypair.from_secret(secret_bytes)
    eph_pk = _unb64(eph_pk_b64)
    ciphertext = _unb64(ciphertext_b64)
    opened = bytes(keypair.decrypt(eph_pk, ciphertext))
    return json.loads(opened.decode("utf-8"))
|
|
|
|
|
|
# ── encrypted authenticated session to file-share server ─────────────
|
|
|
|
class FileShareSession:
    """Anonymous handshake + ``op:'fs'`` BBS+ presentation -> framed JSON RPC.

    Wraps an already-established socket and framed session. Every command is
    one JSON request followed by one JSON reply; a reply with a truthy
    ``"error"`` field is raised as ``RuntimeError``. Usable as a context
    manager, which closes the socket on exit.
    """

    def __init__(self, sock: socket.socket, framed: FramedSession) -> None:
        self._sock, self._framed = sock, framed

    def _call(self, cmd: dict) -> dict:
        """Send ``cmd`` as JSON, wait for the reply and return it as a dict."""
        request = json.dumps(cmd).encode("utf-8")
        self._framed.send(request)
        reply = json.loads(self._framed.recv())
        if reply.get("error"):
            raise RuntimeError(reply["error"])
        return reply

    def close(self) -> None:
        """Close the underlying socket, ignoring ``OSError`` (best effort)."""
        try:
            self._sock.close()
        except OSError:
            pass

    def __enter__(self) -> "FileShareSession":
        return self

    def __exit__(self, *exc: object) -> None:
        self.close()

    # admin commands

    def bucket_create(self, bucket_id: str | None = None) -> str:
        """Create a bucket and return the id from the server's reply.

        ``bucket_id`` is only included in the request when it is not ``None``.
        """
        if bucket_id is None:
            request = {"cmd": "bucket_create"}
        else:
            request = {"cmd": "bucket_create", "bucket_id": bucket_id}
        reply = self._call(request)
        return reply["bucket_id"]

    def bucket_put_blob(self, bucket_id: str, blob_id: str, ciphertext: bytes) -> None:
        """Upload one ciphertext blob (sent base64-encoded) under ``blob_id``."""
        request = {
            "cmd": "bucket_put_blob",
            "bucket_id": bucket_id,
            "blob_id": blob_id,
            "ciphertext_b64": _b64(ciphertext),
        }
        self._call(request)

    def bucket_set_role_acl(self, bucket_id: str, role_id_hex: str, allowed_blob_ids: list[str]) -> None:
        """Send the list of blob ids allowed for a role in this bucket."""
        request = {
            "cmd": "bucket_set_role_acl",
            "bucket_id": bucket_id,
            "role_id_hex": role_id_hex,
            "allowed_blob_ids": allowed_blob_ids,
        }
        self._call(request)

    def bucket_put_role_grant(
        self,
        bucket_id: str,
        role_id_hex: str,
        acl_version: int,
        eph_pk_b64: str,
        ciphertext_b64: str,
    ) -> None:
        """Upload an encrypted role-grant envelope; ``acl_version`` is sent as ``int``."""
        request = {
            "cmd": "bucket_put_role_grant",
            "bucket_id": bucket_id,
            "role_id_hex": role_id_hex,
            "acl_version": int(acl_version),
            "eph_pk_b64": eph_pk_b64,
            "ciphertext_b64": ciphertext_b64,
        }
        self._call(request)

    def bucket_get_role_acl(self, bucket_id: str, role_id_hex: str) -> dict:
        """Return the server's raw reply describing a role's ACL."""
        request = {
            "cmd": "bucket_get_role_acl",
            "bucket_id": bucket_id,
            "role_id_hex": role_id_hex,
        }
        return self._call(request)

    def bucket_finalize(self, bucket_id: str) -> None:
        """Send ``bucket_finalize`` for ``bucket_id``."""
        self._call({"cmd": "bucket_finalize", "bucket_id": bucket_id})

    def bucket_delete(self, bucket_id: str) -> None:
        """Send ``bucket_delete`` for ``bucket_id``."""
        self._call({"cmd": "bucket_delete", "bucket_id": bucket_id})

    def bucket_list_owned(self) -> list[str]:
        """Return the ``bucket_ids`` list from a ``bucket_list_owned`` reply."""
        reply = self._call({"cmd": "bucket_list_owned"})
        return reply["bucket_ids"]

    # any-role commands

    def whoami(self) -> dict:
        """Return the server's raw ``whoami`` reply."""
        return self._call({"cmd": "whoami"})

    def fs_buckets(self) -> list[str]:
        """Return the ``bucket_ids`` list from an ``fs_buckets`` reply."""
        reply = self._call({"cmd": "fs_buckets"})
        return reply["bucket_ids"]

    def fs_get_role_grant(self, bucket_id: str) -> dict:
        """Return the server's raw ``fs_get_role_grant`` reply for ``bucket_id``."""
        return self._call({"cmd": "fs_get_role_grant", "bucket_id": bucket_id})

    def fs_get_blob(self, bucket_id: str, blob_id: str) -> bytes:
        """Fetch one blob and return its base64-decoded ciphertext."""
        reply = self._call({"cmd": "fs_get_blob", "bucket_id": bucket_id, "blob_id": blob_id})
        return _unb64(reply["ciphertext_b64"])
|
|
|
|
|
|
def open_session(
    server: str,
    *,
    server_pk_hex: str,
    user_transport_secret: bytes,
    registry_id_hex: str,
    role_id: bytes,
    credential: "zkac.Credential",
    connect_timeout_s: float = DEFAULT_CONNECT_TIMEOUT_S,
) -> FileShareSession:
    """Connect, anonymous-handshake, then present a BBS+ proof bound to the transcript.

    Steps: parse ``server`` as ``host:port``, open a TCP connection, run
    ``client_handshake_anon`` against the server public key, present the
    credential over the handshake transcript hash in an ``op: "fs"`` hello,
    and wait for the server's reply.

    ``connect_timeout_s`` applies to establishing the TCP connection only;
    afterwards the socket is switched to blocking mode. ``registry_id_hex``
    and ``role_id`` are accepted for the caller's convenience but are not sent
    or otherwise used by this function.

    Raises:
        ValueError: if ``server`` is not a valid ``host:port``.
        RuntimeError: if the server's hello reply carries an ``error``.

    The socket is closed before any exception propagates, including
    ``KeyboardInterrupt``/``SystemExit`` raised while blocked in the handshake.
    """
    host, port = _parse_server(server)
    sock = socket.create_connection((host, port), timeout=connect_timeout_s)
    try:
        # The timeout was for connect only; the session itself is blocking.
        sock.settimeout(None)
        node = zkac.Node(zkac.Keypair.from_secret_key(user_transport_secret))
        server_pk = zkac.PublicKey.from_bytes(bytes.fromhex(server_pk_hex))
        session = client_handshake_anon(sock, node, server_pk)
        framed = FramedSession(sock, session)
        # Binding the proof to the transcript hash ties it to this handshake.
        transcript_hash = bytes(session.transcript_hash())
        bbs_auth_proof = bytes(credential.present(transcript_hash))
        framed.send(json.dumps({
            "op": "fs",
            "bbs_auth_b64": _b64(bbs_auth_proof),
        }).encode("utf-8"))
        hello = json.loads(framed.recv())
        if hello.get("error"):
            raise RuntimeError(hello["error"])
        return FileShareSession(sock, framed)
    except BaseException:
        # Cleanup-and-reraise: never leak the socket, whatever interrupted us.
        sock.close()
        raise
|
|
|
|
|
|
# ── higher-level admin helpers ───────────────────────────────────────
|
|
|
|
def upload_bucket(
    sess: FileShareSession,
    folder: Path,
    *,
    server: str,
    registry_id_hex: str,
    bucket_id: str | None = None,
) -> BucketManifest:
    """Encrypt every file under ``folder`` and stream blobs to the server.

    The folder is flattened (and thereby validated) *before* the bucket is
    created, so an invalid folder does not leave an empty bucket behind on the
    server. Each file's ``rel_path`` is stored with forward slashes regardless
    of the uploading platform, so a grant produced on Windows recreates the
    same directory layout when downloaded on POSIX and vice versa.

    Returns the manifest for the new bucket, with no role masks and no
    recipients. Persisting it is left to the caller.

    Raises:
        NotADirectoryError: if ``folder`` is not a directory.
        RuntimeError: if the server rejects any command.
    """
    root = folder.resolve()
    paths = flatten_folder(folder)
    bid = sess.bucket_create(bucket_id)
    files: list[FileEntry] = []
    for p in paths:
        entry, blobs = encrypt_file_to_blobs(p)
        entry.rel_path = p.relative_to(root).as_posix()
        for chunk_meta, ciphertext in zip(entry.chunks, blobs):
            sess.bucket_put_blob(bid, chunk_meta.blob_id, ciphertext)
        files.append(entry)
    sess.bucket_finalize(bid)
    return BucketManifest(
        bucket_id=bid,
        server=server,
        registry_id_hex=registry_id_hex,
        files=files,
    )
|
|
|
|
|
|
def apply_role_masks_to_server(sess: FileShareSession, manifest: BucketManifest) -> None:
    """Push per-role blob ACLs so server enforces mask on fs_get_blob.

    For every role in ``manifest.role_masks`` the mask is normalised to the
    number of files, and the blob ids of all chunks of the visible files are
    sent with ``bucket_set_role_acl``. Does nothing when there are no masks.
    """
    for role_name, raw_mask in (manifest.role_masks or {}).items():
        normalised = normalize_mask(raw_mask, len(manifest.files))
        allowed_blob_ids = [
            chunk.blob_id
            for entry in files_visible_to_mask(manifest.files, normalised)
            for chunk in entry.chunks
        ]
        role_id_hex = zkac.role_id(role_name).hex()
        sess.bucket_set_role_acl(manifest.bucket_id, role_id_hex, allowed_blob_ids)
|
|
|
|
|
|
def push_role_grant(
    sess: FileShareSession,
    manifest: BucketManifest,
    role_name: str,
    recipient_issuance_pk_hex: str,
) -> None:
    """Upload anonymous encrypted role grant envelope (no recipient identifier stored).

    The grant lists the files visible to ``role_name`` under the manifest's
    mask, is encrypted to the recipient's issuance public key, and is uploaded
    tagged with the role's current server-side ACL version. On success a
    ``{"role_name", "recipient_hint"}`` record (hint = first 16 hex characters
    of the recipient key) is appended to ``manifest.recipients`` in memory.

    Raises:
        RuntimeError: if the role has no mask in the manifest, or the server
            reports no ACL (version <= 0) for the role.
    """
    if role_name not in manifest.role_masks:
        raise RuntimeError(f"role {role_name!r} has no visibility mask in this bucket")

    normalised = normalize_mask(manifest.role_masks[role_name], len(manifest.files))
    visible = files_visible_to_mask(manifest.files, normalised)
    role_id_hex = zkac.role_id(role_name).hex()

    # The grant is tagged with the ACL version the server currently holds.
    acl_reply = sess.bucket_get_role_acl(manifest.bucket_id, role_id_hex)
    acl_version = int(acl_reply.get("version", 0))
    if acl_version < 1:
        raise RuntimeError("server ACL missing for role; apply permissions before sharing")

    plaintext = encode_grant_payload(
        manifest.bucket_id,
        role_name,
        manifest.server,
        manifest.registry_id_hex,
        visible,
    )
    eph_pk_b64, ct_b64 = encrypt_grant_to_recipient(plaintext, recipient_issuance_pk_hex)
    sess.bucket_put_role_grant(manifest.bucket_id, role_id_hex, acl_version, eph_pk_b64, ct_b64)

    audit_record = {
        "role_name": role_name,
        "recipient_hint": recipient_issuance_pk_hex[:16],
    }
    manifest.recipients.append(audit_record)
|
|
|
|
|
|
def _grant_output_path(output_dir: Path, rel_path: object) -> Path:
    """Return ``output_dir / rel_path`` after checking it stays inside ``output_dir``."""
    if not isinstance(rel_path, str) or not rel_path:
        raise RuntimeError(f"unsafe path in role grant: {rel_path!r}")
    root = output_dir.resolve()
    try:
        # Resolving collapses ``..`` and follows existing symlinks; joining an
        # absolute ``rel_path`` discards ``root`` entirely, which the
        # containment check below then rejects.
        target = (root / rel_path).resolve()
    except (OSError, ValueError) as exc:
        raise RuntimeError(f"unsafe path in role grant: {rel_path!r}") from exc
    if root not in target.parents:
        raise RuntimeError(f"unsafe path in role grant: {rel_path!r}")
    return output_dir / rel_path


def download_bucket(
    sess: FileShareSession,
    bucket_id: str,
    *,
    issuance_secret_hex: str,
    role_grant_payload: dict | None = None,
    output_dir: Path,
) -> dict:
    """Fetch + decrypt files from a role grant, optionally fetched from server.

    When ``role_grant_payload`` is ``None`` the grants stored for the bucket
    are fetched and the first one that decrypts with ``issuance_secret_hex``
    is used. Each listed file is then downloaded blob by blob, decrypted,
    hash-checked and written below ``output_dir``.

    The file paths come from the grant, i.e. from another party, so every
    ``rel_path`` is validated to stay inside ``output_dir`` before anything is
    fetched or written. A grant containing ``..`` escapes, absolute paths or
    empty paths is rejected as a whole.

    Returns ``{"role_name": ..., "files_written": [paths as str]}``.

    Raises:
        RuntimeError: no decryptable grant, the grant names a different
            bucket, a path is unsafe, or a file fails its hash check.
    """
    payload = role_grant_payload
    if payload is None:
        grants = sess.fs_get_role_grant(bucket_id).get("grants", [])
        for grant in grants:
            try:
                payload = decrypt_grant_for_recipient(
                    grant["eph_pk_b64"],
                    grant["ciphertext_b64"],
                    issuance_secret_hex,
                )
                break
            except Exception:
                # Trial decryption: grants carry no recipient identifier, so
                # failing to open one just means it was not addressed to us.
                continue
    if payload is None:
        raise RuntimeError("no decryptable role grant for this credential")
    if payload.get("bucket_id") != bucket_id:
        raise RuntimeError("role grant bucket_id mismatch")

    file_dicts = payload.get("files", [])
    # Reject the whole grant up front, before any network fetch or disk write.
    for fd in file_dicts:
        _grant_output_path(output_dir, fd["rel_path"])

    output_dir.mkdir(parents=True, exist_ok=True)
    written: list[str] = []
    for fd in file_dicts:
        entry = FileEntry.from_grant_dict(fd)
        ciphertexts = [sess.fs_get_blob(bucket_id, c.blob_id) for c in entry.chunks]
        plaintext = decrypt_file_from_blobs(entry, ciphertexts)
        out_path = _grant_output_path(output_dir, entry.rel_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        out_path.write_bytes(plaintext)
        written.append(str(out_path))
    return {"role_name": payload.get("role_name"), "files_written": written}
|
|
|
|
|
|
def _received_grant_path(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> Path:
    """Return the local JSON path for a received grant.

    Layout: ``<state_dir>/received_grants/<registry>/<role>/<bucket>.json``.
    """
    # NOTE(review): the identifiers are used as path components without
    # sanitising; confirm callers never pass values containing separators.
    grants_root = state_dir(userid) / "received_grants"
    role_dir = grants_root / registry_id_hex / role_name
    return role_dir / f"{bucket_id}.json"
|
|
|
|
|
|
def save_received_role_grant(
    userid: str,
    registry_id_hex: str,
    role_name: str,
    bucket_id: str,
    *,
    eph_pk_b64: str,
    ciphertext_b64: str,
) -> Path:
    """Persist a received (still encrypted) role grant and return its path.

    The envelope is written as indented JSON together with the registry, role
    and bucket identifiers. The file is then restricted to mode ``0o600`` on a
    best-effort basis (an ``OSError`` from ``chmod`` is ignored).
    """
    target = _received_grant_path(userid, registry_id_hex, role_name, bucket_id)
    target.parent.mkdir(parents=True, exist_ok=True)
    record = {
        "registry_id_hex": registry_id_hex,
        "role_name": role_name,
        "bucket_id": bucket_id,
        "eph_pk_b64": eph_pk_b64,
        "ciphertext_b64": ciphertext_b64,
    }
    target.write_text(json.dumps(record, indent=2))
    try:
        os.chmod(target, 0o600)
    except OSError:
        pass
    return target
|
|
|
|
|
|
def list_received_role_grants(userid: str, registry_id_hex: str, role_name: str) -> list[str]:
    """Return the sorted bucket ids that have a saved grant for this registry/role.

    Ids are the stems of the ``*.json`` files in the role's grant directory;
    the result is empty when that directory does not exist.
    """
    grant_dir = state_dir(userid).joinpath("received_grants", registry_id_hex, role_name)
    if grant_dir.is_dir():
        return sorted(grant_file.stem for grant_file in grant_dir.glob("*.json"))
    return []
|
|
|
|
|
|
def load_received_role_grant(userid: str, registry_id_hex: str, role_name: str, bucket_id: str) -> dict:
    """Read back the JSON record written by ``save_received_role_grant``."""
    grant_file = _received_grant_path(userid, registry_id_hex, role_name, bucket_id)
    raw_text = grant_file.read_text()
    return json.loads(raw_text)
|
|
|
|
|
|
# ── local persistence (admin manifests) ──────────────────────────────
|
|
|
|
def state_dir(userid: str) -> Path:
    """Return (creating it if needed) the per-user ``file_share`` state directory.

    The base is ``$ZKAC_HOME`` when set, otherwise ``~/.ZKAC-FS``; the result
    is ``<base>/<userid>/file_share``. The directory is restricted to mode
    ``0o700`` on a best-effort basis (an ``OSError`` from ``chmod`` is ignored).
    """
    # Keep demo state fully self-contained with demo ZKAC_HOME, rather than
    # mixing with repository-local legacy state directories.
    fallback_home = Path.home() / ".ZKAC-FS"
    root = Path(os.environ.get("ZKAC_HOME", fallback_home))
    share_dir = root.joinpath(userid, "file_share")
    share_dir.mkdir(parents=True, exist_ok=True)
    try:
        os.chmod(share_dir, 0o700)
    except OSError:
        pass
    return share_dir
|
|
|
|
|
|
def manifest_path(userid: str, bucket_id: str) -> Path:
    """Return ``<state_dir>/buckets/<bucket_id>.json`` for this user."""
    buckets_dir = state_dir(userid) / "buckets"
    return buckets_dir / f"{bucket_id}.json"
|
|
|
|
|
|
def save_manifest(userid: str, manifest: BucketManifest) -> Path:
    """Write ``manifest`` as indented JSON to its local path and return that path.

    The manifest contains every file's content secret, so the file is created
    with mode ``0o600`` from the start instead of being written with default
    permissions and tightened afterwards. The manifest is serialised before
    the file is opened, so a serialisation error cannot truncate an existing
    manifest. A final best-effort ``chmod`` also tightens a pre-existing file
    that had wider permissions.
    """
    p = manifest_path(userid, manifest.bucket_id)
    p.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(manifest.to_dict(), indent=2)
    # The mode argument only applies when the file is created by this call.
    fd = os.open(p, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "w") as fh:
        fh.write(text)
    try:
        os.chmod(p, 0o600)
    except OSError:
        # Best effort: some platforms/filesystems do not support chmod.
        pass
    return p
|
|
|
|
|
|
def load_manifest(userid: str, bucket_id: str) -> BucketManifest:
    """Load and parse the locally saved manifest for ``bucket_id``."""
    raw_text = manifest_path(userid, bucket_id).read_text()
    document = json.loads(raw_text)
    return BucketManifest.from_dict(document)
|
|
|
|
|
|
def list_manifests(userid: str) -> list[str]:
    """Return the sorted bucket ids that have a locally saved manifest.

    Ids are the stems of the ``*.json`` files in ``<state_dir>/buckets``; the
    result is empty when that directory does not exist.
    """
    buckets_dir = state_dir(userid).joinpath("buckets")
    if buckets_dir.is_dir():
        return sorted(manifest_file.stem for manifest_file in buckets_dir.glob("*.json"))
    return []
|