Skip to content

Commit 8fb8bb6

Browse files
authored
* Fixed deadlock with implicit usage of `internal.table.Client.intern… (#395)
* * Fixed deadlock with implicit usage of `internal.table.Client.internalPoolAsyncCloseSession` * added tests for check deadlocks in updateNodes and internalPoolGC * fix DATA RACE * increase deadline for single shot test
1 parent 703c43d commit 8fb8bb6

File tree

4 files changed

+145
-49
lines changed

4 files changed

+145
-49
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Fixed deadlock with implicit usage of `internal.table.Client.internalPoolAsyncCloseSession`
2+
13
## v3.38.0
24
* Fixed commit errors for experimental topic reader
35
* Updated `ydb-go-genproto` dependency

internal/table/client.go

Lines changed: 45 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -134,11 +134,18 @@ func (c *Client) updateNodes(ctx context.Context, endpoints []endpoint.Info) {
134134
return nodeIDs[i] >= nodeID
135135
}) == len(nodeIDs) {
136136
for s := range c.nodes[nodeID] {
137-
if info, has := c.index[s]; has && info.idle != nil {
138-
c.internalPoolAsyncCloseSession(ctx, s)
139-
} else {
140-
s.SetStatus(table.SessionClosing)
141-
}
137+
func(s *session) {
138+
if info, has := c.index[s]; has && info.idle != nil {
139+
s.SetStatus(table.SessionClosing)
140+
c.wg.Add(1)
141+
go func() {
142+
defer c.wg.Done()
143+
c.internalPoolSyncCloseSession(ctx, s)
144+
}()
145+
} else {
146+
s.SetStatus(table.SessionClosing)
147+
}
148+
}(s)
142149
}
143150
}
144151
}
@@ -696,35 +703,49 @@ func (c *Client) DoTx(ctx context.Context, op table.TxOperation, opts ...table.O
696703
return nil
697704
}
698705

706+
func (c *Client) internalPoolGCTick(ctx context.Context, idleThreshold time.Duration) {
707+
c.mu.WithLock(func() {
708+
if c.isClosed() {
709+
return
710+
}
711+
for e := c.idle.Front(); e != nil; e = e.Next() {
712+
s := e.Value.(*session)
713+
info, has := c.index[s]
714+
if !has {
715+
panic("session not found in pool")
716+
}
717+
if info.idle == nil {
718+
panic("inconsistent session info")
719+
}
720+
if since := timeutil.Until(info.touched); since > idleThreshold {
721+
s.SetStatus(table.SessionClosing)
722+
c.wg.Add(1)
723+
go func() {
724+
defer c.wg.Done()
725+
c.internalPoolSyncCloseSession(ctx, s)
726+
}()
727+
}
728+
}
729+
})
730+
}
731+
699732
func (c *Client) internalPoolGC(ctx context.Context, idleThreshold time.Duration) {
700733
defer c.wg.Done()
734+
701735
timer := timeutil.NewTimer(idleThreshold)
736+
defer timer.Stop()
702737

703738
for {
704739
select {
705740
case <-c.done:
706741
return
707742

743+
case <-ctx.Done():
744+
return
745+
708746
case <-timer.C():
709-
c.mu.WithLock(func() {
710-
if c.isClosed() {
711-
return
712-
}
713-
for e := c.idle.Front(); e != nil; e = e.Next() {
714-
s := e.Value.(*session)
715-
info, has := c.index[s]
716-
if !has {
717-
panic("session not found in pool")
718-
}
719-
if info.idle == nil {
720-
panic("inconsistent session info")
721-
}
722-
if since := timeutil.Until(info.touched); since > idleThreshold {
723-
c.internalPoolAsyncCloseSession(ctx, s)
724-
}
725-
}
726-
timer.Reset(idleThreshold / 2)
727-
})
747+
c.internalPoolGCTick(ctx, idleThreshold)
748+
timer.Reset(idleThreshold / 2)
728749
}
729750
}
730751
}
@@ -815,20 +836,6 @@ func (c *Client) internalPoolNotify(s *session) (notified bool) {
815836
return false
816837
}
817838

818-
func (c *Client) internalPoolAsyncCloseSession(ctx context.Context, s *session) {
819-
s.SetStatus(table.SessionClosing)
820-
c.mu.WithLock(func() {
821-
if c.isClosed() {
822-
return
823-
}
824-
c.wg.Add(1)
825-
go func() {
826-
defer c.wg.Done()
827-
c.internalPoolSyncCloseSession(ctx, s)
828-
}()
829-
})
830-
}
831-
832839
func (c *Client) internalPoolSyncCloseSession(ctx context.Context, s *session) {
833840
var cancel context.CancelFunc
834841
ctx, cancel = context.WithTimeout(ctx, c.config.DeleteTimeout())

internal/table/client_test.go

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,13 @@ import (
1111
"testing"
1212
"time"
1313

14+
"github.com/stretchr/testify/require"
1415
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Table"
1516
"google.golang.org/grpc"
1617
"google.golang.org/protobuf/proto"
1718
"google.golang.org/protobuf/types/known/emptypb"
1819

20+
"github.com/ydb-platform/ydb-go-sdk/v3/internal/endpoint"
1921
"github.com/ydb-platform/ydb-go-sdk/v3/internal/table/config"
2022
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
2123
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
@@ -670,8 +672,7 @@ func TestSessionPoolCloseIdleSessions(t *testing.T) {
670672

671673
var (
672674
idleThreshold = 4 * time.Second
673-
674-
closedCount uint32
675+
closedCount uint32
675676
)
676677
p := newClientWithStubBuilder(
677678
t,
@@ -940,3 +941,81 @@ func whenWantWaitCh(p *Client) <-chan struct{} {
940941
}
941942
return ch
942943
}
944+
945+
func TestDeadlockOnUpdateNodes(t *testing.T) {
946+
xtest.TestManyTimes(t, func(t testing.TB) {
947+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
948+
defer cancel()
949+
nodes := make([]uint32, 0, 3)
950+
balancer := testutil.NewBalancer(testutil.WithInvokeHandlers(testutil.InvokeHandlers{
951+
testutil.TableCreateSession: func(interface{}) (proto.Message, error) {
952+
sessionID := testutil.SessionID()
953+
nodeID, err := nodeID(sessionID)
954+
if err != nil {
955+
return nil, err
956+
}
957+
nodes = append(nodes, nodeID)
958+
return &Ydb_Table.CreateSessionResult{
959+
SessionId: sessionID,
960+
}, nil
961+
},
962+
}))
963+
c := newClientWithStubBuilder(t, balancer, 3)
964+
defer func() {
965+
_ = c.Close(ctx)
966+
}()
967+
s1, err := c.Get(ctx)
968+
require.NoError(t, err)
969+
s2, err := c.Get(ctx)
970+
require.NoError(t, err)
971+
s3, err := c.Get(ctx)
972+
require.NoError(t, err)
973+
require.Equal(t, 3, len(nodes))
974+
err = c.Put(ctx, s1)
975+
require.NoError(t, err)
976+
err = c.Put(ctx, s2)
977+
require.NoError(t, err)
978+
err = c.Put(ctx, s3)
979+
require.NoError(t, err)
980+
c.updateNodes(ctx, []endpoint.Info{})
981+
}, xtest.StopAfter(12*time.Second))
982+
}
983+
984+
func TestDeadlockOnInternalPoolGCTick(t *testing.T) {
985+
xtest.TestManyTimes(t, func(t testing.TB) {
986+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
987+
defer cancel()
988+
nodes := make([]uint32, 0, 3)
989+
balancer := testutil.NewBalancer(testutil.WithInvokeHandlers(testutil.InvokeHandlers{
990+
testutil.TableCreateSession: func(interface{}) (proto.Message, error) {
991+
sessionID := testutil.SessionID()
992+
nodeID, err := nodeID(sessionID)
993+
if err != nil {
994+
return nil, err
995+
}
996+
nodes = append(nodes, nodeID)
997+
return &Ydb_Table.CreateSessionResult{
998+
SessionId: sessionID,
999+
}, nil
1000+
},
1001+
}))
1002+
c := newClientWithStubBuilder(t, balancer, 3)
1003+
defer func() {
1004+
_ = c.Close(ctx)
1005+
}()
1006+
s1, err := c.Get(ctx)
1007+
require.NoError(t, err)
1008+
s2, err := c.Get(ctx)
1009+
require.NoError(t, err)
1010+
s3, err := c.Get(ctx)
1011+
require.NoError(t, err)
1012+
require.Equal(t, 3, len(nodes))
1013+
err = c.Put(ctx, s1)
1014+
require.NoError(t, err)
1015+
err = c.Put(ctx, s2)
1016+
require.NoError(t, err)
1017+
err = c.Put(ctx, s3)
1018+
require.NoError(t, err)
1019+
c.internalPoolGCTick(ctx, 0)
1020+
}, xtest.StopAfter(12*time.Second))
1021+
}

internal/table/session.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,23 +54,31 @@ type session struct {
5454
closeOnce sync.Once
5555
}
5656

57+
func nodeID(sessionID string) (uint32, error) {
58+
u, err := url.Parse(sessionID)
59+
if err != nil {
60+
return 0, err
61+
}
62+
id, err := strconv.ParseUint(u.Query().Get("node_id"), 10, 32)
63+
if err != nil {
64+
return 0, err
65+
}
66+
return uint32(id), err
67+
}
68+
5769
func (s *session) NodeID() uint32 {
5870
if s == nil {
5971
return 0
6072
}
61-
if nodeID := atomic.LoadUint32(&s.nodeID); nodeID != 0 {
62-
return nodeID
63-
}
64-
u, err := url.Parse(s.id)
65-
if err != nil {
66-
panic(err)
73+
if id := atomic.LoadUint32(&s.nodeID); id != 0 {
74+
return id
6775
}
68-
nodeID, err := strconv.ParseUint(u.Query().Get("node_id"), 10, 32)
76+
id, err := nodeID(s.id)
6977
if err != nil {
7078
return 0
7179
}
72-
atomic.StoreUint32(&s.nodeID, uint32(nodeID))
73-
return uint32(nodeID)
80+
atomic.StoreUint32(&s.nodeID, id)
81+
return id
7482
}
7583

7684
func (s *session) Status() table.SessionStatus {

0 commit comments

Comments
 (0)