Skip to content

Commit cd49a69

Browse files
committed
* Fixed data race on session stream queries
1 parent f7dd6ff commit cd49a69

File tree

2 files changed

+9
-18
lines changed

2 files changed

+9
-18
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
* Fixed data race on session stream queries
12
* Renamed `internal/router` package to `internal/balancer` for unambiguous understanding of package mission
23
* Implemented detection of local data-center with measuring tcp dial RTT
34
* Added `trace.Driver.OnBalancer{Init,Close,ChooseEndpoint,Update}` events

internal/table/session.go

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -917,25 +917,20 @@ func (s *session) StreamReadTable(
917917

918918
ctx, cancel := context.WithCancel(ctx)
919919

920+
t := s.trailer()
921+
defer t.processHints()
922+
920923
stream, err = s.tableService.StreamReadTable(
921924
balancer.WithEndpoint(ctx, s),
922925
&request,
926+
t.Trailer(),
923927
)
924928

925929
if err != nil {
926930
cancel()
927931
return nil, xerrors.WithStackTrace(err)
928932
}
929933

930-
select {
931-
case <-stream.Context().Done():
932-
// nop
933-
default:
934-
if checkHintSessionClose(stream.Trailer()) {
935-
s.SetStatus(options.SessionClosing)
936-
}
937-
}
938-
939934
return scanner.NewStream(
940935
func(ctx context.Context) (
941936
set *Ydb.ResultSet,
@@ -1006,25 +1001,20 @@ func (s *session) StreamExecuteScanQuery(
10061001

10071002
ctx, cancel := context.WithCancel(ctx)
10081003

1004+
t := s.trailer()
1005+
defer t.processHints()
1006+
10091007
stream, err = s.tableService.StreamExecuteScanQuery(
10101008
balancer.WithEndpoint(ctx, s),
10111009
&request,
1010+
t.Trailer(),
10121011
)
10131012

10141013
if err != nil {
10151014
cancel()
10161015
return nil, xerrors.WithStackTrace(err)
10171016
}
10181017

1019-
select {
1020-
case <-stream.Context().Done():
1021-
// nop
1022-
default:
1023-
if checkHintSessionClose(stream.Trailer()) {
1024-
s.SetStatus(options.SessionClosing)
1025-
}
1026-
}
1027-
10281018
return scanner.NewStream(
10291019
func(ctx context.Context) (
10301020
set *Ydb.ResultSet,

0 commit comments

Comments
 (0)