Skip to content

Commit 880fae4

Browse files
committed
docs(generic streaming): update streamCli.Close() usage in code examples for clarity
1 parent 7f1c06a commit 880fae4

File tree

1 file changed

+286
-0
lines changed

1 file changed

+286
-0
lines changed
Lines changed: 286 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,286 @@
1+
---
2+
title: "流式泛化调用用户指南"
3+
date: 2025-05-07
4+
weight: 6
5+
keywords: ["流式泛化调用用户指南"]
6+
description: ""
7+
---
8+
9+
## 简介
10+
11+
**Kitex v0.12.0 起支持流式接口的 JSON 泛化调用(仅客户端)**
12+
13+
## 使用方法
14+
15+
### 泛化流式客户端初始化
16+
17+
#### protobuf
18+
19+
以如下 Protobuf IDL 为例:
20+
21+
```protobuf
22+
syntax = "proto3";
23+
package pb;
24+
option go_package = "pb";
25+
26+
message Request {
27+
string message = 1;
28+
}
29+
30+
message Response {
31+
string message = 1;
32+
}
33+
34+
service StreamingService {
35+
rpc StreamRequestEcho (stream Request) returns (Response) {}
36+
rpc StreamResponseEcho (Request) returns (stream Response) {}
37+
rpc BidirectionalEcho (stream Request) returns (stream Response) {}
38+
rpc UnaryEcho (Request) returns (Response) {}
39+
}
40+
```
41+
42+
上述 IDL 包含四种方法,分别对应四种场景:
43+
44+
1. 客户端流:客户端发送多条消息,服务端返回一条消息后关闭流。
45+
2. 服务端流:客户端发送一条消息,服务端返回多条消息后关闭流,适合大模型等场景。
46+
3. 双向流:客户端和服务端可独立收发消息,顺序可自定义。
47+
4. 单次调用(非流式)。
48+
49+
流式客户端初始化示例:
50+
51+
```go
52+
ctx := context.Background()
53+
54+
// 初始化泛化客户端
55+
dOpts := proto.Options{}
56+
p, err := generic.NewPbFileProviderWithDynamicGo(idlPath, ctx, dOpts)
57+
if err != nil {
58+
log.Fatal(err)
59+
}
60+
61+
// 创建 JSON 泛化对象
62+
g, err := generic.JSONPbGeneric(p)
63+
if err != nil {
64+
log.Fatal(err)
65+
}
66+
67+
// 初始化流式客户端
68+
cli, err := genericclient.NewStreamingClient(
69+
"streaming",
70+
g,
71+
client.WithTransportProtocol(transport.GRPC),
72+
client.WithHostPorts("127.0.0.1:8888"),
73+
client.WithMetaHandler(transmeta.ClientHTTP2Handler),
74+
)
75+
76+
// ... 其他流式调用示例 ...
77+
```
78+
79+
#### thrift
80+
81+
以如下 Thrift IDL 为例:
82+
83+
```thrift
84+
namespace go echo
85+
86+
struct Request {
87+
1: required string message,
88+
}
89+
90+
struct Response {
91+
1: required string message,
92+
}
93+
94+
service TestService {
95+
Response Echo (1: Request req) (streaming.mode="bidirectional"),
96+
Response EchoClient (1: Request req) (streaming.mode="client"),
97+
Response EchoServer (1: Request req) (streaming.mode="server"),
98+
// Response EchoUnary (1: Request req) (streaming.mode="unary"), // 不推荐
99+
100+
Response EchoPingPong (1: Request req), // KitexThrift,非流式
101+
}
102+
```
103+
104+
上述 IDL 包含以下场景:
105+
106+
1. 客户端流:客户端发送多条消息,服务端返回一条消息后关闭流。
107+
2. 服务端流:客户端发送一条消息,服务端返回多条消息后关闭流,适合大模型等场景。
108+
3. 双向流:客户端和服务端可独立收发消息,顺序可自定义。
109+
4. 单次调用(gRPC):带 `streaming.mode` 注解的非流式(不推荐,性能有损失)。
110+
5. 单次调用(KitexThrift):非流式(推荐)。
111+
112+
流式客户端初始化示例:
113+
114+
```go
115+
// 1. 创建 Thrift 文件提供者
116+
p, err := generic.NewThriftFileProvider("../idl/streaming.thrift")
117+
if err != nil {
118+
log.Fatal(err)
119+
}
120+
121+
// 2. 创建 JSON Thrift 泛化调用
122+
g, err := generic.JSONThriftGeneric(p)
123+
if err != nil {
124+
log.Fatal(err)
125+
}
126+
127+
// 3. 创建流式客户端
128+
cli, err := genericclient.NewStreamingClient(
129+
"streaming_service",
130+
g,
131+
client.WithTransportProtocol(transport.GRPC),
132+
client.WithHostPorts("127.0.0.1:8888"),
133+
)
134+
// ... 其他流式调用示例 ...
135+
```
136+
137+
### 客户端流(Client Streaming)
138+
139+
示例:
140+
141+
```go
142+
// 使用已创建的流式客户端初始化 client streaming
143+
streamCli, err := genericclient.NewClientStreaming(ctx, cli, "EchoClient")
144+
145+
// 发送多个请求
146+
for i := 0; i < 3; i++ {
147+
req := fmt.Sprintf(`{"message": "grpc client streaming generic %dth request"}`, i)
148+
if err = streamCli.Send(req); err != nil {
149+
return fmt.Errorf("failed to send: %v", err)
150+
}
151+
time.Sleep(time.Second)
152+
}
153+
154+
// 接收最终响应
155+
resp, err := streamCli.CloseAndRecv()
156+
strResp, ok := resp.(string) // 响应为 json 字符串
157+
```
158+
159+
### 服务端流(Server Streaming)
160+
161+
注意:`Recv` 返回非 nil 错误(包括 `io.EOF`)表示服务端已发送完毕或出错。
162+
163+
示例:
164+
165+
```go
166+
// 使用已创建的流式客户端初始化 server streaming,并发送消息
167+
streamCli, err := genericclient.NewServerStreaming(ctx, cli, "EchoServer", `{"message": "grpc server streaming generic request"}`)
168+
169+
// 接收多个响应
170+
for {
171+
resp, err := streamCli.Recv()
172+
if err == io.EOF {
173+
fmt.Println("Server streaming message receive done. stream is closed")
174+
break
175+
} else if err != nil {
176+
return fmt.Errorf("failed to receive: %v", err)
177+
}
178+
179+
strResp, ok := resp.(string)
180+
}
181+
```
182+
183+
### 双向流(Bidirectional Streaming)
184+
185+
示例:
186+
187+
```go
188+
// 使用已创建的流式客户端初始化 bidirectional streaming
189+
streamCli, err := genericclient.NewBidirectionalStreaming(ctx, cli, "Echo")
190+
if err != nil {
191+
return fmt.Errorf("failed to create bidirectional streaming: %v", err)
192+
}
193+
194+
wg := &sync.WaitGroup{}
195+
wg.Add(2)
196+
var sendErr, recvErr error
197+
198+
// 发送消息
199+
go func() {
200+
defer func() {
201+
if p := recover(); p != nil {
202+
sendErr = fmt.Errorf("panic: %v", p)
203+
}
204+
wg.Done()
205+
}()
206+
207+
for i := 0; i < 3; i++ {
208+
req := fmt.Sprintf(`{"message": "grpc bidirectional streaming generic %dth request"}`, i)
209+
if err = streamCli.Send(req); err != nil {
210+
sendErr = fmt.Errorf("bidirectionalStreaming send: failed, err = %v", err)
211+
break
212+
}
213+
klog.Infof("BidirectionalStreamingTest send: req = %+v", req)
214+
}
215+
216+
// 发送完所有消息后关闭客户端到服务端的流方向
217+
if cerr := streamCli.Close(); cerr != nil {
218+
sendErr = fmt.Errorf("stream close failed: %v", cerr)
219+
}
220+
}()
221+
222+
223+
// 接收消息
224+
go func() {
225+
defer func() {
226+
if p := recover(); p != nil {
227+
recvErr = fmt.Errorf("panic: %v", p)
228+
}
229+
wg.Done()
230+
}()
231+
232+
for {
233+
resp, err := streamCli.Recv()
234+
if err == io.EOF {
235+
klog.Infof("bidirectionalStreaming message receive done. stream is closed")
236+
break
237+
} else if err != nil {
238+
recvErr = fmt.Errorf("failed to recv: %v", err)
239+
break
240+
}
241+
242+
strResp, ok := resp.(string)
243+
}
244+
}()
245+
246+
wg.Wait()
247+
```
248+
249+
### 单次调用(PingPong)
250+
251+
用法与普通(非流式)泛化调用类似。
252+
253+
示例:
254+
255+
```go
256+
resp, err := cli.GenericCall(ctx, "EchoPingPong", `{"message": "unary request"}`)
257+
strResp, ok := resp.(string) // 响应为 json 字符串
258+
```
259+
260+
## 常见问题(FAQ)
261+
262+
### Recv() got err: rpc error: code = 12 desc = Method not found!
263+
264+
该错误出现在 Kitex **protobuf** 泛化流式调用下游为 **gRPC-python**(或其他语言 gRPC 库)时。
265+
266+
根因是 Kitex 没有解析 protobuf idl 的 package,导致 gRPC 请求的 `:path` 缺少 package 部分,gRPC-python 找不到对应方法。
267+
268+
例如:
269+
270+
- 普通客户端
271+
272+
`:path` - /search.gpt_engine.GPTStreamService/GPTGeneration
273+
274+
- protobuf 泛化客户端
275+
276+
`:path` - /GPTStreamService/GPTGeneration
277+
278+
#### 解决办法
279+
280+
可用如下分支修复,等待 Kitex v1.18.1 正式发布后即可解决:
281+
282+
```shell
283+
go get -u github.com/cloudwego/[email protected]
284+
```
285+
286+
如需完整 main 函数示例,请参考官方 demo。

0 commit comments

Comments
 (0)