Skip to content

Commit 6e74c8b

Browse files
authored
Merge pull request #1244 from ydb-platform/query-read-row
* Added `query.ReadRow` method for execute query and read only one ro…
2 parents 56a634a + 6bb723e commit 6e74c8b

File tree

12 files changed

+564
-26
lines changed

12 files changed

+564
-26
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Added `query.ReadRow` method for execute query and read only one row from result
2+
13
## v3.68.1
24
* Downgraded minimal version of Go to 1.20
35
* Refactored internal packages by `ifshort` linter issues

internal/query/client.go

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package query
22

33
import (
44
"context"
5+
"io"
56

67
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
78
"google.golang.org/grpc"
@@ -94,18 +95,16 @@ func do(
9495
}
9596

9697
func (c *Client) Do(ctx context.Context, op query.Operation, opts ...options.DoOption) error {
97-
select {
98-
case <-c.done:
99-
return xerrors.WithStackTrace(errClosedClient)
100-
default:
101-
onDone := trace.QueryOnDo(c.config.Trace(), &ctx,
102-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).Do"),
103-
)
104-
attempts, err := do(ctx, c.pool, op, c.config.Trace(), opts...)
105-
onDone(attempts, err)
106-
107-
return err
108-
}
98+
ctx, cancel := xcontext.WithDone(ctx, c.done)
99+
defer cancel()
100+
101+
onDone := trace.QueryOnDo(c.config.Trace(), &ctx,
102+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).Do"),
103+
)
104+
attempts, err := do(ctx, c.pool, op, c.config.Trace(), opts...)
105+
onDone(attempts, err)
106+
107+
return err
109108
}
110109

111110
func doTx(
@@ -150,19 +149,78 @@ func doTx(
150149
return attempts, nil
151150
}
152151

153-
func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) (err error) {
154-
select {
155-
case <-c.done:
156-
return xerrors.WithStackTrace(errClosedClient)
157-
default:
158-
onDone := trace.QueryOnDoTx(c.config.Trace(), &ctx,
159-
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).DoTx"),
160-
)
161-
attempts, err := doTx(ctx, c.pool, op, c.config.Trace(), opts...)
162-
onDone(attempts, err)
163-
164-
return err
152+
func readRow(ctx context.Context,
153+
pool *pool.Pool[*Session, Session],
154+
q string,
155+
t *trace.Query,
156+
opts ...options.ExecuteOption,
157+
) (row query.Row, err error) {
158+
_, err = do(ctx, pool, func(ctx context.Context, s query.Session) (err error) {
159+
_, r, err := s.Execute(ctx, q, opts...)
160+
if err != nil {
161+
return xerrors.WithStackTrace(err)
162+
}
163+
defer func() {
164+
_ = r.Close(ctx)
165+
}()
166+
rs, err := r.NextResultSet(ctx)
167+
if err != nil {
168+
return xerrors.WithStackTrace(err)
169+
}
170+
row, err = rs.NextRow(ctx)
171+
if err != nil {
172+
return xerrors.WithStackTrace(err)
173+
}
174+
175+
if _, err = rs.NextRow(ctx); err == nil || !xerrors.Is(err, io.EOF) {
176+
return xerrors.WithStackTrace(errMoreThanOneRow)
177+
}
178+
179+
if _, err = r.NextResultSet(ctx); err == nil || !xerrors.Is(err, io.EOF) {
180+
return xerrors.WithStackTrace(errMoreThanOneResultSet)
181+
}
182+
183+
return r.Err()
184+
}, t)
185+
if err != nil {
186+
return nil, xerrors.WithStackTrace(err)
187+
}
188+
189+
return row, nil
190+
}
191+
192+
// ReadRow is a helper which read only one row from first result set in result
193+
func (c *Client) ReadRow(ctx context.Context, q string, opts ...options.ExecuteOption) (row query.Row, err error) {
194+
ctx, cancel := xcontext.WithDone(ctx, c.done)
195+
defer cancel()
196+
197+
onDone := trace.QueryOnReadRow(c.config.Trace(), &ctx,
198+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).ReadRow"),
199+
q,
200+
)
201+
defer func() {
202+
onDone(err)
203+
}()
204+
205+
row, err = readRow(ctx, c.pool, q, c.config.Trace(), opts...)
206+
if err != nil {
207+
return nil, xerrors.WithStackTrace(err)
165208
}
209+
210+
return row, nil
211+
}
212+
213+
func (c *Client) DoTx(ctx context.Context, op query.TxOperation, opts ...options.DoTxOption) (err error) {
214+
ctx, cancel := xcontext.WithDone(ctx, c.done)
215+
defer cancel()
216+
217+
onDone := trace.QueryOnDoTx(c.config.Trace(), &ctx,
218+
stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Client).DoTx"),
219+
)
220+
attempts, err := doTx(ctx, c.pool, op, c.config.Trace(), opts...)
221+
onDone(attempts, err)
222+
223+
return err
166224
}
167225

