Harden fs auth and storage for a trustless-server model: proof-only hello, opaque tagged bucket metadata, safer connection logging, and inbox UI without raw ids. Add demo/test_demo_privacy_guardrails.py and README notes. Stop tracking demo __pycache__ and fs_data artifacts. Co-authored-by: Cursor <cursoragent@cursor.com>
193 lines
6.0 KiB
Python
193 lines
6.0 KiB
Python
import json
|
|
import sys
|
|
import threading
|
|
from pathlib import Path
|
|
|
|
import zkac
|
|
|
|
# Directory containing this test module; put it on sys.path so the sibling
# demo modules imported below resolve regardless of the working directory.
DEMO_DIR = Path(__file__).resolve().parent
if sys.path.count(str(DEMO_DIR)) == 0:
    sys.path.insert(0, str(DEMO_DIR))
|
|
|
|
import file_share_client as fsc # noqa: E402
|
|
import file_share_server as fss # noqa: E402
|
|
|
|
|
|
def _make_credential() -> tuple[bytes, zkac.Credential]:
    """Build a throwaway credential for the ``viewer`` role.

    A fresh ``zkac.BbsIssuer`` blind-signs a freshly prepared request, and the
    signature is finalized against that same issuer's public key.

    Returns:
        The ``(role_id, credential)`` pair used by the session test.
    """
    bbs_issuer = zkac.BbsIssuer()
    issuer_pk = bbs_issuer.public_key()
    viewer_role = zkac.role_id("viewer")
    blind_request = zkac.prepare_blind_request()

    signature = bbs_issuer.issue_blind(
        blind_request.commitment_with_proof(),
        viewer_role,
        1,
    )

    member_secret = blind_request.member_secret()
    prover_blind = blind_request.prover_blind()
    # Same role id and trailing integer (1) that were handed to issue_blind.
    credential = zkac.Credential.finalize(
        signature,
        member_secret,
        prover_blind,
        viewer_role,
        1,
        issuer_pk,
    )
    return viewer_role, credential
|
|
|
|
|
|
def test_open_session_fs_hello_contains_only_proof(monkeypatch):
    """Check that the hello sent by ``open_session`` has only ``op`` and a proof.

    The socket, the anonymous handshake and the framed session are replaced by
    local stubs so that every payload the client sends is captured in memory.
    """
    role_id, credential = _make_credential()
    captured: list[dict] = []

    class _StubSocket:
        """Socket stand-in; both methods are no-ops."""

        def settimeout(self, _value):
            return None

        def close(self):
            return None

    class _StubHandshake:
        """Handshake result exposing a fixed 32-byte transcript hash."""

        def transcript_hash(self) -> bytes:
            return b"\x11" * 32

    class _RecordingFramedSession:
        """Framed session that records decoded JSON sends and replies with success."""

        def __init__(self, _sock, _session):
            pass

        def send(self, payload: bytes) -> None:
            decoded = json.loads(payload.decode("utf-8"))
            captured.append(decoded)

        def recv(self) -> bytes:
            return b'{"ok": true, "status": "authenticated"}'

    def _connect(*_args, **_kwargs):
        return _StubSocket()

    def _handshake(*_args, **_kwargs):
        return _StubHandshake()

    monkeypatch.setattr(fsc.socket, "create_connection", _connect)
    monkeypatch.setattr(fsc, "client_handshake_anon", _handshake)
    monkeypatch.setattr(fsc, "FramedSession", _RecordingFramedSession)

    server_pk_hex = zkac.Keypair().public_key().to_bytes().hex()
    transport_secret = bytes(zkac.Keypair().secret_key_bytes())
    session = fsc.open_session(
        "127.0.0.1:9879",
        server_pk_hex=server_pk_hex,
        user_transport_secret=transport_secret,
        registry_id_hex="00" * 32,
        role_id=role_id,
        credential=credential,
    )
    session.close()

    assert len(captured) == 1
    hello = captured[0]
    assert set(hello) == {"op", "bbs_auth_b64"}
    assert hello["op"] == "fs"
    proof_b64 = hello["bbs_auth_b64"]
    assert isinstance(proof_b64, str)
    assert proof_b64
|
|
|
|
|
|
def test_handle_conn_error_log_never_includes_peer_endpoint(
    monkeypatch, tmp_path, capsys
):
    """Check that a handshake failure is logged without the peer host or port.

    ``server_handshake_anon`` is patched to raise ``RuntimeError``; the captured
    stdout must name the exception class and contain neither part of the peer
    address tuple passed to ``_handle_conn``.
    """
    peer_host, peer_port = "203.0.113.44", 4242

    class _InertConn:
        """Connection stand-in; both methods are no-ops."""

        def settimeout(self, _value):
            return None

        def close(self):
            return None

    def _failing_handshake(_conn, _node):
        raise RuntimeError("boom")

    monkeypatch.setattr(fss, "server_handshake_anon", _failing_handshake)

    # Take the only slot before handing the semaphore over
    # (presumably _handle_conn releases it on exit -- confirm against the server).
    conn_slots = threading.BoundedSemaphore(1)
    took_slot = conn_slots.acquire(blocking=False)
    assert took_slot

    fss._handle_conn(
        _InertConn(),
        (peer_host, peer_port),
        zkac.Node(zkac.Keypair()),
        zkac.RegistryManager(),
        fss._FileShareStore(tmp_path),
        "",
        5.0,
        conn_slots,
    )

    logged = capsys.readouterr().out
    assert "[fs-server] connection error (RuntimeError)" in logged
    assert peer_host not in logged
    assert f":{peer_port}" not in logged
