Skip to content

Commit 3588845

Browse files
committed
internal/pool createItem maked as async
1 parent f4968f1 commit 3588845

File tree

5 files changed

+133
-74
lines changed

5 files changed

+133
-74
lines changed

internal/pool/errors.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,5 @@ var (
88
errClosedPool = errors.New("closed pool")
99
errItemIsNotAlive = errors.New("item is not alive")
1010
errPoolIsOverflow = errors.New("pool is overflow")
11+
errNoProgress = errors.New("no progress")
1112
)

internal/pool/pool.go

Lines changed: 73 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func New[PT Item[T], T any](
9898
})
9999
}()
100100

101-
p.createItem = createItemWithTimeoutHandling(p.createItem, p)
101+
p.createItem = asyncCreateItemWithTimeout(p.createItem, p)
102102
p.sema = make(chan struct{}, p.limit)
103103
p.idle = make([]PT, 0, p.limit)
104104
p.index = make(map[PT]struct{}, p.limit)
@@ -114,90 +114,88 @@ func defaultCreateItem[T any, PT Item[T]](ctx context.Context) (PT, error) {
114114
return &item, nil
115115
}
116116

117-
// createItemWithTimeoutHandling wraps the createItem function with timeout handling
118-
func createItemWithTimeoutHandling[PT Item[T], T any](
117+
// asyncCreateItemWithTimeout wraps the createItem function with timeout handling
118+
func asyncCreateItemWithTimeout[PT Item[T], T any](
119119
createItem func(ctx context.Context) (PT, error),
120120
p *Pool[PT, T],
121121
) func(ctx context.Context) (PT, error) {
122122
return func(ctx context.Context) (PT, error) {
123-
var (
124-
ch = make(chan PT)
125-
createErr error
126-
)
123+
var ch = make(chan struct {
124+
item PT
125+
err error
126+
})
127+
127128
go func() {
128129
defer close(ch)
129-
createErr = createItemWithContext(ctx, p, createItem, ch)
130+
131+
createCtx, cancelCreate := xcontext.WithDone(xcontext.ValueOnly(ctx), p.done)
132+
defer cancelCreate()
133+
134+
if d := p.createTimeout; d > 0 {
135+
createCtx, cancelCreate = xcontext.WithTimeout(createCtx, d)
136+
defer cancelCreate()
137+
}
138+
139+
p.mu.WithLock(func() {
140+
p.stats.CreateInProgress++
141+
})
142+
143+
newItem, err := createItem(createCtx)
144+
p.mu.WithLock(func() {
145+
p.stats.CreateInProgress--
146+
})
147+
148+
select {
149+
case ch <- struct {
150+
item PT
151+
err error
152+
}{
153+
item: newItem,
154+
err: xerrors.WithStackTrace(err),
155+
}:
156+
default:
157+
if newItem == nil {
158+
return
159+
}
160+
161+
if !xsync.WithLock(&p.mu, func() bool {
162+
if len(p.idle) >= p.limit {
163+
return false // not appended
164+
}
165+
166+
p.idle = append(p.idle, newItem)
167+
p.stats.Idle++
168+
169+
p.index[newItem] = struct{}{}
170+
p.stats.Index++
171+
172+
return true // // item appended
173+
}) {
174+
_ = newItem.Close(ctx)
175+
}
176+
}
130177
}()
131178

132179
select {
133180
case <-p.done:
134181
return nil, xerrors.WithStackTrace(errClosedPool)
135182
case <-ctx.Done():
136183
return nil, xerrors.WithStackTrace(ctx.Err())
137-
case item, has := <-ch:
184+
case result, has := <-ch:
138185
if !has {
139-
if ctxErr := ctx.Err(); ctxErr == nil && xerrors.IsContextError(createErr) {
140-
return nil, xerrors.WithStackTrace(xerrors.Retryable(createErr))
141-
}
142-
143-
return nil, xerrors.WithStackTrace(createErr)
186+
return nil, xerrors.WithStackTrace(errNoProgress)
144187
}
145188

146-
return item, nil
147-
}
148-
}
149-
}
150-
151-
// createItemWithContext handles the creation of an item with context handling
152-
func createItemWithContext[PT Item[T], T any](
153-
ctx context.Context,
154-
p *Pool[PT, T],
155-
createItem func(ctx context.Context) (PT, error),
156-
ch chan PT,
157-
) error {
158-
var (
159-
createCtx = xcontext.ValueOnly(ctx)
160-
cancelCreate context.CancelFunc
161-
)
162-
163-
if d := p.createTimeout; d > 0 {
164-
createCtx, cancelCreate = xcontext.WithTimeout(createCtx, d)
165-
} else {
166-
createCtx, cancelCreate = xcontext.WithCancel(createCtx)
167-
}
168-
defer cancelCreate()
169-
170-
newItem, err := createItem(createCtx)
171-
if err != nil {
172-
return xerrors.WithStackTrace(err)
173-
}
174-
175-
needCloseItem := true
176-
defer func() {
177-
if needCloseItem {
178-
_ = p.closeItem(ctx, newItem)
179-
}
180-
}()
189+
if result.err != nil {
190+
if xerrors.IsContextError(result.err) {
191+
return nil, xerrors.WithStackTrace(xerrors.Retryable(result.err))
192+
}
181193

182-
select {
183-
case <-p.done:
184-
return xerrors.WithStackTrace(errClosedPool)
185-
case <-ctx.Done():
186-
p.mu.Lock()
187-
defer p.mu.Unlock()
194+
return nil, xerrors.WithStackTrace(result.err)
195+
}
188196

189-
if len(p.index) < p.limit {
190-
p.idle = append(p.idle, newItem)
191-
p.index[newItem] = struct{}{}
192-
p.stats.Index++
193-
needCloseItem = false
197+
return result.item, nil
194198
}
195-
196-
return xerrors.WithStackTrace(ctx.Err())
197-
case ch <- newItem:
198-
needCloseItem = false
199-
200-
return nil
201199
}
202200
}
203201

@@ -220,26 +218,31 @@ func (p *Pool[PT, T]) getItem(ctx context.Context) (item PT, finalErr error) {
220218
}()
221219

222220
p.mu.Lock()
223-
defer p.mu.Unlock()
224-
225221
if len(p.idle) > 0 {
226222
item, p.idle = p.idle[0], p.idle[1:]
227223
p.stats.Idle--
224+
}
225+
p.mu.Unlock()
228226

227+
if item != nil {
229228
if item.IsAlive() {
230229
return item, nil
231230
}
232231

233232
_ = p.closeItem(ctx, item)
233+
234+
return nil, xerrors.WithStackTrace(xerrors.Retryable(errItemIsNotAlive))
234235
}
235236

236237
newItem, err := p.createItem(ctx)
237238
if err != nil {
238239
return nil, xerrors.WithStackTrace(xerrors.Retryable(err))
239240
}
240241

241-
p.stats.Index++
242-
p.index[newItem] = struct{}{}
242+
p.mu.WithLock(func() {
243+
p.stats.Index++
244+
p.index[newItem] = struct{}{}
245+
})
243246

244247
return newItem, nil
245248
}