168226
func New(ctx context.Context, balancer balancer, cfg *config.Config) *Client {

internal/query/client_test.go

Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package query
33
import (
44
"context"
55
"errors"
6+
"io"
67
"testing"
78
"time"
89

@@ -256,3 +257,237 @@ func TestDoTx(t *testing.T) {
256257
require.Equal(t, 10, counter)
257258
})
258259
}
260+
261+
func TestReadRow(t *testing.T) {
262+
ctx := xtest.Context(t)
263+
t.Run("HappyWay", func(t *testing.T) {
264+
ctrl := gomock.NewController(t)
265+
row, err := readRow(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
266+
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
267+
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
268+
Status: Ydb.StatusIds_SUCCESS,
269+
TxMeta: &Ydb_Query.TransactionMeta{
270+
Id: "456",
271+
},
272+
ResultSetIndex: 0,
273+
ResultSet: &Ydb.ResultSet{
274+
Columns: []*Ydb.Column{
275+
{
276+
Name: "a",
277+
Type: &Ydb.Type{
278+
Type: &Ydb.Type_TypeId{
279+
TypeId: Ydb.Type_UINT64,
280+
},
281+
},
282+
},
283+
{
284+
Name: "b",
285+
Type: &Ydb.Type{
286+
Type: &Ydb.Type_TypeId{
287+
TypeId: Ydb.Type_UTF8,
288+
},
289+
},
290+
},
291+
},
292+
Rows: []*Ydb.Value{
293+
{
294+
Items: []*Ydb.Value{{
295+
Value: &Ydb.Value_Uint64Value{
296+
Uint64Value: 1,
297+
},
298+
}, {
299+
Value: &Ydb.Value_TextValue{
300+
TextValue: "1",
301+
},
302+
}},
303+
},
304+
},
305+
},
306+
}, nil)
307+
stream.EXPECT().Recv().Return(nil, io.EOF)
308+
client := NewMockQueryServiceClient(ctrl)
309+
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
310+
311+
return &Session{
312+
id: "123",
313+
statusCode: statusIdle,
314+
cfg: config.New(),
315+
grpcClient: client,
316+
}, nil
317+
}), "", nil)
318+
require.NoError(t, err)
319+
var (
320+
a uint64
321+
b string
322+
)
323+
err = row.Scan(&a, &b)
324+
require.NoError(t, err)
325+
require.EqualValues(t, 1, a)
326+
require.EqualValues(t, "1", b)
327+
})
328+
t.Run("MoreThanOneRow", func(t *testing.T) {
329+
ctrl := gomock.NewController(t)
330+
row, err := readRow(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
331+
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
332+
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
333+
Status: Ydb.StatusIds_SUCCESS,
334+
TxMeta: &Ydb_Query.TransactionMeta{
335+
Id: "456",
336+
},
337+
ResultSetIndex: 0,
338+
ResultSet: &Ydb.ResultSet{
339+
Columns: []*Ydb.Column{
340+
{
341+
Name: "a",
342+
Type: &Ydb.Type{
343+
Type: &Ydb.Type_TypeId{
344+
TypeId: Ydb.Type_UINT64,
345+
},
346+
},
347+
},
348+
{
349+
Name: "b",
350+
Type: &Ydb.Type{
351+
Type: &Ydb.Type_TypeId{
352+
TypeId: Ydb.Type_UTF8,
353+
},
354+
},
355+
},
356+
},
357+
Rows: []*Ydb.Value{
358+
{
359+
Items: []*Ydb.Value{{
360+
Value: &Ydb.Value_Uint64Value{
361+
Uint64Value: 1,
362+
},
363+
}, {
364+
Value: &Ydb.Value_TextValue{
365+
TextValue: "1",
366+
},
367+
}},
368+
},
369+
{
370+
Items: []*Ydb.Value{{
371+
Value: &Ydb.Value_Uint64Value{
372+
Uint64Value: 2,
373+
},
374+
}, {
375+
Value: &Ydb.Value_TextValue{
376+
TextValue: "2",
377+
},
378+
}},
379+
},
380+
},
381+
},
382+
}, nil)
383+
client := NewMockQueryServiceClient(ctrl)
384+
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
385+
386+
return &Session{
387+
id: "123",
388+
statusCode: statusIdle,
389+
cfg: config.New(),
390+
grpcClient: client,
391+
}, nil
392+
}), "", nil)
393+
require.ErrorIs(t, err, errMoreThanOneRow)
394+
require.Nil(t, row)
395+
})
396+
t.Run("MoreThanOneResultSet", func(t *testing.T) {
397+
ctrl := gomock.NewController(t)
398+
row, err := readRow(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
399+
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
400+
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
401+
Status: Ydb.StatusIds_SUCCESS,
402+
TxMeta: &Ydb_Query.TransactionMeta{
403+
Id: "456",
404+
},
405+
ResultSetIndex: 0,
406+
ResultSet: &Ydb.ResultSet{
407+
Columns: []*Ydb.Column{
408+
{
409+
Name: "a",
410+
Type: &Ydb.Type{
411+
Type: &Ydb.Type_TypeId{
412+
TypeId: Ydb.Type_UINT64,
413+
},
414+
},
415+
},
416+
{
417+
Name: "b",
418+
Type: &Ydb.Type{
419+
Type: &Ydb.Type_TypeId{
420+
TypeId: Ydb.Type_UTF8,
421+
},
422+
},
423+
},
424+
},
425+
Rows: []*Ydb.Value{
426+
{
427+
Items: []*Ydb.Value{{
428+
Value: &Ydb.Value_Uint64Value{
429+
Uint64Value: 1,
430+
},
431+
}, {
432+
Value: &Ydb.Value_TextValue{
433+
TextValue: "1",
434+
},
435+
}},
436+
},
437+
},
438+
},
439+
}, nil)
440+
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
441+
Status: Ydb.StatusIds_SUCCESS,
442+
TxMeta: &Ydb_Query.TransactionMeta{
443+
Id: "456",
444+
},
445+
ResultSetIndex: 0,
446+
ResultSet: &Ydb.ResultSet{
447+
Columns: []*Ydb.Column{
448+
{
449+
Name: "a",
450+
Type: &Ydb.Type{
451+
Type: &Ydb.Type_TypeId{
452+
TypeId: Ydb.Type_UINT64,
453+
},
454+
},
455+
},
456+
{
457+
Name: "b",
458+
Type: &Ydb.Type{
459+
Type: &Ydb.Type_TypeId{
460+
TypeId: Ydb.Type_UTF8,
461+
},
462+
},
463+
},
464+
},
465+
Rows: []*Ydb.Value{
466+
{
467+
Items: []*Ydb.Value{{
468+
Value: &Ydb.Value_Uint64Value{
469+
Uint64Value: 1,
470+
},
471+
}, {
472+
Value: &Ydb.Value_TextValue{
473+
TextValue: "1",
474+
},
475+
}},
476+
},
477+
},
478+
},
479+
}, nil)
480+
client := NewMockQueryServiceClient(ctrl)
481+
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
482+
483+
return &Session{
484+
id: "123",
485+
statusCode: statusIdle,
486+
cfg: config.New(),
487+
grpcClient: client,
488+
}, nil
489+
}), "", nil)
490+
require.ErrorIs(t, err, errMoreThanOneRow)
491+
require.Nil(t, row)
492+
})
493+
}

internal/query/errors.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ var (
88
ErrNotImplemented = errors.New("not implemented yet")
99
errWrongNextResultSetIndex = errors.New("wrong result set index")
1010
errClosedResult = errors.New("result closed early")
11-
errClosedClient = errors.New("query client closed early")
1211
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
12+
errMoreThanOneRow = errors.New("unexpected more than one row in result set")
13+
errMoreThanOneResultSet = errors.New("unexpected more than one result set")
1314
)

0 commit comments

Comments
 (0)