Skip to content

Commit 081b6ae

Browse files
committed
internal/pool async create and close item
1 parent 3588845 commit 081b6ae

File tree

10 files changed

+228
-166
lines changed

10 files changed

+228
-166
lines changed

internal/pool/pool.go

Lines changed: 148 additions & 125 deletions
Large diffs are not rendered by default.

internal/pool/pool_test.go

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func TestPool(t *testing.T) {
5656
})
5757
t.Run("WithLimit", func(t *testing.T) {
5858
p := New[*testItem, testItem](rootCtx, WithLimit[*testItem, testItem](1))
59-
require.EqualValues(t, 1, p.limit)
59+
require.EqualValues(t, 1, p.config.limit)
6060
})
6161
t.Run("WithCreateFunc", func(t *testing.T) {
6262
var newCounter int64
@@ -73,7 +73,7 @@ func TestPool(t *testing.T) {
7373
return nil
7474
})
7575
require.NoError(t, err)
76-
require.EqualValues(t, p.limit, atomic.LoadInt64(&newCounter))
76+
require.EqualValues(t, p.config.limit, atomic.LoadInt64(&newCounter))
7777
})
7878
})
7979
t.Run("Retry", func(t *testing.T) {
@@ -247,6 +247,9 @@ func TestPool(t *testing.T) {
247247
return v, nil
248248
}),
249249
)
250+
p.closeItem = func(ctx context.Context, item *testItem) {
251+
_ = item.Close(ctx)
252+
}
250253
err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
251254
if atomic.LoadInt64(&newItems) < 10 {
252255
return expErr
@@ -265,7 +268,11 @@ func TestPool(t *testing.T) {
265268
})
266269
t.Run("Stress", func(t *testing.T) {
267270
xtest.TestManyTimes(t, func(t testing.TB) {
268-
p := New[*testItem, testItem](rootCtx)
271+
trace := *defaultTrace
272+
trace.OnChange = func(info ChangeInfo) {
273+
require.GreaterOrEqual(t, info.Limit, info.Idle)
274+
}
275+
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](&trace))
269276
var wg sync.WaitGroup
270277
wg.Add(DefaultLimit*2 + 1)
271278
for range make([]struct{}, DefaultLimit*2) {
@@ -290,7 +297,12 @@ func TestPool(t *testing.T) {
290297
})
291298
t.Run("ParallelCreation", func(t *testing.T) {
292299
xtest.TestManyTimes(t, func(t testing.TB) {
293-
p := New[*testItem, testItem](rootCtx)
300+
trace := *defaultTrace
301+
trace.OnChange = func(info ChangeInfo) {
302+
require.Equal(t, DefaultLimit, info.Limit)
303+
require.LessOrEqual(t, info.Idle, DefaultLimit)
304+
}
305+
p := New[*testItem, testItem](rootCtx, WithTrace[*testItem, testItem](&trace))
294306
var wg sync.WaitGroup
295307
for range make([]struct{}, DefaultLimit*10) {
296308
wg.Add(1)
@@ -303,15 +315,11 @@ func TestPool(t *testing.T) {
303315
t.Failed()
304316
}
305317
stats := p.Stats()
306-
require.LessOrEqual(t, stats.Idle+stats.InUse, DefaultLimit)
318+
require.LessOrEqual(t, stats.Idle, DefaultLimit)
307319
}()
308320
}
309321

310322
wg.Wait()
311-
stats := p.Stats()
312-
require.Equal(t, DefaultLimit, stats.Limit)
313-
require.Equal(t, 0, stats.InUse)
314-
require.LessOrEqual(t, stats.Idle, DefaultLimit)
315323
}, xtest.StopAfter(14*time.Second))
316324
})
317325
}

internal/pool/stats.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package pool
22

33
type Stats struct {
4-
Limit int
5-
Index int
6-
Idle int
7-
InUse int
8-
CreateInProgress int
4+
Limit int
5+
Idle int
96
}

internal/query/client.go

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package query
22

