Skip to content

Commit 06c990c

Browse files
joeybloggsjoeybloggs
authored andcommitted
Add batch WaitAll() function of when result checking is not necessary
1 parent e83f58b commit 06c990c

File tree

6 files changed

+93
-8
lines changed

6 files changed

+93
-8
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package pool
22
============
33

4-
![Project status](https://img.shields.io/badge/version-3.0.0-green.svg)
4+
![Project status](https://img.shields.io/badge/version-3.1.0-green.svg)
55
[![Build Status](https://semaphoreci.com/api/v1/joeybloggs/pool/branches/v3/badge.svg)](https://semaphoreci.com/joeybloggs/pool)
66
[![Coverage Status](https://coveralls.io/repos/go-playground/pool/badge.svg?branch=v3&service=github)](https://coveralls.io/github/go-playground/pool?branch=v3)
77
[![Go Report Card](https://goreportcard.com/badge/gopkg.in/go-playground/pool.v3)](https://goreportcard.com/report/gopkg.in/go-playground/pool.v3)
@@ -49,7 +49,7 @@ Important Information READ THIS!
4949

5050
- It is recommended that you cancel a pool or batch from the calling function and not inside of the Unit of Work, it will work fine, however because of the goroutine scheduler and context switching it may not cancel as soon as if called from outside.
5151
- When Batching DO NOT FORGET TO CALL batch.QueueComplete(), if you do the Batch WILL deadlock
52-
- It is your responsibility to call WorkUnit.IsCancelled() to check if it's cancelled after a blocking operation like waiting for a connection from a pool.
52+
- It is your responsibility to call WorkUnit.IsCancelled() to check if it's cancelled after a blocking operation like waiting for a connection from a pool. (optional)
5353

5454
Usage and documentation
5555
------

batch.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,13 @@ type Batch interface {
2222
// Results returns a Work Unit result channel that will output all
2323
// completed units of work.
2424
Results() <-chan WorkUnit
25+
26+
// WaitAll is an alternative to Results() where you
27+
// may want/need to wait until all work has been
28+
// processed, but don't need to check results.
29+
// eg. individual units of work may handle their own
30+
// errors, logging...
31+
WaitAll()
2532
}
2633

2734
// batch contains all information for a batch run of WorkUnits
@@ -111,3 +118,14 @@ func (b *batch) Results() <-chan WorkUnit {
111118

112119
return b.results
113120
}
121+
122+
// WaitAll is an alternative to Results() where you
123+
// may want/need to wait until all work has been
124+
// processed, but don't need to check results.
125+
// eg. individual units of work may handle their own
126+
// errors and logging...
127+
func (b *batch) WaitAll() {
128+
129+
for range b.Results() {
130+
}
131+
}

batch_limited_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pool
22

33
import (
4+
"sync"
45
"testing"
56
"time"
67

@@ -135,3 +136,37 @@ func TestLimitedBatchCancelItemsCancelledAfterward(t *testing.T) {
135136

136137
Equal(t, count, 40)
137138
}
139+
140+
func TestLimitedBatchWaitAll(t *testing.T) {
141+
142+
var count int
143+
var m sync.Mutex
144+
145+
newFunc := func(i int) func(WorkUnit) (interface{}, error) {
146+
return func(WorkUnit) (interface{}, error) {
147+
time.Sleep(time.Second * 1)
148+
m.Lock()
149+
count++
150+
m.Unlock()
151+
return i, nil
152+
}
153+
}
154+
155+
pool := NewLimited(4)
156+
defer pool.Close()
157+
158+
batch := pool.Batch()
159+
160+
go func() {
161+
162+
for i := 0; i < 10; i++ {
163+
batch.Queue(newFunc(i))
164+
}
165+
166+
batch.QueueComplete()
167+
}()
168+
169+
batch.WaitAll()
170+
171+
Equal(t, count, 10)
172+
}

batch_unlimited_test.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package pool
22

33
import (
4+
"sync"
45
"testing"
56
"time"
67

@@ -135,3 +136,37 @@ func TestUnlimitedBatchCancelItemsCancelledAfterward(t *testing.T) {
135136

136137
Equal(t, count, 40)
137138
}
139+
140+
func TestUnlimitedBatchWaitAll(t *testing.T) {
141+
142+
var count int
143+
var m sync.Mutex
144+
145+
newFunc := func(i int) func(WorkUnit) (interface{}, error) {
146+
return func(WorkUnit) (interface{}, error) {
147+
time.Sleep(time.Second * 1)
148+
m.Lock()
149+
count++
150+
m.Unlock()
151+
return i, nil
152+
}
153+
}
154+
155+
pool := New()
156+
defer pool.Close()
157+
158+
batch := pool.Batch()
159+
160+
go func() {
161+
162+
for i := 0; i < 10; i++ {
163+
batch.Queue(newFunc(i))
164+
}
165+
166+
batch.QueueComplete()
167+
}()
168+
169+
batch.WaitAll()
170+
171+
Equal(t, count, 10)
172+
}

doc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ important usage information
4141
if you do the Batch WILL deadlock
4242
4343
- It is your responsibility to call WorkUnit.IsCancelled() to check if it's cancelled
44-
after a blocking operation like waiting for a connection from a pool.
44+
after a blocking operation like waiting for a connection from a pool. (optional)
4545
4646
4747
Usage and documentation

unlimited_pool_test.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,12 +73,9 @@ func TestUnlimitedCancel(t *testing.T) {
7373

7474
go func(ch chan WorkUnit) {
7575
m.RLock()
76-
if closed {
77-
m.RUnlock()
78-
return
76+
if !closed {
77+
ch <- pool.Queue(newFunc(time.Second * 1))
7978
}
80-
81-
ch <- pool.Queue(newFunc(time.Second * 1))
8279
m.RUnlock()
8380
}(ch)
8481
}

0 commit comments

Comments
 (0)