Skip to content

Commit 84fef12

Browse files
authored
Merge pull request #104 from ydb-platform/rename-trailer-chech
rename trailer.Check to trailer.processHints
2 parents a77e736 + e78a45e commit 84fef12

File tree

4 files changed

+33
-36
lines changed

4 files changed

+33
-36
lines changed

internal/table/session.go

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -177,14 +177,11 @@ func (s *session) Close(ctx context.Context) (err error) {
177177
if m, _ := operation.ContextMode(ctx); m == operation.ModeUnknown {
178178
ctx = operation.WithMode(ctx, operation.ModeSync)
179179
}
180-
t := s.trailer()
181-
defer t.Check()
182180
_, err = s.tableService.DeleteSession(
183181
cluster.WithEndpoint(ctx, s),
184182
&Ydb_Table.DeleteSessionRequest{
185183
SessionId: s.id,
186184
},
187-
t.Trailer(),
188185
)
189186
return err
190187
}
@@ -204,7 +201,7 @@ func (s *session) KeepAlive(ctx context.Context) (err error) {
204201
ctx = operation.WithMode(ctx, operation.ModeSync)
205202
}
206203
t := s.trailer()
207-
defer t.Check()
204+
defer t.processHints()
208205
resp, err := s.tableService.KeepAlive(
209206
cluster.WithEndpoint(ctx, s),
210207
&Ydb_Table.KeepAliveRequest{
@@ -245,7 +242,7 @@ func (s *session) CreateTable(
245242
opt((*options.CreateTableDesc)(&request))
246243
}
247244
t := s.trailer()
248-
defer t.Check()
245+
defer t.processHints()
249246
_, err = s.tableService.CreateTable(
250247
cluster.WithEndpoint(ctx, s),
251248
&request,
@@ -403,7 +400,7 @@ func (s *session) DropTable(
403400
opt((*options.DropTableDesc)(&request))
404401
}
405402
t := s.trailer()
406-
defer t.Check()
403+
defer t.processHints()
407404
_, err = s.tableService.DropTable(
408405
cluster.WithEndpoint(ctx, s),
409406
&request,
@@ -426,7 +423,7 @@ func (s *session) AlterTable(
426423
opt((*options.AlterTableDesc)(&request))
427424
}
428425
t := s.trailer()
429-
defer t.Check()
426+
defer t.processHints()
430427
_, err = s.tableService.AlterTable(
431428
cluster.WithEndpoint(ctx, s),
432429
&request,
@@ -450,7 +447,7 @@ func (s *session) CopyTable(
450447
opt((*options.CopyTableDesc)(&request))
451448
}
452449
t := s.trailer()
453-
defer t.Check()
450+
defer t.processHints()
454451
_, err = s.tableService.CopyTable(
455452
cluster.WithEndpoint(ctx, s),
456453
&request,
@@ -475,7 +472,7 @@ func (s *session) Explain(
475472
ctx = operation.WithMode(ctx, operation.ModeSync)
476473
}
477474
t := s.trailer()
478-
defer t.Check()
475+
defer t.processHints()
479476
response, err = s.tableService.ExplainDataQuery(
480477
cluster.WithEndpoint(ctx, s),
481478
&Ydb_Table.ExplainDataQueryRequest{
@@ -581,7 +578,7 @@ func (s *session) Prepare(ctx context.Context, query string) (stmt table.Stateme
581578
ctx = operation.WithMode(ctx, operation.ModeSync)
582579
}
583580
t := s.trailer()
584-
defer t.Check()
581+
defer t.processHints()
585582
response, err = s.tableService.PrepareDataQuery(
586583
cluster.WithEndpoint(ctx, s),
587584
&Ydb_Table.PrepareDataQueryRequest{
@@ -695,7 +692,7 @@ func (s *session) executeDataQuery(
695692
ctx = operation.WithMode(ctx, operation.ModeSync)
696693
}
697694
t := s.trailer()
698-
defer t.Check()
695+
defer t.processHints()
699696
response, err = s.tableService.ExecuteDataQuery(
700697
cluster.WithEndpoint(ctx, s),
701698
request,
@@ -725,7 +722,7 @@ func (s *session) ExecuteSchemeQuery(
725722
opt((*options.ExecuteSchemeQueryDesc)(&request))
726723
}
727724
t := s.trailer()
728-
defer t.Check()
725+
defer t.processHints()
729726
_, err = s.tableService.ExecuteSchemeQuery(
730727
cluster.WithEndpoint(ctx, s),
731728
&request,
@@ -745,7 +742,7 @@ func (s *session) DescribeTableOptions(ctx context.Context) (
745742
)
746743
request := Ydb_Table.DescribeTableOptionsRequest{}
747744
t := s.trailer()
748-
defer t.Check()
745+
defer t.processHints()
749746
response, err = s.tableService.DescribeTableOptions(
750747
cluster.WithEndpoint(ctx, s),
751748
&request,
@@ -895,6 +892,12 @@ func (s *session) StreamReadTable(
895892
)
896893

897894
onDone := trace.TableOnSessionQueryStreamRead(s.trace, &ctx, s)
895+
896+
if stream != nil && checkHintSessionClose(stream.Trailer()) {
897+
s.SetStatus(options.SessionClosing)
898+
err = ErrSessionShutdown
899+
}
900+
898901
if err != nil {
899902
cancel()
900903
onDone(err)
@@ -969,6 +972,12 @@ func (s *session) StreamExecuteScanQuery(
969972
q,
970973
params,
971974
)
975+
976+
if stream != nil && checkHintSessionClose(stream.Trailer()) {
977+
s.SetStatus(options.SessionClosing)
978+
err = ErrSessionShutdown
979+
}
980+
972981
if err != nil {
973982
cancel()
974983
onDone(err)
@@ -1007,7 +1016,7 @@ func (s *session) StreamExecuteScanQuery(
10071016
// BulkUpsert uploads given list of ydb struct values to the table.
10081017
func (s *session) BulkUpsert(ctx context.Context, table string, rows types.Value) (err error) {
10091018
t := s.trailer()
1010-
defer t.Check()
1019+
defer t.processHints()
10111020
_, err = s.tableService.BulkUpsert(
10121021
cluster.WithEndpoint(ctx, s),
10131022
&Ydb_Table.BulkUpsertRequest{
@@ -1041,7 +1050,7 @@ func (s *session) BeginTransaction(
10411050
ctx = operation.WithMode(ctx, operation.ModeSync)
10421051
}
10431052
t := s.trailer()
1044-
defer t.Check()
1053+
defer t.processHints()
10451054
response, err = s.tableService.BeginTransaction(
10461055
cluster.WithEndpoint(ctx, s),
10471056
&Ydb_Table.BeginTransactionRequest{
@@ -1142,7 +1151,7 @@ func (tx *Transaction) CommitTx(
11421151
ctx = operation.WithMode(ctx, operation.ModeSync)
11431152
}
11441153
t := tx.s.trailer()
1145-
defer t.Check()
1154+
defer t.processHints()
11461155
response, err = tx.s.tableService.CommitTransaction(
11471156
cluster.WithEndpoint(ctx, tx.s),
11481157
request,
@@ -1182,7 +1191,7 @@ func (tx *Transaction) Rollback(ctx context.Context) (err error) {
11821191
ctx = operation.WithMode(ctx, operation.ModeSync)
11831192
}
11841193
t := tx.s.trailer()
1185-
defer t.Check()
1194+
defer t.processHints()
11861195
_, err = tx.s.tableService.RollbackTransaction(
11871196
cluster.WithEndpoint(ctx, tx.s),
11881197
&Ydb_Table.RollbackTransactionRequest{

internal/table/trailer.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@ func checkHintSessionClose(md metadata.MD) bool {
2626
return false
2727
}
2828

29-
func (t *trailer) Check() {
30-
if checkHintSessionClose(t.md) {
29+
func (t *trailer) processHints() {
30+
switch {
31+
case checkHintSessionClose(t.md):
3132
t.s.SetStatus(options.SessionClosing)
33+
default:
34+
// pass
3235
}
3336
}

table/client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ type Client interface {
7979
// Do implements internal busy loop until one of the following conditions is met:
8080
// - deadline was canceled or deadlined
8181
// - retry operation returned nil as error
82-
// Warning: if context without deadline or cancellation func than Do will work infinite
82+
// Warning: if context without deadline or cancellation func than Do can run indefinitely
8383
Do(ctx context.Context, op Operation, opts ...Option) error
8484

8585
// DoTx provide the best effort for execute transaction
@@ -89,6 +89,6 @@ type Client interface {
8989
// DoTx makes auto begin, commit and rollback of transaction
9090
// If op TxOperation returns nil - transaction will be committed
9191
// If op TxOperation return non nil - transaction will be rollback
92-
// Warning: if context without deadline or cancellation func than DoTx will work infinite
92+
// Warning: if context without deadline or cancellation func than DoTx can run indefinitely
9393
DoTx(ctx context.Context, op TxOperation, opts ...Option) error
9494
}

test/table_test.go

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -150,16 +150,6 @@ func TestTable(t *testing.T) {
150150
}
151151
}
152152
},
153-
OnPoolSessionClose: func(
154-
info trace.PoolSessionCloseStartInfo,
155-
) func(
156-
trace.PoolSessionCloseDoneInfo,
157-
) {
158-
sessionsMtx.Lock()
159-
defer sessionsMtx.Unlock()
160-
delete(sessions, info.Session.ID())
161-
return nil
162-
},
163153
OnPoolGet: func(
164154
info trace.PoolGetStartInfo,
165155
) func(
@@ -187,11 +177,6 @@ func TestTable(t *testing.T) {
187177
},
188178
}
189179
)
190-
defer func() {
191-
if len(sessions) > 0 {
192-
t.Fatalf("after close some shutdownd sessions are not removed from pool")
193-
}
194-
}()
195180

196181
db, err := ydb.New(
197182
ctx,

0 commit comments

Comments
 (0)