33
import (
44
"context"
5-
"sync/atomic"
65
"time"
76

87
"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
@@ -43,7 +42,6 @@ type (
4342
}
4443
poolStub struct {
4544
createSession func(ctx context.Context) (*Session, error)
46-
InUse atomic.Int32
4745
}
4846
Client struct {
4947
config *config.Config
@@ -264,20 +262,13 @@ func (p *poolStub) Close(ctx context.Context) error {
264262
func (p *poolStub) Stats() pool.Stats {
265263
return pool.Stats{
266264
Limit: -1,
267-
Index: 0,
268265
Idle: 0,
269-
InUse: int(p.InUse.Load()),
270266
}
271267
}
272268

273269
func (p *poolStub) With(
274270
ctx context.Context, f func(ctx context.Context, s *Session) error, opts ...retry.Option,
275271
) error {
276-
p.InUse.Add(1)
277-
defer func() {
278-
p.InUse.Add(-1)
279-
}()
280-
281272
err := retry.Retry(ctx, func(ctx context.Context) (err error) {
282273
s, err := p.createSession(ctx)
283274
if err != nil {
@@ -710,7 +701,7 @@ func poolTrace(t *trace.Query) *pool.Trace {
710701
}
711702
},
712703
OnChange: func(info pool.ChangeInfo) {
713-
trace.QueryOnPoolChange(t, info.Limit, info.Index, info.Idle, info.InUse)
704+
trace.QueryOnPoolChange(t, info.Limit, info.Idle)
714705
},
715706
}
716707
}

internal/xsync/set.go

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,18 @@ import (
55
"sync/atomic"
66
)
77

8-
type Set[K comparable] struct {
8+
type Set[T comparable] struct {
99
m sync.Map
1010
size atomic.Int32
1111
}
1212

13-
func (s *Set[K]) Has(key K) bool {
13+
func (s *Set[T]) Has(key T) bool {
1414
_, ok := s.m.Load(key)
1515

1616
return ok
1717
}
1818

19-
func (s *Set[K]) Add(key K) bool {
19+
func (s *Set[T]) Add(key T) bool {
2020
_, exists := s.m.LoadOrStore(key, struct{}{})
2121

2222
if !exists {
@@ -26,17 +26,29 @@ func (s *Set[K]) Add(key K) bool {
2626
return !exists
2727
}
2828

29-
func (s *Set[K]) Size() int {
29+
func (s *Set[T]) Size() int {
3030
return int(s.size.Load())
3131
}
3232

33-
func (s *Set[K]) Range(f func(key K) bool) {
33+
func (s *Set[T]) Range(f func(key T) bool) {
3434
s.m.Range(func(k, v any) bool {
35-
return f(k.(K)) //nolint:forcetypeassert
35+
return f(k.(T)) //nolint:forcetypeassert
3636
})
3737
}
3838

39-
func (s *Set[K]) Remove(key K) bool {
39+
func (s *Set[T]) Values() []T {
40+
values := make([]T, 0, s.size.Load())
41+
42+
s.m.Range(func(k, v any) bool {
43+
values = append(values, k.(T))
44+
45+
return true
46+
})
47+
48+
return values
49+
}
50+
51+
func (s *Set[T]) Remove(key T) bool {
4052
_, exists := s.m.LoadAndDelete(key)
4153

4254
if exists {
@@ -46,7 +58,7 @@ func (s *Set[K]) Remove(key K) bool {
4658
return exists
4759
}
4860

49-
func (s *Set[K]) Clear() (removed int) {
61+
func (s *Set[T]) Clear() (removed int) {
5062
s.m.Range(func(k, v any) bool {
5163
removed++
5264

internal/xsync/value.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package xsync
2+
3+
type Value[T any] struct {
4+
v T
5+
mu RWMutex
6+
}
7+
8+
func NewValue[T any](initValue T) *Value[T] {
9+
return &Value[T]{v: initValue}
10+
}
11+
12+
func (v *Value[T]) Get() T {
13+
v.mu.RLock()
14+
defer v.mu.RUnlock()
15+
16+
return v.v
17+
}
18+
19+
func (v *Value[T]) Change(change func(old T) T) {
20+
v.mu.WithLock(func() {
21+
v.v = change(v.v)
22+
})
23+
}

internal/xsync/value_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package xsync
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestValue(t *testing.T) {
10+
v := NewValue(5)
11+
require.Equal(t, 5, v.Get())
12+
v.Change(func(old int) int {
13+
return 6
14+
})
15+
require.Equal(t, 6, v.Get())
16+
}

metrics/query.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ func query(config Config) (t trace.Query) {
4242
sizeConfig := poolConfig.WithSystem("size")
4343
limit := sizeConfig.GaugeVec("limit")
4444
idle := sizeConfig.GaugeVec("idle")
45-
index := sizeConfig.GaugeVec("index")
46-
inUse := sizeConfig.WithSystem("in").GaugeVec("use")
4745

4846
t.OnPoolChange = func(stats trace.QueryPoolChange) {
4947
if sizeConfig.Details()&trace.QueryPoolEvents == 0 {
@@ -52,8 +50,6 @@ func query(config Config) (t trace.Query) {
5250

5351
limit.With(nil).Set(float64(stats.Limit))
5452
idle.With(nil).Set(float64(stats.Idle))
55-
inUse.With(nil).Set(float64(stats.InUse))
56-
index.With(nil).Set(float64(stats.Index))
5753
}
5854
}
5955
}

trace/query.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -547,8 +547,6 @@ type (
547547
// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
548548
QueryPoolChange struct {
549549
Limit int
550-
Index int
551550
Idle int
552-
InUse int
553551
}
554552
)

trace/query_gtrace.go

Lines changed: 1 addition & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)