ZKAC/cli/zkac_cli/client_ops.py
2026-04-16 01:29:59 +02:00

65 lines
1.8 KiB
Python

"""TCP clients for management, managed, and relay channels."""
from __future__ import annotations
import json
import socket
from typing import Any
import zkac
from zkac.tcp import FramedSession, client_handshake, client_handshake_managed
def mgmt_call(
host: str,
port: int,
server_pk: zkac.PublicKey,
mgmt_cred: zkac.Credential,
cmd: dict[str, Any],
) -> dict[str, Any]:
node = zkac.Node(zkac.Keypair())
sock = socket.create_connection((host, port))
try:
session = client_handshake(sock, node, server_pk, mgmt_cred)
framed = FramedSession(sock, session)
framed.send(json.dumps(cmd).encode("utf-8"))
return json.loads(framed.recv().decode("utf-8"))
finally:
sock.close()
def managed_call(
host: str,
port: int,
server_pk: zkac.PublicKey,
cred: zkac.Credential,
registry_id: bytes,
cmd: dict[str, Any],
) -> dict[str, Any]:
node = zkac.Node(zkac.Keypair())
sock = socket.create_connection((host, port))
try:
session = client_handshake_managed(sock, node, server_pk, cred, registry_id)
framed = FramedSession(sock, session)
framed.send(json.dumps(cmd).encode("utf-8"))
return json.loads(framed.recv().decode("utf-8"))
finally:
sock.close()
def relay_line(host: str, port: int, obj: dict[str, Any]) -> dict[str, Any]:
payload = (json.dumps(obj) + "\n").encode("utf-8")
sock = socket.create_connection((host, port))
try:
sock.sendall(payload)
buf = b""
while b"\n" not in buf:
chunk = sock.recv(4096)
if not chunk:
break
buf += chunk
line = buf.split(b"\n", 1)[0]
return json.loads(line.decode("utf-8"))
finally:
sock.close()