Skip to content

Commit 4830171

Browse files
authored
Merge pull request #1447 from ydb-platform/table-pool-replacement
replaced table client pool to internal/pool
2 parents 2b869c5 + f36b8f5 commit 4830171

File tree

10 files changed

+241
-1847
lines changed

10 files changed

+241
-1847
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
* Replaced internal table client pool entities to `internal/pool`
2+
13
## v3.79.2
24
* Enabled by default usage of `internal/pool` in `internal/query.Client`
35

internal/pool/pool.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,11 @@ func WithCreateItemFunc[PT Item[T], T any](f func(ctx context.Context) (PT, erro
6464
}
6565
}
6666

67-
func withCloseItemFunc[PT Item[T], T any](f func(ctx context.Context, item PT)) option[PT, T] {
67+
func WithSyncCloseItem[PT Item[T], T any]() option[PT, T] {
6868
return func(c *Config[PT, T]) {
69-
c.closeItem = f
69+
c.closeItem = func(ctx context.Context, item PT) {
70+
_ = item.Close(ctx)
71+
}
7072
}
7173
}
7274

internal/pool/pool_test.go

Lines changed: 66 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import (
2626
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xrand"
2727
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xsync"
2828
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
29+
"github.com/ydb-platform/ydb-go-sdk/v3/retry"
30+
"github.com/ydb-platform/ydb-go-sdk/v3/testutil"
2931
)
3032

3133
type (
@@ -229,9 +231,7 @@ func TestPool(t *testing.T) {
229231
return &v, nil
230232
}),
231233
// replace default async closer for sync testing
232-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
233-
_ = item.Close(ctx)
234-
}),
234+
WithSyncCloseItem[*testItem, testItem](),
235235
WithTrace[*testItem, testItem](defaultTrace),
236236
)
237237

