Skip to content

fix: fix the worker queue reboot but the slice is empty#360

Merged
panjf2000 merged 6 commits intopanjf2000:devfrom
POABOB:fix-pre-alloc-reboot-panic
Mar 7, 2025
Merged

fix: fix the worker queue reboot but the slice is empty#360
panjf2000 merged 6 commits intopanjf2000:devfrom
POABOB:fix-pre-alloc-reboot-panic

Conversation

@POABOB
Copy link
Contributor

@POABOB POABOB commented Mar 5, 2025

1. Are you opening this pull request for bug-fixs, optimizations or new feature?

Yes, for bug-fixs.

2. Please describe how these code changes achieve your intention.

  1. I re-alloc the workerQueue when reboot.
func (p *poolCommon) Reboot() {
	if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
		atomic.StoreInt32(&p.purgeDone, 0)
		p.goPurge()
		atomic.StoreInt32(&p.ticktockDone, 0)
		p.goTicktock()
		p.allDone = make(chan struct{})
		p.once = &sync.Once{}
                // add this line
		if p.options.PreAlloc {
			p.workers = newWorkerQueue(queueTypeLoopQueue, int(p.capacity))
		}
	}
}
  1. Add the pool_test.go to test this scenario.

Below is the analysis

1. Variable Initialize

The problem is that when we use preAlloc option, the workerQueue will initialize once.
Then, It will put the worker into the items slice as much as user defined.
The len is {size} and cap = {size} of items []worker.

type loopQueue struct {
	items  []worker
	expiry []worker
	head   int
	tail   int
	size   int
	isFull bool
}

func newWorkerLoopQueue(size int) *loopQueue {
	if size <= 0 {
		return nil
	}
	return &loopQueue{
		items: make([]worker, size), // len = {size}, cap = {size}
		size:  size,
	}
}

2. Pool Release

However, when user call the Release() or ReleaseTimeout() function, it will trigger reset() function and make all workers in items be nil.
The len is 0 and cap = {size} of items []worker.

func (wq *loopQueue) reset() {
	if wq.isEmpty() {
		return
	}

retry:
	if w := wq.detach(); w != nil {
		w.finish()
		goto retry
	}
	wq.items = wq.items[:0]
	wq.size = 0
	wq.head = 0
	wq.tail = 0
}

3. Reboot and No Workers

When the user want to reuse the Pool and call Reboot() function, the pool was restart but the len of workerQueue still 0.

func (p *poolCommon) Reboot() {
	if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
		atomic.StoreInt32(&p.purgeDone, 0)
		p.goPurge()
		atomic.StoreInt32(&p.ticktockDone, 0)
		p.goTicktock()
		p.allDone = make(chan struct{})
		p.once = &sync.Once{}
	}
}

But retrieveWorker() still can get the workers until it put back to workerQueue.

// retrieveWorker returns an available worker to run the tasks.
func (p *poolCommon) retrieveWorker() (w worker, err error) {
	p.lock.Lock()

retry:
	// First try to fetch the worker from the queue.
	if w = p.workers.detach(); w != nil {
		p.lock.Unlock()
		return
	}

	// If the worker queue is empty, and we don't run out of the pool capacity,
	// then just spawn a new worker goroutine.
	if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		p.lock.Unlock()
		w = p.workerCache.Get().(worker)
		w.run()
		return
	}

        // ...
	goto retry
}

Here is the point. When the new worker is prepared to put back to the workerQueue, wq.items len is 0.
If we direct access it, it will panic.

func (wq *loopQueue) insert(w worker) error {
	if wq.isFull {
		return errQueueIsFull
	}
	wq.items[wq.tail] = w                  // panic because of len is 0
	wq.tail = (wq.tail + 1) % wq.size

	if wq.tail == wq.head {
		wq.isFull = true
	}

	return nil
}

3. Please link to the relevant issues (if any).

Issue #358.

4. Which documentation changes (if any) need to be made/updated because of this PR?

No document description changed.

4. Checklist

  • I have squashed all insignificant commits.
  • I have commented my code for explaining package types, values, functions, and non-obvious lines.
  • I have written unit tests and verified that all tests passes (if needed).
  • I have documented feature info on the README (only when this PR is adding a new feature).
  • (optional) I am willing to help maintain this change if there are issues with it later.

@panjf2000
Copy link
Owner

panjf2000 commented Mar 5, 2025

Good catch, thanks!

But can we reuse the queue instead of reallocating it?

@panjf2000
Copy link
Owner

panjf2000 commented Mar 5, 2025

I think we can just remove these lines:

ants/worker_loop_queue.go

Lines 173 to 174 in 1bf9cfd

wq.items = wq.items[:0]
wq.size = 0

@POABOB
Copy link
Contributor Author

POABOB commented Mar 5, 2025

I think we can just remove these lines:

ants/worker_loop_queue.go

Lines 173 to 174 in 1bf9cfd

wq.items = wq.items[:0]
wq.size = 0

Sure. It's a good idea.
I use defer to Unlock the mutex lock. I think the code will be clear when using retrieveWorker() and revertWorker(). Do you agree?

@panjf2000
Copy link
Owner

I use defer to Unlock the mutex lock. I think the code will be clear when using retrieveWorker() and revertWorker(). Do you agree?

Maybe, but generally speaking, a PR shouldn't introduce irrelevant changes when trying to fix something. Let's do this in a separate PR and make sure the defer directive will not hurt the performance.

@POABOB
Copy link
Contributor Author

POABOB commented Mar 6, 2025

I use defer to Unlock the mutex lock. I think the code will be clear when using retrieveWorker() and revertWorker(). Do you agree?

Maybe, but generally speaking, a PR shouldn't introduce irrelevant changes when trying to fix something. Let's do this in a separate PR and make sure the defer directive will not hurt the performance.

Agree. I have restored it.

@POABOB
Copy link
Contributor Author

POABOB commented Mar 6, 2025

By the way, I have read some articles, and these show defer statement has no good performance on very short time lock.
Because the project is using spinLock, goroutines don't give out the cpu time.
That’s impressive performance changes!

Copy link
Owner

@panjf2000 panjf2000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename these two tests to TestRebootNewPoolCalc and TestRebootNewPoolWithPreAllocCalc. Then move them to ants_test.go

@panjf2000
Copy link
Owner

Also, you need to fix the lint issues.

@POABOB
Copy link
Contributor Author

POABOB commented Mar 6, 2025

Also, you need to fix the lint issues.

Done!

@codecov
Copy link

codecov bot commented Mar 6, 2025

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 94.09%. Comparing base (1bf9cfd) to head (6780f95).
Report is 1 commits behind head on dev.

Additional details and impacted files
@@            Coverage Diff             @@
##              dev     #360      +/-   ##
==========================================
- Coverage   94.11%   94.09%   -0.02%     
==========================================
  Files          14       14              
  Lines         968      966       -2     
==========================================
- Hits          911      909       -2     
  Misses         46       46              
  Partials       11       11              
Flag Coverage Δ
unittests 94.09% <ø> (-0.02%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

Copy link
Owner

@panjf2000 panjf2000 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@panjf2000 panjf2000 added the pending merged This PR has been reviewed and approved label Mar 7, 2025
@panjf2000 panjf2000 merged commit a445942 into panjf2000:dev Mar 7, 2025
13 checks passed
@POABOB POABOB deleted the fix-pre-alloc-reboot-panic branch March 8, 2025 05:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

pending merged This PR has been reviewed and approved

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants