diff --git a/core/syncx/pool.go b/core/syncx/pool.go index c84d83102faf..6b7981078f06 100644 --- a/core/syncx/pool.go +++ b/core/syncx/pool.go @@ -100,6 +100,34 @@ func (p *Pool) Put(x any) { p.cond.Signal() } +// DestroyAll destroys all resources in the pool. +// It calls the destroy function on each resource and resets the pool state. +// This is useful when you need to forcefully clean up all resources, for example: +// - When removing an obsolete pool +// - When refreshing all resources after configuration changes +// - When avoiding resource leaks in dynamic pool scenarios +func (p *Pool) DestroyAll() { + p.lock.Lock() + defer p.lock.Unlock() + + // Iterate through the linked list and destroy all resources + current := p.head + for current != nil { + next := current.next + if p.destroy != nil { + p.destroy(current.item) + } + current = next + } + + // Reset pool state + p.head = nil + p.created = 0 + + // Wake up all waiting goroutines since the pool is now empty + p.cond.Broadcast() +} + // WithMaxAge returns a function to customize a Pool with given max age. func WithMaxAge(duration time.Duration) PoolOption { return func(pool *Pool) { diff --git a/core/syncx/pool_test.go b/core/syncx/pool_test.go index 19c73934043e..322d52b54d9a 100644 --- a/core/syncx/pool_test.go +++ b/core/syncx/pool_test.go @@ -107,6 +107,155 @@ func TestNewPoolPanics(t *testing.T) { }) } +func TestPoolDestroyAll(t *testing.T) { + var destroyed []int + var destroyCount int32 + + destroyFunc := func(item any) { + destroyed = append(destroyed, item.(int)) + atomic.AddInt32(&destroyCount, 1) + } + + pool := NewPool(limit, create, destroyFunc) + + // Put some resources into the pool + pool.Put(10) + pool.Put(20) + pool.Put(30) + + // Destroy all resources + pool.DestroyAll() + + // Verify all resources were destroyed + assert.Equal(t, int32(3), atomic.LoadInt32(&destroyCount)) + assert.Contains(t, destroyed, 10) + assert.Contains(t, destroyed, 20) + assert.Contains(t, destroyed, 30) + + // Verify pool is empty - next Get should create new resource + val := pool.Get() + assert.Equal(t, 1, val) // create() returns 1 +} + +func TestPoolDestroyAllEmpty(t *testing.T) { + var destroyCount int32 + destroyFunc := func(_ any) { + atomic.AddInt32(&destroyCount, 1) + } + + pool := NewPool(limit, create, destroyFunc) + + // DestroyAll on empty pool should not panic + pool.DestroyAll() + + // No resources should have been destroyed + assert.Equal(t, int32(0), atomic.LoadInt32(&destroyCount)) + + // Pool should still work normally + val := pool.Get() + assert.Equal(t, 1, val) +} + +func TestPoolDestroyAllWithNilDestroy(t *testing.T) { + pool := NewPool(limit, create, nil) + + // Put some resources into the pool + pool.Put(10) + pool.Put(20) + + // DestroyAll with nil destroy function should not panic + pool.DestroyAll() + + // Pool should be empty and work normally + val := pool.Get() + assert.Equal(t, 1, val) +} + +func TestPoolDestroyAllConcurrency(t *testing.T) { + var destroyCount int32 + var createCount int32 + + createFunc := func() any { + return atomic.AddInt32(&createCount, 1) + } + + destroyFunc := func(_ any) { + atomic.AddInt32(&destroyCount, 1) + } + + pool := NewPool(limit, createFunc, destroyFunc) + + // Add some initial resources + for i := 0; i < 5; i++ { + pool.Put(i + 100) + } + + var wg sync.WaitGroup + const goroutines = 10 + + // Concurrently perform various operations + for i := 0; i < goroutines; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + + switch id % 4 { + case 0: + // DestroyAll + pool.DestroyAll() + case 1: + // Get resources + val := pool.Get() + pool.Put(val) + case 2: + // Put resources + pool.Put(id + 1000) + case 3: + // Get and don't put back + pool.Get() + } + }(i) + } + + wg.Wait() + + // Final DestroyAll to clean up + pool.DestroyAll() + + // Pool should work after concurrent operations + val := pool.Get() + assert.NotNil(t, val) +} + +func TestPoolDestroyAllWakesWaitingGoroutines(t *testing.T) { + pool := NewPool(1, create, destroy) // Small pool size + + // Fill the pool + resource := pool.Get() + assert.Equal(t, 1, resource) + + var wg sync.WaitGroup + var gotResource bool + + // Start a goroutine that will wait for a resource + wg.Add(1) + go func() { + defer wg.Done() + val := pool.Get() // This will block since pool is full + gotResource = true + assert.Equal(t, 1, val) // Should get a newly created resource after DestroyAll + }() + + // Give the goroutine time to start waiting + time.Sleep(10 * time.Millisecond) + + // DestroyAll should wake up the waiting goroutine + pool.DestroyAll() + + wg.Wait() + assert.True(t, gotResource) +} + func create() any { return 1 }