Skip to content

Commit d5902e9

Browse files
committed
major: use atomic chunked drainer, improved bench results
1 parent 1328f74 commit d5902e9

File tree

8 files changed

+217
-243
lines changed

8 files changed

+217
-243
lines changed

README.md

Lines changed: 90 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,22 @@
77
[![GoDoc](https://pkg.go.dev/badge/github.com/rubengp99/go-pool)](https://pkg.go.dev/github.com/rubengp99/go-pool)
88
[![License](https://img.shields.io/badge/license-MIT-blue.svg)](https://github.com/rubengp99/go-pool/blob/dev/LICENSE.md)
99

10-
> A lightweight, type-safe, and retryable concurrent worker pool for Go — built on **`sync.WaitGroup`**, **semaphores**, **context**, and **condition variables**, _not_ `errgroup`.
10+
> A lightweight, type-safe, and retryable concurrent worker pool for Go — built on **`sync.WaitGroup`**, **semaphores**, **context**, and **atomic operations**, _not_ `errgroup`.
1111
1212
`go-pool` provides deterministic, leak-free concurrency with automatic retries, result draining, and type-safe tasks, suitable for high-throughput Go applications.
1313

1414
---
1515

1616
## Features
1717

18-
- Type-safe generic workers (`Task[T]`)
19-
- Graceful error propagation
20-
- Built-in retry with exponential backoff + jitter
21-
- Concurrent result draining (`Drain`)
18+
- Type-safe generic drainer (`Drainer[T]`)
19+
- Plain `Task` functions (`func() error`) for simplicity
20+
- Optional retry with exponential backoff + jitter
21+
- Concurrent result draining
2222
- Deterministic shutdown (no goroutine leaks)
23-
- Mutex + condition variable–protected data aggregation
24-
- Fluent functional composition (`WithRetry`, `DrainTo`, `WithInput`)
25-
- Implemented with `sync.WaitGroup`, semaphores, `context`, and `sync.Cond`
23+
- Minimal allocations, lock-free or mutex-protected where necessary
24+
- Fluent functional composition (`WithRetry`)
25+
- Implemented with `sync.WaitGroup`, semaphores, `context`, and `atomic` operations
2626

2727
---
2828

@@ -38,22 +38,21 @@ go get github.com/rubengp99/go-pool
3838

3939
| Type | Description |
4040
|------|-------------|
41-
| `Task[T]` | Represents a unit of async work |
42-
| `Pool` | Manages concurrent execution using WaitGroup and semaphores |
43-
| `Drain[T]` | Collects results concurrently using mutex + condition variable |
44-
| `Args[T]` | Provides task input and drainer reference |
41+
| `Task` | Represents a unit of async work (`func() error`) |
42+
| `Pool` | Manages concurrent execution using `WaitGroup` and semaphores |
43+
| `Drainer[T]` | Collects results concurrently; safe with unbuffered channels |
4544
| `Worker` | Interface for executable and retryable tasks |
45+
| `Retryable` | Allows wrapping a `Task` with retries |
4646

4747
---
4848

4949
## How It Works
5050

51-
`Pool` orchestrates multiple `Worker`s concurrently:
52-
1. Each worker runs in a separate goroutine managed by a `WaitGroup`.
51+
1. Each `Worker` runs in a separate goroutine managed by a `WaitGroup`.
5352
2. Concurrency is controlled with a semaphore.
5453
3. Shared `context` handles cancellation.
55-
4. `Drain[T]` concurrently collects results.
56-
5. On completion, resources and channels close automatically.
54+
4. `Drainer[T]` safely collects results concurrently.
55+
5. On completion, resources and channels are closed deterministically.
5756

5857
---
5958

@@ -62,24 +61,24 @@ go get github.com/rubengp99/go-pool
6261
### Basic Task
6362

6463
```go
65-
output := gopool.NewDrainer[User]()
66-
task := gopool.NewTask(func(t gopool.Args[User]) error {
67-
t.Drainer.Send(User{Name: "Alice"})
64+
drainer := gopool.NewDrainer[User]()
65+
task := gopool.NewTask(func() error {
66+
drainer.Send(User{Name: "Alice"})
6867
return nil
69-
}).DrainTo(output)
68+
})
7069

7170
pool := gopool.NewPool()
71+
pool.Go(task).Wait()
7272

73-
gopool.Go(task).Wait()
74-
results := output.Drain()
73+
results := drainer.Drain()
7574
fmt.Println(results[0].Name) // Alice
7675
```
7776

7877
### Task With Retry
7978

8079
```go
8180
var numRetries int
82-
task := gopool.NewTask(func(t gopool.Args[any]) error {
81+
task := gopool.NewTask(func() error {
8382
numRetries++
8483
if numRetries < 3 {
8584
return fmt.Errorf("transient error")
@@ -88,69 +87,71 @@ task := gopool.NewTask(func(t gopool.Args[any]) error {
8887
}).WithRetry(3, 200*time.Millisecond)
8988

9089
pool := gopool.NewPool()
91-
gopool.Go(task).Wait()
90+
pool.Go(task).Wait()
9291
```
9392

94-
### Multiple Task Types
93+
### Multiple Independent Tasks
9594

9695
```go
97-
outA := gopool.NewDrainer[A]()
98-
outB := gopool.NewDrainer[B]()
96+
drainerA := gopool.NewDrainer[A]()
97+
drainerB := gopool.NewDrainer[B]()
9998

100-
// Task A
101-
t1 := gopool.NewTask(func(t gopool.Args[A]) error {
102-
t.Drainer.Send(A{Value: "Hello"})
99+
t1 := gopool.NewTask(func() error {
100+
drainerA.Send(A{Value: "Hello"})
103101
return nil
104-
}).DrainTo(outA)
102+
})
105103

106-
// Task B
107-
t2 := gopool.NewTask(func(t gopool.Args[B]) error {
108-
t.Drainer.Send(B{Value: 42.5})
104+
t2 := gopool.NewTask(func() error {
105+
drainerB.Send(B{Value: 42.5})
109106
return nil
110-
}).DrainTo(outB)
107+
})
111108

112109
pool := gopool.NewPool()
113-
gopool.Go(t1, t2).Wait()
110+
pool.Go(t1, t2).Wait()
114111

115-
fmt.Println(outA.Drain())
116-
fmt.Println(outB.Drain())
112+
fmt.Println(drainerA.Drain())
113+
fmt.Println(drainerB.Drain())
117114
```
118115

119116
---
120117

121118
## Interfaces
122119

123120
```go
124-
type Worker interface { Executable; Retryable }
125-
type Executable interface { Execute() error }
126-
type Retryable interface { WithRetry(attempts uint, sleep time.Duration) Worker }
121+
type Worker interface {
122+
Execute() error
123+
Retryable
124+
}
125+
126+
type Retryable interface {
127+
WithRetry(attempts uint, sleep time.Duration) Worker
128+
}
127129
```
128130

131+
> No `WithInput` or `DrainTo` exists anymore; tasks handle input and result sending themselves.
132+
129133
---
130134

131-
## Structs and Functions
135+
## Drainer
136+
137+
`Drainer[T]` collects results safely even with unbuffered channels.
132138

133-
### Task[T]
134-
- `Execute()` - run the task
135-
- `WithRetry(attempts, sleep)` - add retry logic
136-
- `DrainTo(d *Drain[T])` - send output to drain
137-
- `WithInput(input *T)` - provide task input
139+
```go
140+
type Drainer[T any] chan T
138141

139-
### Pool
140-
- `Go(tasks ...Worker)` - run tasks concurrently
141-
- `WithRetry(attempts, sleep)` - global retry policy
142-
- `WithLimit(limit)` - set concurrency limit
143-
- `Wait()` - wait for all tasks
144-
- `Close()` - cancel and cleanup
142+
func NewDrainer[T any]() Drainer[T]
143+
func (d Drainer[T]) Send(v T)
144+
func (d Drainer[T]) Drain() []T
145+
func (d Drainer[T]) Close()
146+
```
145147

146-
### Drain[T]
147-
- `Send(input T)` - safely push a value
148-
- `Drain()` - collect all values
149-
- `Count()` - get collected count
148+
- `Send()` pushes a value into the drain.
149+
- `Drain()` returns a snapshot of all collected values.
150+
- `Close()` marks the drain as finished.
150151

151152
---
152153

153-
## Benchmarks
154+
## Benchmarks (v1)
154155

155156
```
156157
goos: linux, goarch: amd64, cpu: 13th Gen Intel i9-13900KS
@@ -168,81 +169,62 @@ goos: linux, goarch: amd64, cpu: 13th Gen Intel i9-13900KS
168169

169170
![Benchmark Comparison](benchmark_chart.png)
170171

171-
Even though `go-pool` adds a small constant overhead compared to `errgroup` (≈100–130 ns per operation),
172-
it provides type safety, retries, automatic draining, and deterministic cleanup — all while staying within ~1.7× of native concurrency performance.
172+
## Benchmarks (v2)
173+
174+
```
175+
goos: linux, goarch: amd64, cpu: 13th Gen Intel i9-13900KS
176+
```
177+
178+
| Name | Iterations | ns/op | B/op | allocs/op |
179+
|------------------------------------|------------:|--------:|------:|-----------:|
180+
| **ErrGroup** | 6,203,892 | **183.5** | **24** | **1** |
181+
| **GoPool** | 6,145,203 | **192.0** | 32 | 1 |
182+
| GoPoolWithDrainer | 5,508,412 | 209.0 | 127 | 2 |
183+
| ChannelsWithOutputAndErrChannel | 4,461,849 | 262.0 | 72 | 2 |
184+
| ChannelsWithWaitGroup | 4,431,901 | 271.8 | 80 | 2 |
185+
| ChannelsWithErrGroup | 4,459,243 | 274.8 | 80 | 2 |
186+
| MutexWithErrGroup | 2,896,214 | 378.3 | 135 | 2 |
173187

174-
### Benchmark Insights
175188

176-
- `GoPool` and `GoPoolWithDrainer` show consistent sub-microsecond operation times.
177-
- Memory allocations remain extremely low — under 250 B/op even with drainer support.
178-
- The performance delta vs `errgroup` reflects controlled synchronization overhead (mutex + condition variable).
179-
- In practice, `go-pool` scales linearly with worker count and maintains predictable latency under load.
189+
![Benchmark Comparison](benchmark_chart_v2.png)
190+
191+
---
192+
193+
Even though `GoPool` adds a small constant overhead compared to `ErrGroup (≈8.5 ns per operation, 192 ns vs 183.5 ns)`,
194+
it provides type safety, retries, deterministic cleanup, and concurrent draining — while staying well within ~1.05× of native concurrency performance.
195+
196+
Memory-wise, `GoPool` uses slightly more: `32 B/op` vs `24 B/op` and `1 vs 1 allocs/op`, negligible for most workloads considering the added features.
180197

181198
---
182199

183200
## Design Highlights
184201

185202
- Structured concurrency with `sync.WaitGroup`
186203
- Controlled parallelism via semaphores
187-
- Mutex + `sync.Cond`–protected drains
188204
- Context-based cancellation and cleanup
189205
- Exponential backoff retries
190206
- Leak-free, deterministic shutdown
207+
- Drainer supports unbuffered channels for high-volume inputs
191208

192209
---
193210

194-
## ⚠️ Notes and Best Practices
195-
196-
### General
211+
## Notes and Best Practices
197212

198-
- Thread Safety — never access internal slices or channels directly.
199-
- Non-blocking design — use `Drain()` or wait for pool completion instead of manual `close()` calls.
213+
- **Thread Safety:** Never access internal slices/channels directly.
214+
- **Drainer:** Use `Send()` and `Drain()`, do not close manually if multiple producers exist.
215+
- **Task Management:** Wrap work with `NewTask(func() error)` and optionally `.WithRetry()`.
200216

201-
### Drainer (Drain)
202-
203-
- Create via `gopool.NewDrainer[T]()`
204-
- Use `Send()` to safely push results
205-
- Collect values using `Drain()`
206-
- Internally guarded by `sync.Mutex` and `sync.Cond`
207-
208-
### Task and Worker Management
209-
210-
- Wrap async functions with `gopool.NewTask()`
211-
- Chain configuration fluently using `.WithRetry()` and `.DrainTo()`
212-
- Provide inputs using `.WithInput()`
213-
214-
### Pool
215-
216-
- Use `gopool.NewPool()` for controlled concurrency
217-
- Limit parallelism with `.WithLimit(limit)`
218-
- Apply retry policy globally with `.WithRetry(attempts, sleep)`
219-
- Wait for all tasks to complete using `.Wait()`
217+
---
220218

221-
### Testing
219+
## Testing
222220

223-
- Run deterministic tests with:
224221
```bash
225222
go test -v ./...
226-
```
227-
- Benchmark performance with:
228-
```bash
229223
go test -bench . -benchmem -memprofile=mem.prof
230224
```
231-
---
232-
233-
## Summary
234-
235-
`go-pool` provides a modern, type-safe, and retryable abstraction over Go’s native synchronization primitives — combining simplicity, determinism, and high throughput.
236-
237-
Built for developers who want concurrency that’s:
238-
239-
- Readable
240-
- Deterministic
241-
- Retry-aware
242-
- Leak-free
243225

244226
---
245227

246228
## License
247229

248-
MIT License © 2025 [rubengp99](https://github.com/rubengp99)
230+
MIT License © 2025 [rubengp99](https://github.com/rubengp99)

benchmark_chart_v2.png

96.7 KB
Loading

benchmark_test.go

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,7 @@ func BenchmarkAsyncPackage(b *testing.B) {
2525

2626
b.Run("GoPool", func(b *testing.B) {
2727
for i := 0; i < b.N; i++ {
28-
d.Go(gopool.NewTask(func(arg gopool.Args[int]) error {
29-
return SimulatedTask()
30-
}))
28+
d.Go(gopool.NewTask(SimulatedTask))
3129
}
3230

3331
if err := d.Wait(); err != nil {
@@ -45,11 +43,11 @@ func BenchmarkAsyncPackageWithDrainer(b *testing.B) {
4543
o := gopool.NewDrainer[int]()
4644
// Create a Drain channel for async operations
4745
for i := 0; i < b.N; i++ {
48-
d.Go(gopool.NewTask(func(arg gopool.Args[int]) error {
46+
d.Go(gopool.NewTask(func() error {
4947
i := i
50-
arg.Drainer.Send(i)
48+
o.Send(i)
5149
return SimulatedTask()
52-
}).DrainTo(o))
50+
}))
5351
}
5452

5553
if err := d.Wait(); err != nil {

0 commit comments

Comments
 (0)