|
|
|
|
|
|
def test_bucket_metadata_uses_opaque_tags(tmp_path):
    """Check that bucket metadata and grant directories hold tags, not raw ids.

    Covers three places: the bucket owner field, the ``role_acl`` keys and the
    per-role directories under ``role_grants``.

    The raw registry and role ids used here are themselves 64 hex characters,
    so a 64-character length check cannot tell a tag from a raw id. Each
    length check is therefore paired with an explicit inequality or
    non-membership check against the raw value.
    """
    store = fss._FileShareStore(tmp_path)
    store.set_privacy_key(b"privacy-key-32-bytes-minimum-seed")
    registry_id = "aa" * 32
    role_id = "bb" * 32
    bucket_id = "cc" * 16
    blob_id = "dd" * 16

    # Owner: the raw-id field must be absent and the tag must differ from the id.
    store.bucket_create(bucket_id, registry_id)
    meta = store.bucket_meta(bucket_id)
    assert "owner_registry_id" not in meta
    owner_tag = meta.get("owner_registry_tag")
    assert isinstance(owner_tag, str)
    assert len(owner_tag) == 64
    assert owner_tag != registry_id

    # ACL: no key may be the raw role id (checked for every key, not just the first).
    store.bucket_set_role_acl(bucket_id, registry_id, role_id, [blob_id])
    meta2 = store.bucket_meta(bucket_id)
    acl_keys = list(meta2.get("role_acl", {}))
    assert acl_keys
    assert role_id not in acl_keys
    assert all(len(k) == 64 for k in acl_keys)

    # Grants on disk: no directory may be named after the raw role id.
    store.bucket_put_role_grant(bucket_id, registry_id, role_id, 1, "eph", "ct")
    grants_root = tmp_path / "buckets" / bucket_id / "role_grants"
    assert not (grants_root / role_id).exists()
    role_dirs = [p.name for p in grants_root.iterdir() if p.is_dir()]
    assert role_dirs and all(len(d) == 64 for d in role_dirs)
|
|
|
|
|
|
def test_auth_scan_does_not_return_early():
    """Check that authentication visits every registry and role candidate.

    The stub manager accepts the admin proof for the first registry only and
    rejects every role presentation. The recorded calls must still include the
    second registry, showing the scan did not stop at the first match.
    """
    admin_registry = "11" * 32
    other_registry = "22" * 32
    admin_registry_role = "33" * 32
    other_registry_role = "44" * 32

    class _RecordingManager:
        """Manager stub that logs each verification call as hex strings."""

        def __init__(self):
            self.admin_checks: list[str] = []
            self.role_checks: list[tuple[str, str]] = []

        def verify_admin(self, rid: bytes, _proof: bytes, _th: bytes) -> bool:
            rid_hex = rid.hex()
            self.admin_checks.append(rid_hex)
            return rid_hex == admin_registry

        def verify_presentation(
            self, rid: bytes, role_id: bytes, _proof: bytes, _th: bytes
        ) -> bool:
            self.role_checks.append((rid.hex(), role_id.hex()))
            return False

    class _TwoRegistryStore:
        """Store stub exposing two registries with one role each."""

        def list_registry_ids(self):
            return [admin_registry, other_registry]

        def role_ids_for_registry(self, rid_hex: str):
            if rid_hex != admin_registry:
                return [other_registry_role]
            return [admin_registry_role]

        def _registry_tag(self, rid_hex: str):
            return "r-" + rid_hex[:8]

        def _role_tag(self, role_hex: str):
            return "k-" + role_hex[:8]

    manager = _RecordingManager()
    auth = fss._authenticate_fs_identity(
        manager, _TwoRegistryStore(), b"proof", b"nonce"
    )

    assert auth is not None
    assert auth["is_admin"] is True
    # The second registry is still checked after the first matched as admin.
    assert manager.admin_checks == [admin_registry, other_registry]
    # Role verification also runs once per known (registry, role) candidate.
    assert manager.role_checks == [
        (admin_registry, admin_registry_role),
        (other_registry, other_registry_role),
    ]
|
|
|
|
|
|
def test_dispatch_whoami_does_not_expose_registry_or_role():
    """Check that the ``whoami`` reply omits ``registry_id`` and ``role_id``.

    The session context carries both raw ids and their tags; the reply must
    report success and an ``auth_scope`` of ``"credential"`` without either
    raw-id key.
    """
    session_ctx = {
        "registry_id_hex": "11" * 32,
        "registry_tag": "r-tag",
        "role_id_hex": "22" * 32,
        "role_tag": "k-tag",
        "is_admin": False,
    }

    # store is None because the whoami branch does not use it.
    resp = fss._dispatch_fs({"cmd": "whoami"}, store=None, ctx=session_ctx)

    assert resp["ok"] is True
    for raw_id_key in ("registry_id", "role_id"):
        assert raw_id_key not in resp
    assert resp["auth_scope"] == "credential"
|