@@ -301,9 +301,7 @@ func TestPool(t *testing.T) {
301301
}
302302
p := New[*testItem, testItem](rootCtx,
303303
// replace default async closer for sync testing
304-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
305-
_ = item.Close(ctx)
306-
}),
304+
WithSyncCloseItem[*testItem, testItem](),
307305
WithLimit[*testItem, testItem](1),
308306
WithTrace[*testItem, testItem](&Trace{
309307
onWait: func() func(item any, err error) {
@@ -394,9 +392,7 @@ func TestPool(t *testing.T) {
394392
}),
395393
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
396394
// replace default async closer for sync testing
397-
withCloseItemFunc[*testItem, testItem](func(ctx context.Context, item *testItem) {
398-
_ = item.Close(ctx)
399-
}),
395+
WithSyncCloseItem[*testItem, testItem](),
400396
WithClock[*testItem, testItem](fakeClock),
401397
WithIdleThreshold[*testItem, testItem](idleThreshold),
402398
WithTrace[*testItem, testItem](defaultTrace),
@@ -560,9 +556,7 @@ func TestPool(t *testing.T) {
560556
defer cancel()
561557
p := New[*testItem, testItem](rootCtx,
562558
// replace default async closer for sync testing
563-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
564-
_ = item.Close(ctx)
565-
}),
559+
WithSyncCloseItem[*testItem, testItem](),
566560
)
567561
defer func() {
568562
_ = p.Close(context.Background())
@@ -618,6 +612,60 @@ func TestPool(t *testing.T) {
618612
})
619613
})
620614
})
615+
t.Run("DoBackoffRetryCancelation", func(t *testing.T) {
616+
for _, testErr := range []error{
617+
// Errors leading to Wait repeat.
618+
xerrors.Transport(
619+
grpcStatus.Error(grpcCodes.ResourceExhausted, ""),
620+
),
621+
fmt.Errorf("wrap transport error: %w", xerrors.Transport(
622+
grpcStatus.Error(grpcCodes.ResourceExhausted, ""),
623+
)),
624+
xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_OVERLOADED)),
625+
fmt.Errorf("wrap op error: %w", xerrors.Operation(xerrors.WithStatusCode(Ydb.StatusIds_OVERLOADED))),
626+
} {
627+
t.Run("", func(t *testing.T) {
628+
backoff := make(chan chan time.Time)
629+
ctx, cancel := xcontext.WithCancel(context.Background())
630+
p := New[*testItem, testItem](ctx, WithLimit[*testItem, testItem](1))
631+
632+
results := make(chan error)
633+
go func() {
634+
err := p.With(ctx,
635+
func(ctx context.Context, item *testItem) error {
636+
return testErr
637+
},
638+
retry.WithFastBackoff(
639+
testutil.BackoffFunc(func(n int) <-chan time.Time {
640+
ch := make(chan time.Time)
641+
backoff <- ch
642+
643+
return ch
644+
}),
645+
),
646+
retry.WithSlowBackoff(
647+
testutil.BackoffFunc(func(n int) <-chan time.Time {
648+
ch := make(chan time.Time)
649+
backoff <- ch
650+
651+
return ch
652+
}),
653+
),
654+
)
655+
results <- err
656+
}()
657+
658+
select {
659+
case <-backoff:
660+
t.Logf("expected result")
661+
case res := <-results:
662+
t.Fatalf("unexpected result: %v", res)
663+
}
664+
665+
cancel()
666+
})
667+
}
668+
})
621669
})
622670
t.Run("Item", func(t *testing.T) {
623671
t.Run("Close", func(t *testing.T) {
@@ -642,9 +690,7 @@ func TestPool(t *testing.T) {
642690
return v, nil
643691
}),
644692
// replace default async closer for sync testing
645-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
646-
_ = item.Close(ctx)
647-
}),
693+
WithSyncCloseItem[*testItem, testItem](),
648694
)
649695
err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
650696
return nil
@@ -684,9 +730,7 @@ func TestPool(t *testing.T) {
684730
return v, nil
685731
}),
686732
// replace default async closer for sync testing
687-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
688-
_ = item.Close(ctx)
689-
}),
733+
WithSyncCloseItem[*testItem, testItem](),
690734
)
691735
err := p.With(rootCtx, func(ctx context.Context, testItem *testItem) error {
692736
if newItems.Load() < 10 {
@@ -744,9 +788,7 @@ func TestPool(t *testing.T) {
744788
return &v, nil
745789
}),
746790
// replace default async closer for sync testing
747-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
748-
_ = item.Close(ctx)
749-
}),
791+
WithSyncCloseItem[*testItem, testItem](),
750792
)
751793
defer func() {
752794
_ = p.Close(context.Background())
@@ -778,9 +820,7 @@ func TestPool(t *testing.T) {
778820
p := New[*testItem, testItem](rootCtx,
779821
WithTrace[*testItem, testItem](trace),
780822
// replace default async closer for sync testing
781-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
782-
_ = item.Close(ctx)
783-
}),
823+
WithSyncCloseItem[*testItem, testItem](),
784824
)
785825
r := xrand.New(xrand.WithLock())
786826
var wg sync.WaitGroup
@@ -848,9 +888,7 @@ func TestPool(t *testing.T) {
848888
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
849889
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
850890
// replace default async closer for sync testing
851-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
852-
_ = item.Close(ctx)
853-
}),
891+
WithSyncCloseItem[*testItem, testItem](),
854892
)
855893
item := mustGetItem(t, p)
856894
if err := p.putItem(context.Background(), item); err != nil {
@@ -867,9 +905,7 @@ func TestPool(t *testing.T) {
867905
WithCreateItemTimeout[*testItem, testItem](50*time.Millisecond),
868906
WithCloseItemTimeout[*testItem, testItem](50*time.Millisecond),
869907
// replace default async closer for sync testing
870-
withCloseItemFunc(func(ctx context.Context, item *testItem) {
871-
_ = item.Close(ctx)
872-
}),
908+
WithSyncCloseItem[*testItem, testItem](),
873909
)
874910
item := mustGetItem(t, p)
875911
mustPutItem(t, p, item)

0 commit comments

Comments
 (0)