@@ -20,11 +20,15 @@ package genericclient
2020import (
2121 "context"
2222 "runtime"
23+ "sync"
2324
2425 "github.com/cloudwego/kitex/client"
2526 "github.com/cloudwego/kitex/client/callopt"
27+ "github.com/cloudwego/kitex/client/callopt/streamcall"
2628 "github.com/cloudwego/kitex/pkg/generic"
2729 "github.com/cloudwego/kitex/pkg/serviceinfo"
30+ "github.com/cloudwego/kitex/pkg/streaming"
31+ "github.com/cloudwego/kitex/transport"
2832)
2933
3034var _ Client = & genericServiceClient {}
@@ -40,25 +44,40 @@ func NewClientWithServiceInfo(destService string, g generic.Generic, svcInfo *se
4044 var options []client.Option
4145 options = append (options , client .WithGeneric (g ))
4246 options = append (options , client .WithDestService (destService ))
47+ options = append (options , client .WithTransportProtocol (transport .TTHeaderStreaming ))
4348 options = append (options , opts ... )
4449
4550 kc , err := client .NewClient (svcInfo , options ... )
4651 if err != nil {
4752 return nil , err
4853 }
54+ var mp * sync.Map
55+ if ! generic .HasIDLInfo (g ) {
56+ mp = & sync.Map {}
57+ }
4958 cli := & genericServiceClient {
5059 svcInfo : svcInfo ,
5160 kClient : kc ,
61+ sClient : kc .(client.Streaming ),
5262 g : g ,
63+ modeMap : mp ,
5364 }
5465 runtime .SetFinalizer (cli , (* genericServiceClient ).Close )
5566
5667 svcInfo .GenericMethod = func (name string ) serviceinfo.MethodInfo {
57- m := svcInfo .Methods [serviceinfo .GenericMethod ]
68+ key := serviceinfo .GenericMethod
69+ if mp != nil {
70+ if mode , ok := mp .Load (name ); ok {
71+ key = getGenericStreamingMethodInfoKey (mode .(serviceinfo.StreamingMode ))
72+ }
73+ return svcInfo .Methods [key ]
74+ }
5875 n , err := g .GetMethod (nil , name )
5976 if err != nil {
60- return m
77+ return svcInfo . Methods [ key ]
6178 }
79+ key = getGenericStreamingMethodInfoKey (n .StreamingMode )
80+ m := svcInfo .Methods [key ]
6281 return & methodInfo {
6382 MethodInfo : m ,
6483 oneway : n .Oneway ,
@@ -84,12 +103,23 @@ type Client interface {
84103
85104 // GenericCall generic call
86105 GenericCall (ctx context.Context , method string , request interface {}, callOptions ... callopt.Option ) (response interface {}, err error )
106+ // ClientStreaming creates an implementation of ClientStreamingClient
107+ ClientStreaming (ctx context.Context , method string , callOptions ... streamcall.Option ) (ClientStreamingClient , error )
108+ // ServerStreaming creates an implementation of ServerStreamingClient
109+ ServerStreaming (ctx context.Context , method string , req interface {}, callOptions ... streamcall.Option ) (ServerStreamingClient , error )
110+ // BidirectionalStreaming creates an implementation of BidiStreamingClient
111+ BidirectionalStreaming (ctx context.Context , method string , callOptions ... streamcall.Option ) (BidiStreamingClient , error )
87112}
88113
89114type genericServiceClient struct {
90115 svcInfo * serviceinfo.ServiceInfo
91116 kClient client.Client
117+ sClient client.Streaming
92118 g generic.Generic
119+ // modeMap stores the streaming mode of methods for binary generic
120+ // because the streaming mode of a method is not stored in binary generic which doesn't have IDL info
121+ // but we can know it when creating different streaming clients
122+ modeMap * sync.Map // map[string]serviceinfo.StreamingMode
93123}
94124
95125func (gc * genericServiceClient ) GenericCall (ctx context.Context , method string , request interface {}, callOptions ... callopt.Option ) (response interface {}, err error ) {
@@ -121,3 +151,153 @@ func (gc *genericServiceClient) Close() error {
121151 // Notice: don't need to close kClient because finalizer will close it.
122152 return gc .g .Close ()
123153}
154+
155+ func (gc * genericServiceClient ) ClientStreaming (ctx context.Context , method string , callOptions ... streamcall.Option ) (ClientStreamingClient , error ) {
156+ ctx = client .NewCtxWithCallOptions (ctx , streamcall .GetCallOptions (callOptions ))
157+ if gc .modeMap != nil {
158+ gc .modeMap .LoadOrStore (method , serviceinfo .StreamingClient )
159+ }
160+ st , err := gc .sClient .StreamX (ctx , method )
161+ if err != nil {
162+ return nil , err
163+ }
164+ return newClientStreamingClient (gc .svcInfo .MethodInfo (method ), method , st ), nil
165+ }
166+
167+ func (gc * genericServiceClient ) ServerStreaming (ctx context.Context , method string , req interface {}, callOptions ... streamcall.Option ) (ServerStreamingClient , error ) {
168+ ctx = client .NewCtxWithCallOptions (ctx , streamcall .GetCallOptions (callOptions ))
169+ if gc .modeMap != nil {
170+ gc .modeMap .LoadOrStore (method , serviceinfo .StreamingServer )
171+ }
172+ st , err := gc .sClient .StreamX (ctx , method )
173+ if err != nil {
174+ return nil , err
175+ }
176+ stream := newServerStreamingClient (gc .svcInfo .MethodInfo (method ), method , st ).(* serverStreamingClient )
177+
178+ args := stream .methodInfo .NewArgs ().(* generic.Args )
179+ args .Method = stream .method
180+ args .Request = req
181+ if err := st .SendMsg (ctx , args ); err != nil {
182+ return nil , err
183+ }
184+ if err := stream .CloseSend (ctx ); err != nil {
185+ return nil , err
186+ }
187+ return stream , nil
188+ }
189+
190+ func (gc * genericServiceClient ) BidirectionalStreaming (ctx context.Context , method string , callOptions ... streamcall.Option ) (BidiStreamingClient , error ) {
191+ ctx = client .NewCtxWithCallOptions (ctx , streamcall .GetCallOptions (callOptions ))
192+ if gc .modeMap != nil {
193+ gc .modeMap .LoadOrStore (method , serviceinfo .StreamingBidirectional )
194+ }
195+ st , err := gc .sClient .StreamX (ctx , method )
196+ if err != nil {
197+ return nil , err
198+ }
199+ return newBidiStreamingClient (gc .svcInfo .MethodInfo (method ), method , st ), nil
200+ }
201+
202+ // ClientStreamingClient define client side generic client streaming APIs
203+ type ClientStreamingClient interface {
204+ Send (ctx context.Context , req interface {}) error
205+ CloseAndRecv (ctx context.Context ) (interface {}, error )
206+ streaming.ClientStream
207+ }
208+
209+ type clientStreamingClient struct {
210+ methodInfo serviceinfo.MethodInfo
211+ method string
212+ streaming.ClientStream
213+ }
214+
215+ func newClientStreamingClient (methodInfo serviceinfo.MethodInfo , method string , st streaming.ClientStream ) ClientStreamingClient {
216+ return & clientStreamingClient {
217+ methodInfo : methodInfo ,
218+ method : method ,
219+ ClientStream : st ,
220+ }
221+ }
222+
223+ func (c * clientStreamingClient ) Send (ctx context.Context , req interface {}) error {
224+ args := c .methodInfo .NewArgs ().(* generic.Args )
225+ args .Method = c .method
226+ args .Request = req
227+ return c .ClientStream .SendMsg (ctx , args )
228+ }
229+
230+ func (c * clientStreamingClient ) CloseAndRecv (ctx context.Context ) (interface {}, error ) {
231+ if err := c .ClientStream .CloseSend (ctx ); err != nil {
232+ return nil , err
233+ }
234+ res := c .methodInfo .NewResult ().(* generic.Result )
235+ if err := c .ClientStream .RecvMsg (ctx , res ); err != nil {
236+ return nil , err
237+ }
238+ return res .GetSuccess (), nil
239+ }
240+
241+ // ServerStreamingClient define client side generic server streaming APIs
242+ type ServerStreamingClient interface {
243+ Recv (ctx context.Context ) (interface {}, error )
244+ streaming.ClientStream
245+ }
246+
247+ type serverStreamingClient struct {
248+ methodInfo serviceinfo.MethodInfo
249+ method string
250+ streaming.ClientStream
251+ }
252+
253+ func newServerStreamingClient (methodInfo serviceinfo.MethodInfo , method string , st streaming.ClientStream ) ServerStreamingClient {
254+ return & serverStreamingClient {
255+ methodInfo : methodInfo ,
256+ method : method ,
257+ ClientStream : st ,
258+ }
259+ }
260+
261+ func (c * serverStreamingClient ) Recv (ctx context.Context ) (interface {}, error ) {
262+ res := c .methodInfo .NewResult ().(* generic.Result )
263+ if err := c .ClientStream .RecvMsg (ctx , res ); err != nil {
264+ return nil , err
265+ }
266+ return res .GetSuccess (), nil
267+ }
268+
269+ // BidiStreamingClient define client side generic bidirectional streaming APIs
270+ type BidiStreamingClient interface {
271+ Send (ctx context.Context , req interface {}) error
272+ Recv (ctx context.Context ) (interface {}, error )
273+ streaming.ClientStream
274+ }
275+
276+ type bidiStreamingClient struct {
277+ methodInfo serviceinfo.MethodInfo
278+ method string
279+ streaming.ClientStream
280+ }
281+
282+ func newBidiStreamingClient (methodInfo serviceinfo.MethodInfo , method string , st streaming.ClientStream ) BidiStreamingClient {
283+ return & bidiStreamingClient {
284+ methodInfo : methodInfo ,
285+ method : method ,
286+ ClientStream : st ,
287+ }
288+ }
289+
290+ func (c * bidiStreamingClient ) Send (ctx context.Context , req interface {}) error {
291+ args := c .methodInfo .NewArgs ().(* generic.Args )
292+ args .Method = c .method
293+ args .Request = req
294+ return c .ClientStream .SendMsg (ctx , args )
295+ }
296+
297+ func (c * bidiStreamingClient ) Recv (ctx context.Context ) (interface {}, error ) {
298+ res := c .methodInfo .NewResult ().(* generic.Result )
299+ if err := c .ClientStream .RecvMsg (ctx , res ); err != nil {
300+ return nil , err
301+ }
302+ return res .GetSuccess (), nil
303+ }
0 commit comments