Skip to content

Commit e879b4b

Browse files
authored
Merge pull request #1701 from ydb-platform/close-item
* Added query client session pool metrics: create_in_progress, in_use, waiters_queue + Added pool item closing for not-alived item
2 parents f6a74d6 + 4d262cd commit e879b4b

File tree

4 files changed

+126
-12
lines changed

4 files changed

+126
-12
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
* Added query client session pool metrics: create_in_progress, in_use, waiters_queue
2+
* Added pool item closing for not-alived item
3+
14
## v3.104.4
25
* Fixed bug with session query latency metric collector
36

internal/pool/pool.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -762,6 +762,12 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
762762

763763
return item, nil
764764
}
765+
766+
p.closeItem(ctx, item,
767+
closeItemWithLock(),
768+
closeItemNotifyStats(),
769+
closeItemWithDeleteFromPool(),
770+
)
765771
}
766772

767773
item, err := p.createItemFunc(ctx)
@@ -773,7 +779,9 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
773779
return nil, xerrors.WithStackTrace(xerrors.Join(err, lastErr))
774780
}
775781

776-
lastErr = err
782+
if err != nil {
783+
lastErr = err
784+
}
777785

778786
item, err = p.waitFromCh(ctx)
779787
if item != nil {
@@ -784,7 +792,13 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) { /
784792
return nil, xerrors.WithStackTrace(xerrors.Join(err, lastErr))
785793
}
786794

787-
lastErr = err
795+
if err != nil {
796+
lastErr = err
797+
}
798+
}
799+
800+
if lastErr == nil {
801+
lastErr = errNoProgress
788802
}
789803

790804
p.mu.RLock()

internal/pool/pool_test.go

Lines changed: 101 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
package pool
22

33
import (
4-
"bytes"
54
"context"
65
"errors"
76
"fmt"
87
"path"
98
"runtime"
10-
"runtime/debug"
119
"sync"
1210
"sync/atomic"
1311
"testing"
@@ -35,7 +33,7 @@ type (
3533
testItem struct {
3634
v int32
3735

38-
closed bytes.Buffer
36+
closed bool
3937

4038
onClose func() error
4139
onIsAlive func() bool
@@ -104,6 +102,10 @@ func (p *testWaitChPool) whenWantWaitCh() <-chan struct{} {
104102
func (p *testWaitChPool) Put(ch *chan *testItem) {}
105103

106104
func (t *testItem) IsAlive() bool {
105+
if t.closed {
106+
return false
107+
}
108+
107109
if t.onIsAlive != nil {
108110
return t.onIsAlive()
109111
}
@@ -124,13 +126,9 @@ func (t *testItem) NodeID() uint32 {
124126
}
125127

126128
func (t *testItem) Close(context.Context) error {
127-
if t.closed.Len() > 0 {
128-
debug.PrintStack()
129-
fmt.Println(t.closed.String())
130-
panic("item already closed")
131-
}
132-
133-
t.closed.Write(debug.Stack())
129+
defer func() {
130+
t.closed = true
131+
}()
134132

135133
if t.onClose != nil {
136134
return t.onClose()
@@ -889,6 +887,99 @@ func TestPool(t *testing.T) { //nolint:gocyclo
889887
})
890888
})
891889
t.Run("With", func(t *testing.T) {
890+
t.Run("ItemFromPoolIsNotAlive", func(t *testing.T) {
891+
var (
892+
created atomic.Int32
893+
closed atomic.Int32
894+
nextID atomic.Int32
895+
)
896+
assertCreated := func(exp int32) {
897+
if act := created.Load(); act != exp {
898+
t.Errorf(
899+
"unexpected number of created items: %v; want %v",
900+
act, exp,
901+
)
902+
}
903+
}
904+
assertClosed := func(exp int32) {
905+
if act := closed.Load(); act != exp {
906+
t.Errorf(
907+
"unexpected number of closed items: %v; want %v",
908+
act, exp,
909+
)
910+
}
911+
}
912+
p := New[*testItem, testItem](rootCtx,
913+
WithLimit[*testItem, testItem](1),
914+
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
915+
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
916+
WithCreateItemFunc(func(context.Context) (*testItem, error) {
917+
created.Add(1)
918+
alived := true
919+
v := testItem{
920+
v: nextID.Add(1),
921+
onIsAlive: func() bool {
922+
defer func() {
923+
alived = false
924+
}()
925+
926+
return alived
927+
},
928+
onClose: func() error {
929+
closed.Add(1)
930+
931+
return nil
932+
},
933+
}
934+
935+
return &v, nil
936+
}),
937+
)
938+
defer func() {
939+
_ = p.Close(context.Background())
940+
}()
941+
942+
s1 := mustGetItem(t, p)
943+
assertClosed(0)
944+
assertCreated(1)
945+
require.Len(t, p.index, 1)
946+
947+
mustPutItem(t, p, s1)
948+
assertClosed(0)
949+
assertCreated(1)
950+
require.Len(t, p.index, 1)
951+
require.Equal(t, 1, p.idle.Len())
952+
953+
s2, err := p.getItem(context.Background())
954+
require.NoError(t, err)
955+
assertCreated(2)
956+
assertClosed(1)
957+
require.Len(t, p.index, 1)
958+
require.Equal(t, 0, p.idle.Len())
959+
960+
_, err = p.getItem(context.Background())
961+
require.ErrorIs(t, err, errPoolIsOverflow)
962+
assertCreated(2)
963+
assertClosed(1)
964+
require.Len(t, p.index, 1)
965+
require.Equal(t, 0, p.idle.Len())
966+
967+
require.NoError(t, p.Close(context.Background()))
968+
assertCreated(2)
969+
assertClosed(1)
970+
971+
require.Len(t, p.index, 1)
972+
require.Equal(t, 0, p.idle.Len())
973+
974+
require.ErrorIs(t, p.putItem(context.Background(), s2), errClosedPool)
975+
assertClosed(2)
976+
977+
require.True(t, s2.closed)
978+
require.False(t, s2.IsAlive())
979+
980+
require.Len(t, p.index, 0)
981+
require.Equal(t, 0, p.idle.Len())
982+
})
892983
t.Run("ExplicitItemClose", func(t *testing.T) {
893984
var (
894985
created atomic.Int32

metrics/query.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ func query(config Config) (t trace.Query) {
3838
limit := sizeConfig.GaugeVec("limit")
3939
idle := sizeConfig.GaugeVec("idle")
4040
index := sizeConfig.GaugeVec("index")
41+
wait := sizeConfig.GaugeVec("waiters_queue")
42+
inUse := sizeConfig.GaugeVec("in_use")
43+
createInProgress := sizeConfig.GaugeVec("create_in_progress")
4144
t.OnPoolChange = func(stats trace.QueryPoolChange) {
4245
if sizeConfig.Details()&trace.QueryPoolEvents == 0 {
4346
return
@@ -46,6 +49,9 @@ func query(config Config) (t trace.Query) {
4649
limit.With(nil).Set(float64(stats.Limit))
4750
idle.With(nil).Set(float64(stats.Idle))
4851
index.With(nil).Set(float64(stats.Index))
52+
wait.With(nil).Set(float64(stats.Wait))
53+
createInProgress.With(nil).Set(float64(stats.CreateInProgress))
54+
inUse.With(nil).Set(float64(stats.Index - stats.Idle))
4955
}
5056
}
5157
}

0 commit comments

Comments
 (0)