internal/pool/stats.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package pool
22

33
type Stats struct {
4-
Limit int
5-
Index int
6-
Idle int
7-
InUse int
4+
Limit int
5+
Index int
6+
Idle int
7+
InUse int
8+
CreateInProgress int
89
}

internal/xsync/mutex.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,23 @@ func (l *RWMutex) WithRLock(f func()) {
3232

3333
f()
3434
}
35+
36+
func WithLock[T any](l interface {
37+
Lock()
38+
Unlock()
39+
}, f func() T) T {
40+
l.Lock()
41+
defer l.Unlock()
42+
43+
return f()
44+
}
45+
46+
func WithRLock[T any](l interface {
47+
RLock()
48+
RUnlock()
49+
}, f func() T) T {
50+
l.RLock()
51+
defer l.RUnlock()
52+
53+
return f()
54+
}

internal/xsync/mutex_test.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,3 +102,37 @@ func TestRWMutex(t *testing.T) {
102102
require.Equal(t, int64(0), badSummCount)
103103
})
104104
}
105+
106+
func TestWithLock(t *testing.T) {
107+
t.Run("sync.Mutex", func(t *testing.T) {
108+
mtx := sync.Mutex{}
109+
v := WithLock(&mtx, func() int {
110+
return 123
111+
})
112+
require.Equal(t, 123, v)
113+
})
114+
t.Run("xsync.Mutex", func(t *testing.T) {
115+
mtx := Mutex{}
116+
v := WithLock(&mtx, func() int {
117+
return 123
118+
})
119+
require.Equal(t, 123, v)
120+
})
121+
}
122+
123+
func TestWithRLock(t *testing.T) {
124+
t.Run("sync.RWMutex", func(t *testing.T) {
125+
mtx := sync.RWMutex{}
126+
v := WithLock(&mtx, func() int {
127+
return 123
128+
})
129+
require.Equal(t, 123, v)
130+
})
131+
t.Run("xsync.RWMutex", func(t *testing.T) {
132+
mtx := RWMutex{}
133+
v := WithLock(&mtx, func() int {
134+
return 123
135+
})
136+
require.Equal(t, 123, v)
137+
})
138+
}

0 commit comments

Comments
 (0)