Skip to content

Commit faef4b6

Browse files
committed
feat: add CloudClient and rewrite wire protocol for Rust compatibility
- Add CloudClient for luxfi/zap binary protocol (TCP + auto-TLS) - Rewrite wire.py Builder/ObjectBuilder to match Rust single-buffer architecture - Fix response field offsets (CLOUD_RESP_BODY=4, CLOUD_RESP_ERROR=12) - Use signed i32 relative offsets from absolute positions - 28 unit tests + cross-language integration test passing
1 parent 77e2a45 commit faef4b6

File tree

4 files changed

+825
-0
lines changed

4 files changed

+825
-0
lines changed

pkg/hanzo-zap/hanzo_zap/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,16 @@
3030
)
3131
from .client import ZapClient
3232
from .server import ZapServer
33+
from .cloud import CloudClient
3334
from .playground import PlaygroundClient
3435

3536
__version__ = "0.7.0"
3637
__all__ = [
3738
# Wire protocol
3839
"ZapClient",
3940
"ZapServer",
41+
# Cloud (luxfi/zap binary protocol — compatible with Rust hanzo-zap)
42+
"CloudClient",
4043
# Playground
4144
"PlaygroundClient",
4245
# Core types

pkg/hanzo-zap/hanzo_zap/cloud.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
"""
2+
ZAP Cloud Client — speaks the luxfi/zap binary wire protocol.
3+
4+
Connects to Hanzo Node (port 3692) or Engine via native binary transport.
5+
Compatible with the Rust `hanzo-zap` crate server.
6+
7+
Example:
8+
>>> from hanzo_zap import CloudClient
9+
>>> async with CloudClient.connect("localhost:3692") as client:
10+
... status, body, error = await client.call("chat.completions", "", body_bytes)
11+
"""
12+
13+
from __future__ import annotations
14+
15+
import asyncio
16+
import json
17+
import ssl
18+
import struct
19+
from typing import Any
20+
21+
from .wire import (
22+
REQ_FLAG_REQ,
23+
REQ_FLAG_RESP,
24+
Message,
25+
build_cloud_request,
26+
build_handshake,
27+
parse_cloud_response,
28+
parse_handshake,
29+
read_frame,
30+
write_frame,
31+
)
32+
33+
DEFAULT_ENDPOINT = "localhost:3692"
34+
CLIENT_NODE_ID = "python-sdk"
35+
36+
37+
class CloudClient:
38+
"""
39+
A client that speaks the luxfi/zap binary wire protocol.
40+
41+
Supports both plain TCP (localhost) and TLS (remote endpoints).
42+
"""
43+
44+
def __init__(
45+
self,
46+
reader: asyncio.StreamReader,
47+
writer: asyncio.StreamWriter,
48+
peer_id: str,
49+
) -> None:
50+
self._reader = reader
51+
self._writer = writer
52+
self._peer_id = peer_id
53+
self._req_id = 0
54+
55+
@classmethod
56+
async def connect(
57+
cls,
58+
endpoint: str | None = None,
59+
*,
60+
use_tls: bool | None = None,
61+
node_id: str = CLIENT_NODE_ID,
62+
) -> "CloudClient":
63+
"""
64+
Connect to a ZAP endpoint, perform handshake.
65+
66+
Args:
67+
endpoint: "host:port" (default: localhost:3692)
68+
use_tls: Force TLS on/off. None = auto-detect (TLS for non-localhost).
69+
node_id: Client node ID for handshake.
70+
"""
71+
addr = endpoint or DEFAULT_ENDPOINT
72+
host, _, port_str = addr.rpartition(":")
73+
if not host:
74+
host = addr
75+
port_str = "3692"
76+
port = int(port_str)
77+
78+
# Auto-detect TLS: skip for localhost
79+
if use_tls is None:
80+
use_tls = host not in ("localhost", "127.0.0.1", "::1")
81+
82+
ssl_ctx = None
83+
if use_tls:
84+
ssl_ctx = ssl.create_default_context()
85+
86+
reader, writer = await asyncio.open_connection(host, port, ssl=ssl_ctx)
87+
88+
# Send handshake
89+
hs_bytes = build_handshake(node_id)
90+
await write_frame(writer, hs_bytes)
91+
92+
# Read handshake response
93+
resp_data = await read_frame(reader)
94+
resp_msg = Message.parse(resp_data)
95+
peer_id = parse_handshake(resp_msg)
96+
97+
return cls(reader, writer, peer_id)
98+
99+
@property
100+
def peer_id(self) -> str:
101+
return self._peer_id
102+
103+
async def call(
104+
self,
105+
method: str,
106+
auth: str,
107+
body: bytes,
108+
) -> tuple[int, bytes, str]:
109+
"""
110+
Send a MsgType 100 cloud service request and return (status, body, error).
111+
"""
112+
self._req_id += 1
113+
req_id = self._req_id
114+
115+
# Build ZAP message
116+
msg_bytes = build_cloud_request(method, auth, body)
117+
118+
# Wrap with 8-byte Call correlation header
119+
wrapped = struct.pack("<II", req_id, REQ_FLAG_REQ) + msg_bytes
120+
121+
# Send
122+
await write_frame(self._writer, wrapped)
123+
124+
# Read response — loop until we match our req_id
125+
while True:
126+
data = await read_frame(self._reader)
127+
if len(data) < 8:
128+
continue
129+
130+
resp_id, resp_flag = struct.unpack_from("<II", data, 0)
131+
if resp_flag != REQ_FLAG_RESP:
132+
continue
133+
if resp_id != req_id:
134+
continue
135+
136+
# Parse ZAP message (skip 8-byte Call header)
137+
msg = Message.parse(data[8:])
138+
return parse_cloud_response(msg)
139+
140+
async def chat_completion(
141+
self,
142+
model: str,
143+
messages: list[dict[str, Any]],
144+
auth_token: str = "",
145+
**kwargs: Any,
146+
) -> dict[str, Any]:
147+
"""
148+
High-level: send an OpenAI-compatible chat completion request via ZAP.
149+
150+
Returns the parsed JSON response dict.
151+
"""
152+
request_body: dict[str, Any] = {"model": model, "messages": messages}
153+
request_body.update(kwargs)
154+
155+
body_bytes = json.dumps(request_body).encode("utf-8")
156+
auth = f"Bearer {auth_token}" if auth_token and not auth_token.startswith("Bearer ") else auth_token
157+
158+
status, resp_body, error = await self.call("chat.completions", auth, body_bytes)
159+
160+
if status != 200:
161+
err_msg = error or resp_body.decode("utf-8", errors="replace") or f"ZAP status {status}"
162+
raise RuntimeError(f"ZAP cloud error: {err_msg}")
163+
164+
return json.loads(resp_body)
165+
166+
async def close(self) -> None:
167+
"""Close the connection."""
168+
self._writer.close()
169+
await self._writer.wait_closed()
170+
171+
async def __aenter__(self) -> "CloudClient":
172+
return self
173+
174+
async def __aexit__(self, *args: Any) -> None:
175+
await self.close()

0 commit comments

Comments
 (0)