ZKAC/demo/test_demo_privacy_guardrails.py
everbarry fe68752cc7 demo: privacy-harden file share and add guardrail tests
Harden fs auth and storage for a trustless-server model: proof-only hello,
opaque tagged bucket metadata, safer connection logging, and inbox UI without
raw ids. Add demo/test_demo_privacy_guardrails.py and README notes. Stop
tracking demo __pycache__ and fs_data artifacts.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-07 22:31:37 +02:00

193 lines
6.0 KiB
Python

import json
import sys
import threading
from pathlib import Path
import zkac
DEMO_DIR = Path(__file__).resolve().parent
if str(DEMO_DIR) not in sys.path:
sys.path.insert(0, str(DEMO_DIR))
import file_share_client as fsc # noqa: E402
import file_share_server as fss # noqa: E402
def _make_credential() -> tuple[bytes, zkac.Credential]:
issuer = zkac.BbsIssuer()
pk = issuer.public_key()
role_id = zkac.role_id("viewer")
req = zkac.prepare_blind_request()
blind_sig = issuer.issue_blind(req.commitment_with_proof(), role_id, 1)
cred = zkac.Credential.finalize(
blind_sig,
req.member_secret(),
req.prover_blind(),
role_id,
1,
pk,
)
return role_id, cred
def test_open_session_fs_hello_contains_only_proof(monkeypatch):
role_id, credential = _make_credential()
sent_payloads: list[dict] = []
class _FakeSocket:
def settimeout(self, _value):
return None
def close(self):
return None
class _FakeHandshakeSession:
def transcript_hash(self) -> bytes:
return b"\x11" * 32
class _FakeFramedSession:
def __init__(self, _sock, _session):
pass
def send(self, payload: bytes) -> None:
sent_payloads.append(json.loads(payload.decode("utf-8")))
def recv(self) -> bytes:
return b'{"ok": true, "status": "authenticated"}'
monkeypatch.setattr(fsc.socket, "create_connection", lambda *_args, **_kwargs: _FakeSocket())
monkeypatch.setattr(fsc, "client_handshake_anon", lambda *_args, **_kwargs: _FakeHandshakeSession())
monkeypatch.setattr(fsc, "FramedSession", _FakeFramedSession)
sess = fsc.open_session(
"127.0.0.1:9879",
server_pk_hex=zkac.Keypair().public_key().to_bytes().hex(),
user_transport_secret=bytes(zkac.Keypair().secret_key_bytes()),
registry_id_hex="00" * 32,
role_id=role_id,
credential=credential,
)
sess.close()
assert len(sent_payloads) == 1
hello = sent_payloads[0]
assert set(hello.keys()) == {"op", "bbs_auth_b64"}
assert hello["op"] == "fs"
assert isinstance(hello["bbs_auth_b64"], str) and hello["bbs_auth_b64"]
def test_handle_conn_error_log_never_includes_peer_endpoint(monkeypatch, tmp_path, capsys):
class _DummyConn:
def settimeout(self, _value):
return None
def close(self):
return None
def _raise_handshake(_conn, _node):
raise RuntimeError("boom")
monkeypatch.setattr(fss, "server_handshake_anon", _raise_handshake)
slots = threading.BoundedSemaphore(1)
assert slots.acquire(blocking=False)
fss._handle_conn(
_DummyConn(),
("203.0.113.44", 4242),
zkac.Node(zkac.Keypair()),
zkac.RegistryManager(),
fss._FileShareStore(tmp_path),
"",
5.0,
slots,
)
out = capsys.readouterr().out
assert "[fs-server] connection error (RuntimeError)" in out
assert "203.0.113.44" not in out
assert ":4242" not in out
def test_bucket_metadata_uses_opaque_tags(tmp_path):
store = fss._FileShareStore(tmp_path)
store.set_privacy_key(b"privacy-key-32-bytes-minimum-seed")
registry_id = "aa" * 32
role_id = "bb" * 32
bucket_id = "cc" * 16
blob_id = "dd" * 16
store.bucket_create(bucket_id, registry_id)
meta = store.bucket_meta(bucket_id)
assert "owner_registry_id" not in meta
assert isinstance(meta.get("owner_registry_tag"), str)
assert len(meta["owner_registry_tag"]) == 64
store.bucket_set_role_acl(bucket_id, registry_id, role_id, [blob_id])
meta2 = store.bucket_meta(bucket_id)
acl_keys = list(meta2.get("role_acl", {}).keys())
assert acl_keys and acl_keys[0] != role_id
assert all(len(k) == 64 for k in acl_keys)
store.bucket_put_role_grant(bucket_id, registry_id, role_id, 1, "eph", "ct")
grants_root = tmp_path / "buckets" / bucket_id / "role_grants"
assert (grants_root / role_id).exists() is False
role_dirs = [p.name for p in grants_root.iterdir() if p.is_dir()]
assert role_dirs and all(len(d) == 64 for d in role_dirs)
def test_auth_scan_does_not_return_early():
class _Mgr:
def __init__(self):
self.admin_checks: list[str] = []
self.role_checks: list[tuple[str, str]] = []
def verify_admin(self, rid: bytes, _proof: bytes, _th: bytes) -> bool:
self.admin_checks.append(rid.hex())
return rid.hex() == ("11" * 32)
def verify_presentation(self, rid: bytes, role_id: bytes, _proof: bytes, _th: bytes) -> bool:
self.role_checks.append((rid.hex(), role_id.hex()))
return False
class _Store:
def list_registry_ids(self):
return ["11" * 32, "22" * 32]
def role_ids_for_registry(self, rid_hex: str):
if rid_hex == "11" * 32:
return ["33" * 32]
return ["44" * 32]
def _registry_tag(self, rid_hex: str):
return "r-" + rid_hex[:8]
def _role_tag(self, role_hex: str):
return "k-" + role_hex[:8]
mgr = _Mgr()
auth = fss._authenticate_fs_identity(mgr, _Store(), b"proof", b"nonce")
assert auth is not None and auth["is_admin"] is True
# Both registries must be checked even though first one matched admin.
assert mgr.admin_checks == ["11" * 32, "22" * 32]
# Role checks also run for all known role candidates.
assert mgr.role_checks == [("11" * 32, "33" * 32), ("22" * 32, "44" * 32)]
def test_dispatch_whoami_does_not_expose_registry_or_role():
resp = fss._dispatch_fs(
{"cmd": "whoami"},
store=None, # not used by whoami branch
ctx={
"registry_id_hex": "11" * 32,
"registry_tag": "r-tag",
"role_id_hex": "22" * 32,
"role_tag": "k-tag",
"is_admin": False,
},
)
assert resp["ok"] is True
assert "registry_id" not in resp
assert "role_id" not in resp
assert resp["auth_scope"] == "credential"