Skip to content

Commit d26f929

Browse files
committed
refactored lazy tx
1 parent 10bd41a commit d26f929

16 files changed

+383
-270
lines changed

internal/query/client.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -336,12 +336,12 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute
336336
func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) {
337337
settings := options.ExecuteSettings(opts...)
338338
err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
339-
_, r, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
339+
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, withTrace(s.trace))
340340
if err != nil {
341341
return xerrors.WithStackTrace(err)
342342
}
343343

344-
err = readAll(ctx, r)
344+
err = readAll(ctx, streamResult)
345345
if err != nil {
346346
return xerrors.WithStackTrace(err)
347347
}
@@ -380,7 +380,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
380380
) {
381381
settings := options.ExecuteSettings(opts...)
382382
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
383-
_, streamResult, err := execute(ctx, s.ID(), s.client, q,
383+
streamResult, err := execute(ctx, s.ID(), s.client, q,
384384
options.ExecuteSettings(opts...), withTrace(s.trace),
385385
)
386386
if err != nil {
@@ -432,12 +432,12 @@ func clientQueryResultSet(
432432
ctx context.Context, pool sessionPool, q string, settings executeSettings, resultOpts ...resultOption,
433433
) (rs result.ClosableResultSet, finalErr error) {
434434
err := do(ctx, pool, func(ctx context.Context, s *Session) error {
435-
_, r, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
435+
streamResult, err := execute(ctx, s.ID(), s.client, q, settings, resultOpts...)
436436
if err != nil {
437437
return xerrors.WithStackTrace(err)
438438
}
439439

440-
rs, err = readMaterializedResultSet(ctx, r)
440+
rs, err = readMaterializedResultSet(ctx, streamResult)
441441
if err != nil {
442442
return xerrors.WithStackTrace(err)
443443
}

internal/query/client_test.go

Lines changed: 81 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -175,76 +175,80 @@ func TestClient(t *testing.T) {
175175
t.Run("DoTx", func(t *testing.T) {
176176
t.Run("HappyWay", func(t *testing.T) {
177177
ctrl := gomock.NewController(t)
178-
client := NewMockQueryServiceClient(ctrl)
179-
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
180-
stream.EXPECT().Recv().Return(&Ydb_Query.ExecuteQueryResponsePart{
181-
Status: Ydb.StatusIds_SUCCESS,
182-
TxMeta: &Ydb_Query.TransactionMeta{
183-
Id: "456",
184-
},
185-
ResultSetIndex: 0,
186-
ResultSet: &Ydb.ResultSet{
187-
Columns: []*Ydb.Column{
188-
{
189-
Name: "a",
190-
Type: &Ydb.Type{
191-
Type: &Ydb.Type_TypeId{
192-
TypeId: Ydb.Type_UINT64,
193-
},
194-
},
195-
},
196-
{
197-
Name: "b",
198-
Type: &Ydb.Type{
199-
Type: &Ydb.Type_TypeId{
200-
TypeId: Ydb.Type_UTF8,
201-
},
202-
},
178+
err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
179+
client := NewMockQueryServiceClient(ctrl)
180+
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
181+
stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
182+
client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{
183+
Status: Ydb.StatusIds_SUCCESS,
184+
}, nil)
185+
186+
return &Ydb_Query.ExecuteQueryResponsePart{
187+
Status: Ydb.StatusIds_SUCCESS,
188+
TxMeta: &Ydb_Query.TransactionMeta{
189+
Id: "456",
203190
},
204-
},
205-
Rows: []*Ydb.Value{
206-
{
207-
Items: []*Ydb.Value{{
208-
Value: &Ydb.Value_Uint64Value{
209-
Uint64Value: 1,
210-
},
211-
}, {
212-
Value: &Ydb.Value_TextValue{
213-
TextValue: "1",
191+
ResultSetIndex: 0,
192+
ResultSet: &Ydb.ResultSet{
193+
Columns: []*Ydb.Column{
194+
{
195+
Name: "a",
196+
Type: &Ydb.Type{
197+
Type: &Ydb.Type_TypeId{
198+
TypeId: Ydb.Type_UINT64,
199+
},
200+
},
214201
},
215-
}},
216-
},
217-
{
218-
Items: []*Ydb.Value{{
219-
Value: &Ydb.Value_Uint64Value{
220-
Uint64Value: 2,
202+
{
203+
Name: "b",
204+
Type: &Ydb.Type{
205+
Type: &Ydb.Type_TypeId{
206+
TypeId: Ydb.Type_UTF8,
207+
},
208+
},
221209
},
222-
}, {
223-
Value: &Ydb.Value_TextValue{
224-
TextValue: "2",
210+
},
211+
Rows: []*Ydb.Value{
212+
{
213+
Items: []*Ydb.Value{{
214+
Value: &Ydb.Value_Uint64Value{
215+
Uint64Value: 1,
216+
},
217+
}, {
218+
Value: &Ydb.Value_TextValue{
219+
TextValue: "1",
220+
},
221+
}},
225222
},
226-
}},
227-
},
228-
{
229-
Items: []*Ydb.Value{{
230-
Value: &Ydb.Value_Uint64Value{
231-
Uint64Value: 3,
223+
{
224+
Items: []*Ydb.Value{{
225+
Value: &Ydb.Value_Uint64Value{
226+
Uint64Value: 2,
227+
},
228+
}, {
229+
Value: &Ydb.Value_TextValue{
230+
TextValue: "2",
231+
},
232+
}},
232233
},
233-
}, {
234-
Value: &Ydb.Value_TextValue{
235-
TextValue: "3",
234+
{
235+
Items: []*Ydb.Value{{
236+
Value: &Ydb.Value_Uint64Value{
237+
Uint64Value: 3,
238+
},
239+
}, {
240+
Value: &Ydb.Value_TextValue{
241+
TextValue: "3",
242+
},
243+
}},
236244
},
237-
}},
245+
},
238246
},
239-
},
240-
},
241-
}, nil)
242-
stream.EXPECT().Recv().Return(nil, io.EOF)
243-
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
244-
client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).Return(&Ydb_Query.CommitTransactionResponse{
245-
Status: Ydb.StatusIds_SUCCESS,
246-
}, nil)
247-
err := doTx(ctx, testPool(ctx, func(ctx context.Context) (*Session, error) {
247+
}, nil
248+
})
249+
stream.EXPECT().Recv().Return(nil, io.EOF)
250+
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).Return(stream, nil)
251+
248252
return newTestSessionWithClient("123", client), nil
249253
}), func(ctx context.Context, tx query.TxActor) error {
250254
defer func() {
@@ -283,7 +287,7 @@ func TestClient(t *testing.T) {
283287
})
284288
t.Run("TxLeak", func(t *testing.T) {
285289
t.Run("OnExec", func(t *testing.T) {
286-
t.Run("WithExplicitCommit", func(t *testing.T) {
290+
t.Run("WithoutCommit", func(t *testing.T) {
287291
xtest.TestManyTimes(t, func(t testing.TB) {
288292
txInFlight := 0
289293
ctrl := gomock.NewController(t)
@@ -297,12 +301,11 @@ func TestClient(t *testing.T) {
297301
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
298302
}
299303

304+
txInFlight++
305+
300306
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
301307
stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
302-
txInFlight++
303-
304308
stream.EXPECT().Recv().Return(nil, io.EOF)
305-
306309
client.EXPECT().CommitTransaction(gomock.Any(), gomock.Any()).DoAndReturn(
307310
func(ctx context.Context, request *Ydb_Query.CommitTransactionRequest, option ...grpc.CallOption) (
308311
*Ydb_Query.CommitTransactionResponse, error,
@@ -334,7 +337,7 @@ func TestClient(t *testing.T) {
334337
require.Zero(t, txInFlight)
335338
})
336339
})
337-
t.Run("WithLazyCommit", func(t *testing.T) {
340+
t.Run("WithCommit", func(t *testing.T) {
338341
xtest.TestManyTimes(t, func(t testing.TB) {
339342
ctrl := gomock.NewController(t)
340343
txInFlight := 0
@@ -350,14 +353,16 @@ func TestClient(t *testing.T) {
350353
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
351354
}
352355

356+
txInFlight++
357+
353358
stream := NewMockQueryService_ExecuteQueryClient(ctrl)
354359
stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
355360
if rand.Int31n(100) < 50 {
361+
txInFlight--
362+
356363
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
357364
}
358365

359-
txInFlight++
360-
361366
stream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
362367
txInFlight--
363368

@@ -386,7 +391,7 @@ func TestClient(t *testing.T) {
386391
})
387392
})
388393
t.Run("OnSecondExec", func(t *testing.T) {
389-
t.Run("WithExplicitCommit", func(t *testing.T) {
394+
t.Run("WithoutCommit", func(t *testing.T) {
390395
xtest.TestManyTimes(t, func(t testing.TB) {
391396
ctrl := gomock.NewController(t)
392397
txInFlight := 0
@@ -400,10 +405,10 @@ func TestClient(t *testing.T) {
400405
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
401406
}
402407

408+
txInFlight++
409+
403410
firstStream := NewMockQueryService_ExecuteQueryClient(ctrl)
404411
firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
405-
txInFlight++
406-
407412
firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
408413
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn(
409414
func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) (
@@ -476,7 +481,7 @@ func TestClient(t *testing.T) {
476481
require.NoError(t, err)
477482
})
478483
})
479-
t.Run("WithLazyCommit", func(t *testing.T) {
484+
t.Run("WithCommit", func(t *testing.T) {
480485
xtest.TestManyTimes(t, func(t testing.TB) {
481486
ctrl := gomock.NewController(t)
482487
txInFlight := 0
@@ -490,10 +495,10 @@ func TestClient(t *testing.T) {
490495
return nil, xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_BAD_SESSION))
491496
}
492497

498+
txInFlight++
499+
493500
firstStream := NewMockQueryService_ExecuteQueryClient(ctrl)
494501
firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
495-
txInFlight++
496-
497502
firstStream.EXPECT().Recv().DoAndReturn(func() (*Ydb_Query.ExecuteQueryResponsePart, error) {
498503
client.EXPECT().ExecuteQuery(gomock.Any(), gomock.Any()).DoAndReturn(
499504
func(ctx context.Context, request *Ydb_Query.ExecuteQueryRequest, option ...grpc.CallOption) (

internal/query/errors.go

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,14 +7,13 @@ import (
77
)
88

99
var (
10-
ErrTransactionRollingBack = xerrors.Wrap(errors.New("ydb: the transaction is rolling back"))
11-
errWrongNextResultSetIndex = errors.New("wrong result set index")
12-
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
13-
errMoreThanOneRow = errors.New("unexpected more than one row in result set")
14-
errMoreThanOneResultSet = errors.New("unexpected more than one result set")
15-
errNoResultSets = errors.New("no result sets")
16-
errUnexpectedTxIDOnCommitFlag = errors.New("unexpected transaction ID on commit flag")
17-
errExpectedTxID = errors.New("expected transaction ID but nil")
18-
ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction")
19-
errExecuteOnCompletedTx = errors.New("execute on completed transaction")
10+
ErrTransactionRollingBack = xerrors.Wrap(errors.New("ydb: the transaction is rolling back"))
11+
errWrongNextResultSetIndex = errors.New("wrong result set index")
12+
errWrongResultSetIndex = errors.New("critical violation of the logic - wrong result set index")
13+
errMoreThanOneRow = errors.New("unexpected more than one row in result set")
14+
errMoreThanOneResultSet = errors.New("unexpected more than one result set")
15+
errNoResultSets = errors.New("no result sets")
16+
errNilOption = errors.New("nil option")
17+
ErrOptionNotForTxExecute = errors.New("option is not for execute on transaction")
18+
errExecuteOnCompletedTx = errors.New("execute on completed transaction")
2019
)

internal/query/execute_query.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
1616
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
17-
"github.com/ydb-platform/ydb-go-sdk/v3/internal/tx"
1817
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
1918
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2019
"github.com/ydb-platform/ydb-go-sdk/v3/query"
@@ -100,7 +99,7 @@ func execute(
10099
ctx context.Context, sessionID string, c Ydb_Query_V1.QueryServiceClient,
101100
q string, settings executeSettings, opts ...resultOption,
102101
) (
103-
_ tx.Identifier, _ *streamResult, finalErr error,
102+
_ *streamResult, finalErr error,
104103
) {
105104
a := allocator.New()
106105
defer a.Free()
@@ -111,19 +110,15 @@ func execute(
111110

112111
stream, err := c.ExecuteQuery(executeCtx, request, callOptions...)
113112
if err != nil {
114-
return nil, nil, xerrors.WithStackTrace(err)
113+
return nil, xerrors.WithStackTrace(err)
115114
}
116115

117-
r, txID, err := newResult(ctx, stream, append(opts, withStatsCallback(settings.StatsCallback()))...)
116+
r, err := newResult(ctx, stream, append(opts, withStatsCallback(settings.StatsCallback()))...)
118117
if err != nil {
119-
return nil, nil, xerrors.WithStackTrace(err)
120-
}
121-
122-
if txID == "" {
123-
return nil, r, nil
118+
return nil, xerrors.WithStackTrace(err)
124119
}
125120

126-
return tx.ID(txID), r, nil
121+
return r, nil
127122
}
128123

129124
func readAll(ctx context.Context, r *streamResult) error {

0 commit comments

Comments
 (0)