ZKAC/demo/file_share_server.py
everbarry fe68752cc7 demo: privacy-harden file share and add guardrail tests
Harden fs auth and storage for a trustless-server model: proof-only hello,
opaque tagged bucket metadata, safer connection logging, and inbox UI without
raw ids. Add demo/test_demo_privacy_guardrails.py and README notes. Stop
tracking demo __pycache__ and fs_data artifacts.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-07 22:31:37 +02:00

816 lines
30 KiB
Python

#!/usr/bin/env python3
"""
ZKAC opaque file-share server (demo).
Headless TCP service that combines:
* The same management-channel wire protocol as ``zkac-node serve`` so existing
``zkac-node registry create/get/update`` commands work unchanged.
* A new file-share channel that, after BBS+ role authentication, exposes
bucket primitives. The server only ever sees opaque ciphertext blobs and
encrypted role grants; file names, contents and per-role
visibility masks are never visible server-side.
Run with::
uv run python demo/file_share_server.py --host 127.0.0.1 --port 9879
The server transport public key is printed at start-up; pin it on each client
with ``zkac-node server pin <userid> 127.0.0.1:9879 --key <hex>``.
"""
from __future__ import annotations
import argparse
import base64
import hashlib
import json
import os
import socket
import sys
import threading
import uuid
from pathlib import Path
from typing import Any
import zkac
from zkac.tcp import FramedSession, server_handshake_anon
# ── small helpers ────────────────────────────────────────────────────
def _b64(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
def _unb64(s: str) -> bytes:
return base64.b64decode(s)
def _chmod(path: Path, mode: int) -> None:
try:
os.chmod(path, mode)
except OSError:
pass
def _write_private_json(path: Path, payload: dict) -> None:
path.write_text(json.dumps(payload, indent=2))
_chmod(path, 0o600)
def _is_loopback(host: str) -> bool:
return host.strip().lower() in {"127.0.0.1", "::1", "localhost"}
def _safe_id(value: str) -> str:
"""Strict allowlist on identifiers used as filesystem names."""
if not isinstance(value, str) or not value:
raise ValueError("missing identifier")
if len(value) > 128 or any(c not in "0123456789abcdefABCDEF-" for c in value):
raise ValueError(f"invalid identifier {value!r}")
return value
def _privacy_safe_error_tag(exc: BaseException) -> str:
"""Return a coarse error label suitable for public service logs."""
return type(exc).__name__
# ── opaque on-disk store ─────────────────────────────────────────────
class _FileShareStore:
"""Persists registry snapshots and opaque bucket state under one data dir."""
def __init__(self, data_dir: Path) -> None:
self._dir = data_dir
self._reg_dir = data_dir / "registries"
self._buckets_dir = data_dir / "buckets"
self._privacy_key: bytes = b""
for d in (self._dir, self._reg_dir, self._buckets_dir):
d.mkdir(parents=True, exist_ok=True)
_chmod(d, 0o700)
self._lock = threading.Lock()
def set_privacy_key(self, key: bytes) -> None:
if not key:
raise RuntimeError("privacy key must not be empty")
self._privacy_key = bytes(key)
def _require_privacy_key(self) -> bytes:
if not self._privacy_key:
raise RuntimeError("privacy key not initialized")
return self._privacy_key
def _tag(self, kind: str, raw_id: str) -> str:
key = self._require_privacy_key()
raw = _safe_id(raw_id).encode("utf-8")
h = hashlib.sha256()
h.update(kind.encode("utf-8"))
h.update(b"\x00")
h.update(key)
h.update(b"\x00")
h.update(raw)
return h.hexdigest()
def _registry_tag(self, registry_id_hex: str) -> str:
return self._tag("registry", registry_id_hex)
def _role_tag(self, role_id_hex: str) -> str:
return self._tag("role", role_id_hex)
# transport key
def load_or_create_keypair(self) -> zkac.Keypair:
kf = self._dir / "server_key.json"
if kf.exists():
data = json.loads(kf.read_text())
return zkac.Keypair.from_secret_key(_unb64(data["secret_b64"]))
kp = zkac.Keypair()
_write_private_json(kf, {
"secret_b64": _b64(kp.secret_key_bytes()),
"public_b64": _b64(kp.public_key().to_bytes()),
})
return kp
# registries (mirrors zkac-node serve so the CLI works unchanged)
def save_registry(self, rid_hex: str, state_bytes: bytes, cert_bytes: bytes) -> None:
with self._lock:
(self._reg_dir / f"{rid_hex}.state").write_bytes(state_bytes)
(self._reg_dir / f"{rid_hex}.cert").write_bytes(cert_bytes)
def load_all_registries(self, mgr: zkac.RegistryManager) -> int:
n = 0
for p in sorted(self._reg_dir.glob("*.state")):
cert = self._reg_dir / f"{p.stem}.cert"
if not cert.exists():
continue
try:
mgr.restore(p.read_bytes(), cert.read_bytes())
n += 1
except Exception as exc:
print(f"[fs-server] skip registry {p.stem}: {exc}")
return n
def list_registry_ids(self) -> list[str]:
out: list[str] = []
for p in sorted(self._reg_dir.glob("*.state")):
cert = self._reg_dir / f"{p.stem}.cert"
if cert.exists():
out.append(p.stem)
return out
# buckets
def _bucket_dir(self, bucket_id: str) -> Path:
return self._buckets_dir / _safe_id(bucket_id)
def _bucket_meta_path(self, bucket_id: str) -> Path:
return self._bucket_dir(bucket_id) / "meta.json"
def _roles_index_path(self, registry_id_hex: str) -> Path:
return self._reg_dir / f"{_safe_id(registry_id_hex)}.roles.json"
def _remember_role_id(self, registry_id_hex: str, role_id_hex: str) -> None:
p = self._roles_index_path(registry_id_hex)
roles: set[str] = set()
if p.is_file():
try:
data = json.loads(p.read_text())
if isinstance(data, list):
roles = {_safe_id(str(v)) for v in data}
except Exception:
roles = set()
roles.add(_safe_id(role_id_hex))
_write_private_json(p, sorted(roles))
def bucket_create(self, bucket_id: str, owner_registry_id_hex: str) -> None:
bd = self._bucket_dir(bucket_id)
with self._lock:
if bd.exists():
raise RuntimeError("bucket already exists")
(bd / "blobs").mkdir(parents=True)
(bd / "role_grants").mkdir()
_chmod(bd, 0o700)
_write_private_json(self._bucket_meta_path(bucket_id), {
"bucket_id": bucket_id,
"owner_registry_tag": self._registry_tag(owner_registry_id_hex),
"finalized": False,
"role_acl": {}, # role_tag -> {"version": int, "allowed_blob_ids": [blob_id...]}
})
def bucket_meta(self, bucket_id: str) -> dict:
return json.loads(self._bucket_meta_path(bucket_id).read_text())
def bucket_is_finalized(self, bucket_id: str) -> bool:
return bool(self.bucket_meta(bucket_id).get("finalized", False))
def _bucket_require_owner(self, bucket_id: str, registry_id_hex: str) -> dict:
meta = self.bucket_meta(bucket_id)
expected_owner_tag = self._registry_tag(registry_id_hex)
if meta.get("owner_registry_tag") != expected_owner_tag:
raise RuntimeError("not bucket owner")
return meta
def bucket_set_finalized(self, bucket_id: str, registry_id_hex: str, finalized: bool) -> None:
with self._lock:
meta = self._bucket_require_owner(bucket_id, registry_id_hex)
meta["finalized"] = bool(finalized)
_write_private_json(self._bucket_meta_path(bucket_id), meta)
def bucket_delete(self, bucket_id: str, registry_id_hex: str) -> None:
with self._lock:
self._bucket_require_owner(bucket_id, registry_id_hex)
bd = self._bucket_dir(bucket_id)
for sub in ("blobs", "role_grants"):
d = bd / sub
if d.is_dir():
for p in d.rglob("*"):
if p.is_file():
try:
p.unlink()
except OSError:
pass
for p in sorted(d.rglob("*"), reverse=True):
if p.is_dir():
try:
p.rmdir()
except OSError:
pass
try:
d.rmdir()
except OSError:
pass
try:
self._bucket_meta_path(bucket_id).unlink()
except OSError:
pass
try:
bd.rmdir()
except OSError:
pass
def bucket_put_blob(
self,
bucket_id: str,
registry_id_hex: str,
blob_id: str,
ciphertext: bytes,
) -> None:
with self._lock:
self._bucket_require_owner(bucket_id, registry_id_hex)
(self._bucket_dir(bucket_id) / "blobs" / _safe_id(blob_id)).write_bytes(ciphertext)
def bucket_set_role_acl(
self,
bucket_id: str,
registry_id_hex: str,
role_id_hex: str,
allowed_blob_ids: list[str],
) -> None:
with self._lock:
meta = self._bucket_require_owner(bucket_id, registry_id_hex)
role_acl = dict(meta.get("role_acl", {}))
key = self._role_tag(role_id_hex)
prev = role_acl.get(key, {})
prev_version = int(prev.get("version", 0)) if isinstance(prev, dict) else 0
role_acl[key] = {
"version": prev_version + 1,
"allowed_blob_ids": [_safe_id(b) for b in allowed_blob_ids],
}
meta["role_acl"] = role_acl
_write_private_json(self._bucket_meta_path(bucket_id), meta)
self._remember_role_id(registry_id_hex, role_id_hex)
def bucket_get_blob(self, bucket_id: str, blob_id: str) -> bytes:
return (self._bucket_dir(bucket_id) / "blobs" / _safe_id(blob_id)).read_bytes()
def bucket_blob_allowed_for_role(self, bucket_id: str, role_id_hex: str, blob_id: str) -> bool:
meta = self.bucket_meta(bucket_id)
role_acl = meta.get("role_acl", {})
role_meta = role_acl.get(self._role_tag(role_id_hex))
if not isinstance(role_meta, dict):
return False
allowed = role_meta.get("allowed_blob_ids")
if not isinstance(allowed, list):
return False
return _safe_id(blob_id) in {str(v) for v in allowed}
def bucket_role_acl(self, bucket_id: str, role_id_hex: str) -> dict:
meta = self.bucket_meta(bucket_id)
role_acl = meta.get("role_acl", {})
role_meta = role_acl.get(self._role_tag(role_id_hex), {})
if not isinstance(role_meta, dict):
return {"version": 0, "allowed_blob_ids": []}
version = int(role_meta.get("version", 0))
allowed = role_meta.get("allowed_blob_ids", [])
if not isinstance(allowed, list):
allowed = []
return {
"version": version,
"allowed_blob_ids": [_safe_id(str(v)) for v in allowed],
}
def buckets_for_role_in_registry(self, role_id_hex: str, registry_id_hex: str) -> list[str]:
"""Bucket ids where this authenticated role currently has non-empty ACL."""
role_id_hex = _safe_id(role_id_hex)
out: list[str] = []
for bd in sorted(self._buckets_dir.iterdir()):
bid = bd.name
try:
if not self.bucket_is_finalized(bid):
continue
if self.bucket_owner_registry_tag(bid) != self._registry_tag(registry_id_hex):
continue
acl = self.bucket_role_acl(bid, role_id_hex)
allowed = acl.get("allowed_blob_ids", [])
grants = self.bucket_get_role_grants(bid, role_id_hex)
current_acl_version = int(acl.get("version", -1))
has_fresh_grant = any(
isinstance(g.get("acl_version"), int) and g["acl_version"] == current_acl_version
for g in grants
)
if isinstance(allowed, list) and len(allowed) > 0 and has_fresh_grant:
out.append(bid)
except Exception:
continue
return out
def bucket_put_role_grant(
self,
bucket_id: str,
registry_id_hex: str,
role_id_hex: str,
acl_version: int,
eph_pk_b64: str,
ciphertext_b64: str,
) -> None:
with self._lock:
self._bucket_require_owner(bucket_id, registry_id_hex)
role_id_hex = _safe_id(role_id_hex)
role_tag = self._role_tag(role_id_hex)
payload = {
"bucket_id": bucket_id,
"role_tag": role_tag,
"acl_version": int(acl_version),
"eph_pk_b64": eph_pk_b64,
"ciphertext_b64": ciphertext_b64,
}
role_dir = self._bucket_dir(bucket_id) / "role_grants" / role_tag
role_dir.mkdir(parents=True, exist_ok=True)
target = role_dir / f"{uuid.uuid4().hex}.json"
_write_private_json(target, payload)
self._remember_role_id(registry_id_hex, role_id_hex)
def bucket_get_role_grants(self, bucket_id: str, role_id_hex: str) -> list[dict]:
role_dir = self._bucket_dir(bucket_id) / "role_grants" / self._role_tag(role_id_hex)
if not role_dir.is_dir():
return []
out: list[dict] = []
for p in sorted(role_dir.glob("*.json")):
try:
out.append(json.loads(p.read_text()))
except Exception:
continue
return out
def bucket_owner_registry_tag(self, bucket_id: str) -> str:
owner = self.bucket_meta(bucket_id).get("owner_registry_tag")
if not isinstance(owner, str) or not owner:
raise RuntimeError("bucket metadata missing owner_registry_tag")
return owner
def buckets_owned_by(self, registry_id_hex: str) -> list[str]:
out: list[str] = []
for bd in sorted(self._buckets_dir.iterdir()):
meta = bd / "meta.json"
if not meta.is_file():
continue
try:
owner_tag = self._registry_tag(registry_id_hex)
if json.loads(meta.read_text()).get("owner_registry_tag") == owner_tag:
out.append(bd.name)
except (OSError, json.JSONDecodeError):
continue
return out
def role_ids_for_registry(self, registry_id_hex: str) -> list[str]:
p = self._roles_index_path(registry_id_hex)
if not p.is_file():
return []
try:
data = json.loads(p.read_text())
if not isinstance(data, list):
return []
return sorted({_safe_id(str(v)) for v in data})
except Exception:
return []
# ── command dispatch (inside encrypted session) ──────────────────────
def _dispatch_mgmt(
cmd: dict,
mgr: zkac.RegistryManager,
store: _FileShareStore,
server_pk_b64: str,
transcript_hash: bytes,
) -> dict:
"""Registry management commands, wire-compatible with ``zkac-node`` CLI."""
try:
action = cmd.get("cmd")
rid_hex = cmd.get("auth_registry_id")
admin_proof_b64 = cmd.get("admin_proof_b64")
def _require_admin_for(target_rid_hex: str) -> None:
if rid_hex != target_rid_hex:
raise RuntimeError("auth_registry_id must match command registry_id")
if not isinstance(admin_proof_b64, str) or not admin_proof_b64:
raise RuntimeError("missing admin_proof_b64")
if not mgr.verify_admin(
bytes.fromhex(target_rid_hex),
_unb64(admin_proof_b64),
transcript_hash,
):
raise RuntimeError("admin authorization failed")
if action == "server_info":
return {"ok": True, "server_public_key_b64": server_pk_b64}
if action == "create_registry":
state_bytes = _unb64(cmd["state_bytes_b64"])
state_cert = _unb64(cmd["state_cert_b64"])
auth_rid = cmd.get("auth_registry_id")
if not isinstance(auth_rid, str):
raise RuntimeError("missing auth_registry_id")
if not isinstance(admin_proof_b64, str) or not admin_proof_b64:
raise RuntimeError("missing admin_proof_b64")
tmp_mgr = zkac.RegistryManager()
expected_rid = tmp_mgr.create(state_bytes, state_cert).hex()
if expected_rid != auth_rid:
raise RuntimeError("auth_registry_id does not match certified state")
if not tmp_mgr.verify_admin(
bytes.fromhex(expected_rid),
_unb64(admin_proof_b64),
transcript_hash,
):
raise RuntimeError("admin authorization failed for create_registry")
rid = mgr.create(state_bytes, state_cert)
store.save_registry(rid.hex(), state_bytes, state_cert)
return {"ok": True, "registry_id": rid.hex()}
if action == "get_registry":
rid_hex_cmd = cmd["registry_id"]
_require_admin_for(rid_hex_cmd)
rid = bytes.fromhex(rid_hex_cmd)
state_bytes, state_cert = mgr.get(rid)
return {
"ok": True,
"state_bytes_b64": _b64(state_bytes),
"state_cert_b64": _b64(state_cert),
}
if action == "update_registry":
rid_hex_cmd = cmd["registry_id"]
_require_admin_for(rid_hex_cmd)
rid = bytes.fromhex(rid_hex_cmd)
state_bytes = _unb64(cmd["state_bytes_b64"])
state_cert = _unb64(cmd["state_cert_b64"])
mgr.update(rid, state_bytes, state_cert)
store.save_registry(rid_hex_cmd, state_bytes, state_cert)
return {"ok": True}
return {"error": f"unknown command: {action}"}
except Exception as exc:
return {"error": str(exc)}
def _dispatch_fs(
cmd: dict,
store: _FileShareStore,
ctx: dict,
) -> dict:
"""File-share commands authenticated via opaque per-session authorization context."""
try:
action = cmd.get("cmd")
registry_id_hex: str = ctx["registry_id_hex"]
registry_tag: str = ctx["registry_tag"]
role_id_hex: str = ctx["role_id_hex"]
is_admin: bool = bool(ctx["is_admin"])
def _require_admin() -> None:
if not is_admin:
raise RuntimeError("admin role required for this command")
if action == "whoami":
return {
"ok": True,
"is_admin": is_admin,
"auth_scope": "admin" if is_admin else "credential",
}
if action == "bucket_create":
_require_admin()
bid = cmd.get("bucket_id") or uuid.uuid4().hex
store.bucket_create(bid, registry_id_hex)
return {"ok": True, "bucket_id": bid}
if action == "bucket_put_blob":
_require_admin()
bid = cmd["bucket_id"]
blob_id = cmd["blob_id"]
ciphertext = _unb64(cmd["ciphertext_b64"])
store.bucket_put_blob(bid, registry_id_hex, blob_id, ciphertext)
return {"ok": True}
if action == "bucket_set_role_acl":
_require_admin()
store.bucket_set_role_acl(
cmd["bucket_id"],
registry_id_hex,
cmd["role_id_hex"],
list(cmd.get("allowed_blob_ids", [])),
)
return {"ok": True}
if action == "bucket_put_role_grant":
_require_admin()
store.bucket_put_role_grant(
cmd["bucket_id"],
registry_id_hex,
cmd["role_id_hex"],
int(cmd["acl_version"]),
cmd["eph_pk_b64"],
cmd["ciphertext_b64"],
)
return {"ok": True}
if action == "bucket_get_role_acl":
_require_admin()
role_acl = store.bucket_role_acl(cmd["bucket_id"], cmd["role_id_hex"])
return {"ok": True, **role_acl}
if action == "bucket_finalize":
_require_admin()
store.bucket_set_finalized(cmd["bucket_id"], registry_id_hex, True)
return {"ok": True}
if action == "bucket_delete":
_require_admin()
store.bucket_delete(cmd["bucket_id"], registry_id_hex)
return {"ok": True}
if action == "bucket_list_owned":
_require_admin()
return {"ok": True, "bucket_ids": store.buckets_owned_by(registry_id_hex)}
if action == "fs_buckets":
return {
"ok": True,
"bucket_ids": store.buckets_for_role_in_registry(
role_id_hex,
registry_id_hex,
),
}
if action == "fs_get_role_grant":
bid = cmd["bucket_id"]
if store.bucket_owner_registry_tag(bid) != registry_tag:
raise RuntimeError("bucket does not belong to authenticated registry")
if not store.bucket_is_finalized(bid):
raise RuntimeError("bucket is not finalized")
current_acl = store.bucket_role_acl(bid, role_id_hex)
acl_version = int(current_acl.get("version", -1))
grants = [
g for g in store.bucket_get_role_grants(bid, role_id_hex)
if isinstance(g.get("acl_version"), int) and g["acl_version"] == acl_version
]
if not grants:
raise RuntimeError("permissions updated; request a fresh role grant")
return {"ok": True, "grants": grants}
if action == "fs_get_blob":
bid = cmd["bucket_id"]
blob_id = cmd["blob_id"]
if store.bucket_owner_registry_tag(bid) != registry_tag:
raise RuntimeError("bucket does not belong to authenticated registry")
if not store.bucket_is_finalized(bid):
raise RuntimeError("bucket is not finalized")
if not is_admin and not store.bucket_blob_allowed_for_role(bid, role_id_hex, blob_id):
raise RuntimeError("blob access denied by role mask")
data = store.bucket_get_blob(bid, blob_id)
return {"ok": True, "ciphertext_b64": _b64(data)}
return {"error": f"unknown command: {action}"}
except Exception as exc:
return {"error": str(exc)}
def _authenticate_fs_identity(
mgr: zkac.RegistryManager,
store: _FileShareStore,
proof: bytes,
transcript_hash: bytes,
) -> dict[str, object] | None:
"""Resolve opaque auth context from proof without client-supplied identifiers."""
registry_ids = store.list_registry_ids()
admin_matches: list[str] = []
role_matches: list[tuple[str, str]] = []
for rid_hex in registry_ids:
try:
rid = bytes.fromhex(rid_hex)
except ValueError:
continue
try:
if mgr.verify_admin(rid, proof, transcript_hash):
admin_matches.append(rid_hex)
except Exception:
pass
for role_id_hex in store.role_ids_for_registry(rid_hex):
try:
role_id = bytes.fromhex(role_id_hex)
except ValueError:
continue
if role_id == zkac.admin_role_id():
continue
try:
if mgr.verify_presentation(rid, role_id, proof, transcript_hash):
role_matches.append((rid_hex, role_id_hex))
except Exception:
continue
if len(admin_matches) == 1 and not role_matches:
rid_hex = admin_matches[0]
role_hex = zkac.admin_role_id().hex()
return {
"registry_id_hex": rid_hex,
"registry_tag": store._registry_tag(rid_hex),
"role_id_hex": role_hex,
"role_tag": store._role_tag(role_hex),
"is_admin": True,
}
if not admin_matches and len(role_matches) == 1:
rid_hex, role_hex = role_matches[0]
return {
"registry_id_hex": rid_hex,
"registry_tag": store._registry_tag(rid_hex),
"role_id_hex": role_hex,
"role_tag": store._role_tag(role_hex),
"is_admin": False,
}
# Ambiguous or invalid proof.
return None
# ── per-connection handler ────────────────────────────────────────────
def _handle_conn(
conn: socket.socket,
addr: tuple,
node: zkac.Node,
mgr: zkac.RegistryManager,
store: _FileShareStore,
server_pk_b64: str,
idle_timeout_s: float,
slots: threading.BoundedSemaphore,
) -> None:
try:
conn.settimeout(idle_timeout_s)
session = server_handshake_anon(conn, node)
framed = FramedSession(conn, session)
transcript_hash = bytes(session.transcript_hash())
hello = json.loads(framed.recv())
op = hello.get("op")
if op == "mgmt":
while True:
try:
cmd = json.loads(framed.recv())
except (ConnectionError, OSError):
break
resp = _dispatch_mgmt(cmd, mgr, store, server_pk_b64, transcript_hash)
framed.send(json.dumps(resp).encode())
return
if op == "fs":
try:
proof = _unb64(hello["bbs_auth_b64"])
except (KeyError, ValueError) as exc:
framed.send(json.dumps({"error": f"invalid fs hello: {exc}"}).encode())
return
auth = _authenticate_fs_identity(mgr, store, proof, transcript_hash)
if auth is None:
framed.send(json.dumps({"error": "auth failed"}).encode())
return
framed.send(json.dumps({"ok": True, "status": "authenticated"}).encode())
ctx = auth
while True:
try:
cmd = json.loads(framed.recv())
except (ConnectionError, OSError):
break
resp = _dispatch_fs(cmd, store, ctx)
framed.send(json.dumps(resp).encode())
return
framed.send(json.dumps({"error": f"unknown op: {op}"}).encode())
except (ConnectionError, BrokenPipeError, OSError):
pass
except Exception as exc:
# Privacy model: never emit client endpoint or request-linked payloads.
print(f"[fs-server] connection error ({_privacy_safe_error_tag(exc)})")
finally:
conn.close()
slots.release()
# ── entry point ──────────────────────────────────────────────────────
def serve(
data_dir: Path,
host: str = "127.0.0.1",
port: int = 9879,
*,
max_connections: int = 64,
idle_timeout_s: float = 60.0,
listen_backlog: int = 64,
allow_non_loopback: bool = False,
) -> None:
data_dir.mkdir(parents=True, exist_ok=True)
store = _FileShareStore(data_dir)
kp = store.load_or_create_keypair()
store.set_privacy_key(bytes(kp.secret_key_bytes()))
server_pk_b64 = _b64(kp.public_key().to_bytes())
pk_hex = kp.public_key().to_bytes().hex()
node = zkac.Node(kp)
mgr = zkac.RegistryManager()
n = store.load_all_registries(mgr)
print(f"[fs-server] data dir: {data_dir}")
print(f"[fs-server] server transport public key (pin OOB): {pk_hex}")
print(f"[fs-server] loaded {n} registries")
print(f"[fs-server] listening on {host}:{port}")
if not _is_loopback(host) and not allow_non_loopback:
raise RuntimeError(
"refusing to bind outside loopback. "
"Use --allow-non-loopback only when exposure is intentional."
)
if not _is_loopback(host):
print(f"[fs-server] warning: binding outside loopback: {host}:{port}", file=sys.stderr)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind((host, port))
slots = threading.BoundedSemaphore(max_connections)
sock.listen(listen_backlog)
try:
while True:
conn, addr = sock.accept()
if not slots.acquire(blocking=False):
conn.close()
continue
threading.Thread(
target=_handle_conn,
args=(conn, addr, node, mgr, store, server_pk_b64, idle_timeout_s, slots),
daemon=True,
).start()
except KeyboardInterrupt:
print("\n[fs-server] shutdown")
finally:
sock.close()
def main() -> None:
p = argparse.ArgumentParser(description="ZKAC opaque file-share server (demo)")
p.add_argument("--host", default="127.0.0.1")
p.add_argument("--port", type=int, default=9879)
p.add_argument(
"--data-dir",
type=Path,
default=Path(__file__).resolve().parent / "fs_data",
help="server state (transport key, registries, opaque buckets)",
)
p.add_argument("--idle-timeout", type=float, default=60.0)
p.add_argument("--max-connections", type=int, default=64)
p.add_argument("--allow-non-loopback", action="store_true")
args = p.parse_args()
serve(
args.data_dir,
host=args.host,
port=args.port,
max_connections=args.max_connections,
idle_timeout_s=args.idle_timeout,
allow_non_loopback=args.allow_non_loopback,
)
if __name__ == "__main__":
main()