Skip to content

Commit 32bee0b

Browse files
author
anatoly32322
committed
Rollback changes
1 parent de98eb7 commit 32bee0b

File tree

3 files changed

+39
-83
lines changed

3 files changed

+39
-83
lines changed

internal/query/session.go

Lines changed: 29 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import (
66
"sync/atomic"
77

88
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
9-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
109
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
1110

1211
"github.com/ydb-platform/ydb-go-sdk/v3/internal/allocator"
12+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/config"
1313
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
1414
"github.com/ydb-platform/ydb-go-sdk/v3/internal/stack"
1515
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xcontext"
@@ -23,11 +23,11 @@ var _ query.Session = (*Session)(nil)
2323

2424
type (
2525
Session struct {
26+
cfg *config.Config
2627
id string
2728
nodeID int64
2829
grpcClient Ydb_Query_V1.QueryServiceClient
2930
statusCode statusCode
30-
trace *trace.Query
3131
closeOnce func(ctx context.Context) error
3232
checks []func(s *Session) bool
3333
}
@@ -40,19 +40,13 @@ func withSessionCheck(f func(*Session) bool) sessionOption {
4040
}
4141
}
4242

43-
func withSessionTrace(t *trace.Query) sessionOption {
44-
return func(s *Session) {
45-
s.trace = s.trace.Compose(t)
46-
}
47-
}
48-
4943
func createSession(
50-
ctx context.Context, client Ydb_Query_V1.QueryServiceClient, opts ...sessionOption,
44+
ctx context.Context, client Ydb_Query_V1.QueryServiceClient, cfg *config.Config, opts ...sessionOption,
5145
) (s *Session, finalErr error) {
5246
s = &Session{
47+
cfg: cfg,
5348
grpcClient: client,
5449
statusCode: statusUnknown,
55-
trace: &trace.Query{},
5650
checks: []func(*Session) bool{
5751
func(s *Session) bool {
5852
switch s.status() {
@@ -74,22 +68,14 @@ func createSession(
7468
opt(s)
7569
}
7670

77-
onDone := trace.QueryOnSessionCreate(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.createSession"))
71+
onDone := trace.QueryOnSessionCreate(s.cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.createSession"))
7872
defer func() {
7973
onDone(s, finalErr)
8074
}()
8175

8276
response, err := client.CreateSession(ctx, &Ydb_Query.CreateSessionRequest{})
8377
if err != nil {
84-
return nil, xerrors.WithStackTrace(
85-
xerrors.Transport(err),
86-
)
87-
}
88-
89-
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
90-
return nil, xerrors.WithStackTrace(
91-
xerrors.FromOperation(response),
92-
)
78+
return nil, xerrors.WithStackTrace(err)
9379
}
9480

9581
defer func() {
@@ -103,9 +89,7 @@ func createSession(
10389

10490
err = s.attach(ctx)
10591
if err != nil {
106-
return nil, xerrors.WithStackTrace(
107-
xerrors.Transport(err),
108-
)
92+
return nil, xerrors.WithStackTrace(err)
10993
}
11094

11195
s.setStatus(statusIdle)
@@ -114,7 +98,7 @@ func createSession(
11498
}
11599

116100
func (s *Session) attach(ctx context.Context) (finalErr error) {
117-
onDone := trace.QueryOnSessionAttach(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).attach"), s)
101+
onDone := trace.QueryOnSessionAttach(s.cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).attach"), s)
118102
defer func() {
119103
onDone(finalErr)
120104
}()
@@ -125,30 +109,30 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
125109
SessionId: s.id,
126110
})
127111
if err != nil {
128-
return xerrors.WithStackTrace(
129-
xerrors.Transport(err),
130-
)
112+
return xerrors.WithStackTrace(err)
131113
}
132114

133-
state, err := attach.Recv()
115+
_, err = attach.Recv()
134116
if err != nil {
135117
cancelAttach()
136118

137-
return xerrors.WithStackTrace(xerrors.Transport(err))
138-
}
139-
140-
if state.GetStatus() != Ydb.StatusIds_SUCCESS {
141-
cancelAttach()
142-
143-
return xerrors.WithStackTrace(xerrors.FromOperation(state))
119+
return xerrors.WithStackTrace(err)
144120
}
145121

146122
s.closeOnce = xsync.OnceFunc(func(ctx context.Context) (err error) {
147-
cancelAttach()
123+
defer cancelAttach()
148124

149125
s.setStatus(statusClosing)
150126
defer s.setStatus(statusClosed)
151127

128+
var cancel context.CancelFunc
129+
if d := s.cfg.SessionDeleteTimeout(); d > 0 {
130+
ctx, cancel = xcontext.WithTimeout(ctx, d)
131+
} else {
132+
ctx, cancel = xcontext.WithCancel(ctx)
133+
}
134+
defer cancel()
135+
152136
if err = deleteSession(ctx, s.grpcClient, s.id); err != nil {
153137
return xerrors.WithStackTrace(err)
154138
}
@@ -158,26 +142,21 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
158142

159143
go func() {
160144
defer func() {
161-
_ = s.closeOnce(ctx)
145+
_ = s.closeOnce(xcontext.ValueOnly(ctx))
162146
}()
163147

164148
for {
165149
if !s.IsAlive() {
166150
return
167151
}
168-
recv, recvErr := attach.Recv()
152+
_, recvErr := attach.Recv()
169153
if recvErr != nil {
170154
if xerrors.Is(recvErr, io.EOF) {
171155
s.setStatus(statusClosed)
172156
} else {
173157
s.setStatus(statusError)
174158
}
175159

176-
return
177-
}
178-
if recv.GetStatus() != Ydb.StatusIds_SUCCESS {
179-
s.setStatus(statusError)
180-
181160
return
182161
}
183162
}
@@ -187,16 +166,13 @@ func (s *Session) attach(ctx context.Context) (finalErr error) {
187166
}
188167

189168
func deleteSession(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID string) error {
190-
response, err := client.DeleteSession(ctx,
169+
_, err := client.DeleteSession(ctx,
191170
&Ydb_Query.DeleteSessionRequest{
192171
SessionId: sessionID,
193172
},
194173
)
195174
if err != nil {
196-
return xerrors.WithStackTrace(xerrors.Transport(err))
197-
}
198-
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
199-
return xerrors.WithStackTrace(xerrors.FromOperation(response))
175+
return xerrors.WithStackTrace(err)
200176
}
201177

202178
return nil
@@ -213,7 +189,7 @@ func (s *Session) IsAlive() bool {
213189
}
214190

215191
func (s *Session) Close(ctx context.Context) (err error) {
216-
onDone := trace.QueryOnSessionDelete(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Close"), s)
192+
onDone := trace.QueryOnSessionDelete(s.cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Close"), s)
217193
defer func() {
218194
onDone(err)
219195
}()
@@ -240,13 +216,10 @@ func begin(
240216
},
241217
)
242218
if err != nil {
243-
return nil, xerrors.WithStackTrace(xerrors.Transport(err))
244-
}
245-
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
246-
return nil, xerrors.WithStackTrace(xerrors.FromOperation(response))
219+
return nil, xerrors.WithStackTrace(err)
247220
}
248221

249-
return newTransaction(response.GetTxMeta().GetId(), s, s.trace), nil
222+
return newTransaction(response.GetTxMeta().GetId(), s), nil
250223
}
251224

252225
func (s *Session) Begin(
@@ -257,7 +230,7 @@ func (s *Session) Begin(
257230
) {
258231
var tx *transaction
259232

260-
onDone := trace.QueryOnSessionBegin(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Begin"), s)
233+
onDone := trace.QueryOnSessionBegin(s.cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Begin"), s)
261234
defer func() {
262235
onDone(err, tx)
263236
}()
@@ -294,7 +267,7 @@ func (s *Session) Status() string {
294267
func (s *Session) Execute(
295268
ctx context.Context, q string, opts ...options.ExecuteOption,
296269
) (_ query.Transaction, _ query.Result, err error) {
297-
onDone := trace.QueryOnSessionExecute(s.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Execute"), s, q)
270+
onDone := trace.QueryOnSessionExecute(s.cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.(*Session).Execute"), s, q)
298271
defer func() {
299272
onDone(err)
300273
}()

internal/query/transaction.go

Lines changed: 10 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55

66
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
7-
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
87
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
98

109
"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/options"
@@ -17,20 +16,14 @@ import (
1716
var _ query.Transaction = (*transaction)(nil)
1817

1918
type transaction struct {
20-
id string
21-
s *Session
22-
trace *trace.Query
19+
id string
20+
s *Session
2321
}
2422

25-
func newTransaction(id string, s *Session, t *trace.Query) *transaction {
26-
if t == nil {
27-
t = &trace.Query{}
28-
}
29-
23+
func newTransaction(id string, s *Session) *transaction {
3024
return &transaction{
31-
id: id,
32-
s: s,
33-
trace: t,
25+
id: id,
26+
s: s,
3427
}
3528
}
3629

@@ -41,7 +34,7 @@ func (tx transaction) ID() string {
4134
func (tx transaction) Execute(ctx context.Context, q string, opts ...options.TxExecuteOption) (
4235
r query.Result, finalErr error,
4336
) {
44-
onDone := trace.QueryOnTxExecute(tx.trace, &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.transaction.Execute"), tx.s, tx, q)
37+
onDone := trace.QueryOnTxExecute(tx.s.cfg.Trace(), &ctx, stack.FunctionID("github.com/ydb-platform/ydb-go-sdk/3/internal/query.transaction.Execute"), tx.s, tx, q)
4538
defer func() {
4639
onDone(finalErr)
4740
}()
@@ -55,15 +48,12 @@ func (tx transaction) Execute(ctx context.Context, q string, opts ...options.TxE
5548
}
5649

5750
func commitTx(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID, txID string) error {
58-
response, err := client.CommitTransaction(ctx, &Ydb_Query.CommitTransactionRequest{
51+
_, err := client.CommitTransaction(ctx, &Ydb_Query.CommitTransactionRequest{
5952
SessionId: sessionID,
6053
TxId: txID,
6154
})
6255
if err != nil {
63-
return xerrors.WithStackTrace(xerrors.Transport(err))
64-
}
65-
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
66-
return xerrors.WithStackTrace(xerrors.FromOperation(response))
56+
return xerrors.WithStackTrace(err)
6757
}
6858

6959
return nil
@@ -74,15 +64,12 @@ func (tx transaction) CommitTx(ctx context.Context) (err error) {
7464
}
7565

7666
func rollback(ctx context.Context, client Ydb_Query_V1.QueryServiceClient, sessionID, txID string) error {
77-
response, err := client.RollbackTransaction(ctx, &Ydb_Query.RollbackTransactionRequest{
67+
_, err := client.RollbackTransaction(ctx, &Ydb_Query.RollbackTransactionRequest{
7868
SessionId: sessionID,
7969
TxId: txID,
8070
})
8171
if err != nil {
82-
return xerrors.WithStackTrace(xerrors.Transport(err))
83-
}
84-
if response.GetStatus() != Ydb.StatusIds_SUCCESS {
85-
return xerrors.WithStackTrace(xerrors.FromOperation(response))
72+
return xerrors.WithStackTrace(err)
8673
}
8774

8875
return nil

internal/stack/record.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,6 @@ func (c call) FunctionID() string {
178178
return c.Record(Lambda(false), FileName(false))
179179
}
180180

181-
func (c call) File() string {
182-
return c.file
183-
}
184-
185181
func Record(depth int, opts ...recordOption) string {
186182
return Call(depth + 1).Record(opts...)
187183
}

0 commit comments

Comments
 (0)