Skip to content

Commit 1c141c5

Browse files
authored
Merge pull request #1300 from ydb-platform/operation-status
* Fixed bug with fail cast of grpc response to `operation.{Response,Status}`
2 parents b8f9bed + 21ee389 commit 1c141c5

File tree

8 files changed

+344
-88
lines changed

8 files changed

+344
-88
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed bug with fail cast of grpc response to `operation.{Response,Status}`
2+
13
## v3.74.3
24
* Removed check the node is available for query and table service sessions
35
* Refactored the `balancers.PreferLocations()` function - it is a clean/pure function

internal/conn/conn.go

Lines changed: 106 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import (
1515

1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1717
"github.com/ydb-platform/ydb-go-sdk/v3/internal/meta"
18-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/response"
18+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
1919
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
2020
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
2121
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
@@ -299,109 +299,151 @@ func (c *conn) Close(ctx context.Context) (err error) {
299299
return c.wrapError(err)
300300
}
301301

302+
var (
303+
onTransportErrorStub = func(ctx context.Context, err error) {}
304+
wrapErrorStub = func(err error) error { return err }
305+
)
306+
302307
//nolint:funlen
303-
func (c *conn) Invoke(
308+
func invoke(
304309
ctx context.Context,
305310
method string,
306-
req interface{},
307-
res interface{},
311+
req, reply any,
312+
cc grpc.ClientConnInterface,
313+
onTransportError func(context.Context, error),
314+
address string,
315+
wrapError func(err error) error,
308316
opts ...grpc.CallOption,
309-
) (err error) {
310-
var (
311-
opID string
312-
issues []trace.Issue
313-
useWrapping = UseWrapping(ctx)
314-
onDone = trace.DriverOnConnInvoke(
315-
c.config.Trace(), &ctx,
316-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*conn).Invoke"),
317-
c.endpoint, trace.Method(method),
318-
)
319-
cc *grpc.ClientConn
320-
md = metadata.MD{}
321-
)
322-
defer func() {
323-
meta.CallTrailerCallback(ctx, md)
324-
onDone(err, issues, opID, c.GetState(), md)
325-
}()
317+
) (
318+
opID string,
319+
issues []trace.Issue,
320+
_ error,
321+
) {
322+
useWrapping := UseWrapping(ctx)
326323

327-
cc, err = c.realConn(ctx)
324+
ctx, traceID, err := meta.TraceID(ctx)
328325
if err != nil {
329-
return c.wrapError(err)
326+
return opID, issues, xerrors.WithStackTrace(err)
330327
}
331328

332-
stop := c.lastUsage.Start()
333-
defer stop()
329+
ctx, sentMark := markContext(meta.WithTraceID(ctx, traceID))
334330

335-
ctx, traceID, err := meta.TraceID(ctx)
336-
if err != nil {
337-
return xerrors.WithStackTrace(err)
331+
if onTransportError == nil {
332+
onTransportError = onTransportErrorStub
338333
}
339334

340-
ctx, sentMark := markContext(meta.WithTraceID(ctx, traceID))
335+
if wrapError == nil {
336+
wrapError = wrapErrorStub
337+
}
341338

342-
err = cc.Invoke(ctx, method, req, res, append(opts, grpc.Trailer(&md))...)
339+
err = cc.Invoke(ctx, method, req, reply, opts...)
343340
if err != nil {
344341
if xerrors.IsContextError(err) {
345-
return xerrors.WithStackTrace(err)
342+
return opID, issues, xerrors.WithStackTrace(err)
346343
}
347344

348-
defer func() {
349-
c.onTransportError(ctx, err)
350-
}()
345+
defer onTransportError(ctx, err)
351346

352347
if useWrapping {
353348
err = xerrors.Transport(err,
354-
xerrors.WithAddress(c.Address()),
349+
xerrors.WithAddress(address),
355350
xerrors.WithTraceID(traceID),
356351
)
357352
if sentMark.canRetry() {
358-
return c.wrapError(xerrors.Retryable(err, xerrors.WithName("Invoke")))
353+
return opID, issues, wrapError(xerrors.Retryable(err, xerrors.WithName("Invoke")))
359354
}
360355

361-
return c.wrapError(err)
356+
return opID, issues, wrapError(err)
362357
}
363358

364-
return err
365-
}
366-
367-
err = c.handleResponse(res, &opID, &issues, traceID, useWrapping)
368-
if err != nil {
369-
return err
359+
return opID, issues, err
370360
}
371361

372-
return err
373-
}
374-
375-
func (c *conn) handleResponse(
376-
res interface{},
377-
opID *string,
378-
issues *[]trace.Issue,
379-
traceID string,
380-
useWrapping bool,
381-
) error {
382-
if o, ok := res.(response.Response); ok {
383-
*opID = o.GetOperation().GetId()
384-
for _, issue := range o.GetOperation().GetIssues() {
385-
*issues = append(*issues, issue)
362+
switch t := reply.(type) {
363+
case operation.Response:
364+
opID = t.GetOperation().GetId()
365+
for _, issue := range t.GetOperation().GetIssues() {
366+
issues = append(issues, issue)
386367
}
387368
if useWrapping {
388369
switch {
389-
case !o.GetOperation().GetReady():
390-
return c.wrapError(errOperationNotReady)
370+
case !t.GetOperation().GetReady():
371+
return opID, issues, wrapError(errOperationNotReady)
391372

392-
case o.GetOperation().GetStatus() != Ydb.StatusIds_SUCCESS:
393-
return c.wrapError(
373+
case t.GetOperation().GetStatus() != Ydb.StatusIds_SUCCESS:
374+
return opID, issues, wrapError(
394375
xerrors.Operation(
395-
xerrors.FromOperation(o.GetOperation()),
396-
xerrors.WithAddress(c.Address()),
376+
xerrors.FromOperation(t.GetOperation()),
377+
xerrors.WithAddress(address),
378+
xerrors.WithTraceID(traceID),
379+
),
380+
)
381+
}
382+
}
383+
case operation.Status:
384+
for _, issue := range t.GetIssues() {
385+
issues = append(issues, issue)
386+
}
387+
if useWrapping {
388+
if t.GetStatus() != Ydb.StatusIds_SUCCESS {
389+
return opID, issues, wrapError(
390+
xerrors.Operation(
391+
xerrors.FromOperation(t),
392+
xerrors.WithAddress(address),
397393
xerrors.WithTraceID(traceID),
398394
),
399395
)
400396
}
401397
}
402398
}
403399

404-
return nil
400+
return opID, issues, nil
401+
}
402+
403+
func (c *conn) Invoke(
404+
ctx context.Context,
405+
method string,
406+
req interface{},
407+
res interface{},
408+
opts ...grpc.CallOption,
409+
) (err error) {
410+
var (
411+
opID string
412+
issues []trace.Issue
413+
onDone = trace.DriverOnConnInvoke(
414+
c.config.Trace(), &ctx,
415+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/conn.(*conn).Invoke"),
416+
c.endpoint, trace.Method(method),
417+
)
418+
cc *grpc.ClientConn
419+
md = metadata.MD{}
420+
)
421+
defer func() {
422+
meta.CallTrailerCallback(ctx, md)
423+
onDone(err, issues, opID, c.GetState(), md)
424+
}()
425+
426+
cc, err = c.realConn(ctx)
427+
if err != nil {
428+
return c.wrapError(err)
429+
}
430+
431+
stop := c.lastUsage.Start()
432+
defer stop()
433+
434+
opID, issues, err = invoke(
435+
ctx,
436+
method,
437+
req,
438+
res,
439+
cc,
440+
c.onTransportError,
441+
c.Address(),
442+
c.wrapError,
443+
append(opts, grpc.Trailer(&md))...,
444+
)
445+
446+
return err
405447
}
406448

407449
//nolint:funlen

internal/conn/conn_test.go

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package conn
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
8+
"github.com/stretchr/testify/require"
9+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Discovery_V1"
10+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
11+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
12+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Discovery"
13+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
14+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
15+
"go.uber.org/mock/gomock"
16+
"google.golang.org/grpc"
17+
grpcCodes "google.golang.org/grpc/codes"
18+
grpcStatus "google.golang.org/grpc/status"
19+
20+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
21+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
22+
)
23+
24+
//go:generate mockgen -destination grpc_client_conn_interface_mock_test.go -package conn -write_package_comment=false google.golang.org/grpc ClientConnInterface
25+
26+
var _ grpc.ClientConnInterface = (*connMock)(nil)
27+
28+
type connMock struct {
29+
cc grpc.ClientConnInterface
30+
}
31+
32+
func (c connMock) Invoke(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error {
33+
_, _, err := invoke(ctx, method, args, reply, c.cc, nil, "", nil, opts...)
34+
35+
return err
36+
}
37+
38+
func (c connMock) NewStream(
39+
ctx context.Context, desc *grpc.StreamDesc, method string, opts ...grpc.CallOption,
40+
) (grpc.ClientStream, error) {
41+
return c.cc.NewStream(ctx, desc, method, opts...)
42+
}
43+
44+
func TestConn(t *testing.T) {
45+
t.Run("Invoke", func(t *testing.T) {
46+
t.Run("HappyWay", func(t *testing.T) {
47+
ctx := xtest.Context(t)
48+
ctrl := gomock.NewController(t)
49+
cc := NewMockClientConnInterface(ctrl)
50+
cc.EXPECT().Invoke(
51+
gomock.Any(),
52+
Ydb_Discovery_V1.DiscoveryService_WhoAmI_FullMethodName,
53+
&Ydb_Discovery.WhoAmIRequest{},
54+
&Ydb_Discovery.WhoAmIResponse{},
55+
).DoAndReturn(func(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error {
56+
res, ok := reply.(*Ydb_Discovery.WhoAmIResponse)
57+
if !ok {
58+
return fmt.Errorf("reply is not *Ydb_Discovery.WhoAmIResponse: %T", reply)
59+
}
60+
61+
res.Operation = &Ydb_Operations.Operation{
62+
Ready: true,
63+
Status: Ydb.StatusIds_SUCCESS,
64+
}
65+
66+
return nil
67+
})
68+
client := Ydb_Discovery_V1.NewDiscoveryServiceClient(&connMock{
69+
cc,
70+
})
71+
response, err := client.WhoAmI(ctx, &Ydb_Discovery.WhoAmIRequest{})
72+
require.NoError(t, err)
73+
require.NotNil(t, response)
74+
})
75+
t.Run("TransportError", func(t *testing.T) {
76+
ctx := xtest.Context(t)
77+
ctrl := gomock.NewController(t)
78+
cc := NewMockClientConnInterface(ctrl)
79+
expectedErr := grpcStatus.Error(grpcCodes.Unavailable, "")
80+
cc.EXPECT().Invoke(
81+
gomock.Any(),
82+
Ydb_Discovery_V1.DiscoveryService_WhoAmI_FullMethodName,
83+
&Ydb_Discovery.WhoAmIRequest{},
84+
&Ydb_Discovery.WhoAmIResponse{},
85+
).Return(expectedErr)
86+
client := Ydb_Discovery_V1.NewDiscoveryServiceClient(&connMock{
87+
cc,
88+
})
89+
response, err := client.WhoAmI(ctx, &Ydb_Discovery.WhoAmIRequest{})
90+
require.Error(t, err)
91+
require.True(t, xerrors.IsTransportError(err, grpcCodes.Unavailable))
92+
require.Nil(t, response)
93+
})
94+
t.Run("OperationError", func(t *testing.T) {
95+
ctx := xtest.Context(t)
96+
ctrl := gomock.NewController(t)
97+
t.Run("discovery.WhoAmI", func(t *testing.T) {
98+
cc := NewMockClientConnInterface(ctrl)
99+
cc.EXPECT().Invoke(
100+
gomock.Any(),
101+
Ydb_Discovery_V1.DiscoveryService_WhoAmI_FullMethodName,
102+
&Ydb_Discovery.WhoAmIRequest{},
103+
&Ydb_Discovery.WhoAmIResponse{},
104+
).DoAndReturn(func(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error {
105+
res, ok := reply.(*Ydb_Discovery.WhoAmIResponse)
106+
if !ok {
107+
return fmt.Errorf("reply is not *Ydb_Discovery.WhoAmIResponse: %T", reply)
108+
}
109+
110+
res.Operation = &Ydb_Operations.Operation{
111+
Ready: true,
112+
Status: Ydb.StatusIds_UNAVAILABLE,
113+
}
114+
115+
return nil
116+
})
117+
client := Ydb_Discovery_V1.NewDiscoveryServiceClient(&connMock{
118+
cc,
119+
})
120+
response, err := client.WhoAmI(ctx, &Ydb_Discovery.WhoAmIRequest{})
121+
require.Error(t, err)
122+
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
123+
require.Nil(t, response)
124+
})
125+
t.Run("query.BeginTransaction", func(t *testing.T) {
126+
cc := NewMockClientConnInterface(ctrl)
127+
cc.EXPECT().Invoke(
128+
gomock.Any(),
129+
Ydb_Query_V1.QueryService_BeginTransaction_FullMethodName,
130+
&Ydb_Query.BeginTransactionRequest{},
131+
&Ydb_Query.BeginTransactionResponse{},
132+
).DoAndReturn(func(ctx context.Context, method string, args any, reply any, opts ...grpc.CallOption) error {
133+
res, ok := reply.(*Ydb_Query.BeginTransactionResponse)
134+
if !ok {
135+
return fmt.Errorf("reply is not *Ydb_Query.BeginTransactionResponse: %T", reply)
136+
}
137+
138+
res.Status = Ydb.StatusIds_UNAVAILABLE
139+
140+
return nil
141+
})
142+
client := Ydb_Query_V1.NewQueryServiceClient(&connMock{
143+
cc,
144+
})
145+
response, err := client.BeginTransaction(ctx, &Ydb_Query.BeginTransactionRequest{})
146+
require.Error(t, err)
147+
require.True(t, xerrors.IsOperationError(err, Ydb.StatusIds_UNAVAILABLE))
148+
require.Nil(t, response)
149+
})
150+
})
151+
})
152+
}

0 commit comments

Comments
 (0)