Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions core/syncx/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,34 @@ func (p *Pool) Put(x any) {
p.cond.Signal()
}

// DestroyAll destroys all resources in the pool.
// It calls the destroy function on each resource and resets the pool state.
// This is useful when you need to forcefully clean up all resources, for example:
// - When removing an obsolete pool
// - When refreshing all resources after configuration changes
// - When avoiding resource leaks in dynamic pool scenarios
func (p *Pool) DestroyAll() {
p.lock.Lock()
defer p.lock.Unlock()

// Iterate through the linked list and destroy all resources
current := p.head
for current != nil {
next := current.next
if p.destroy != nil {
p.destroy(current.item)
}
current = next
}

// Reset pool state
p.head = nil
p.created = 0

// Wake up all waiting goroutines since the pool is now empty
p.cond.Broadcast()
}

// WithMaxAge returns a function to customize a Pool with given max age.
func WithMaxAge(duration time.Duration) PoolOption {
return func(pool *Pool) {
Expand Down
149 changes: 149 additions & 0 deletions core/syncx/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,155 @@ func TestNewPoolPanics(t *testing.T) {
})
}

func TestPoolDestroyAll(t *testing.T) {
var destroyed []int
var destroyCount int32

destroyFunc := func(item any) {
destroyed = append(destroyed, item.(int))
atomic.AddInt32(&destroyCount, 1)
}

pool := NewPool(limit, create, destroyFunc)

// Put some resources into the pool
pool.Put(10)
pool.Put(20)
pool.Put(30)

// Destroy all resources
pool.DestroyAll()

// Verify all resources were destroyed
assert.Equal(t, int32(3), atomic.LoadInt32(&destroyCount))
assert.Contains(t, destroyed, 10)
assert.Contains(t, destroyed, 20)
assert.Contains(t, destroyed, 30)

// Verify pool is empty - next Get should create new resource
val := pool.Get()
assert.Equal(t, 1, val) // create() returns 1
}

func TestPoolDestroyAllEmpty(t *testing.T) {
var destroyCount int32
destroyFunc := func(_ any) {
atomic.AddInt32(&destroyCount, 1)
}

pool := NewPool(limit, create, destroyFunc)

// DestroyAll on empty pool should not panic
pool.DestroyAll()

// No resources should have been destroyed
assert.Equal(t, int32(0), atomic.LoadInt32(&destroyCount))

// Pool should still work normally
val := pool.Get()
assert.Equal(t, 1, val)
}

func TestPoolDestroyAllWithNilDestroy(t *testing.T) {
pool := NewPool(limit, create, nil)

// Put some resources into the pool
pool.Put(10)
pool.Put(20)

// DestroyAll with nil destroy function should not panic
pool.DestroyAll()

// Pool should be empty and work normally
val := pool.Get()
assert.Equal(t, 1, val)
}

func TestPoolDestroyAllConcurrency(t *testing.T) {
var destroyCount int32
var createCount int32

createFunc := func() any {
return atomic.AddInt32(&createCount, 1)
}

destroyFunc := func(_ any) {
atomic.AddInt32(&destroyCount, 1)
}

pool := NewPool(limit, createFunc, destroyFunc)

// Add some initial resources
for i := 0; i < 5; i++ {
pool.Put(i + 100)
}

var wg sync.WaitGroup
const goroutines = 10

// Concurrently perform various operations
for i := 0; i < goroutines; i++ {
wg.Add(1)
go func(id int) {
defer wg.Done()

switch id % 4 {
case 0:
// DestroyAll
pool.DestroyAll()
case 1:
// Get resources
val := pool.Get()
pool.Put(val)
case 2:
// Put resources
pool.Put(id + 1000)
case 3:
// Get and don't put back
pool.Get()
}
}(i)
}

wg.Wait()

// Final DestroyAll to clean up
pool.DestroyAll()

// Pool should work after concurrent operations
val := pool.Get()
assert.NotNil(t, val)
}

func TestPoolDestroyAllWakesWaitingGoroutines(t *testing.T) {
pool := NewPool(1, create, destroy) // Small pool size

// Fill the pool
resource := pool.Get()
assert.Equal(t, 1, resource)

var wg sync.WaitGroup
var gotResource bool

// Start a goroutine that will wait for a resource
wg.Add(1)
go func() {
defer wg.Done()
val := pool.Get() // This will block since pool is full
gotResource = true
assert.Equal(t, 1, val) // Should get a newly created resource after DestroyAll
}()

// Give the goroutine time to start waiting
time.Sleep(10 * time.Millisecond)

// DestroyAll should wake up the waiting goroutine
pool.DestroyAll()

wg.Wait()
assert.True(t, gotResource)
}

func create() any {
return 1
}
Expand Down
Loading