Skip to content

Commit e791168

Browse files
authored
feat: add task executor implementation (#36)
1 parent c2a1826 commit e791168

File tree

4 files changed

+278
-4
lines changed

4 files changed

+278
-4
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Async is a synchronization and asynchronous computation package for Go.
1313
* **ShardedMap** - Implements the generic `async.Map` interface in a thread-safe manner, delegating load/store operations to one of the underlying `async.SynchronizedMap`s (shards), using a key hash to calculate the shard number.
1414
* **Future** - A placeholder object for a value that may not yet exist.
1515
* **Promise** - While futures are defined as a type of read-only placeholder object created for a result which doesn’t yet exist, a promise can be thought of as a writable, single-assignment container, which completes a future.
16+
* **Executor** - A worker pool for executing asynchronous tasks, where each submission returns a Future instance representing the result of the task.
1617
* **Task** - A data type for controlling possibly lazy and asynchronous computations.
1718
* **Once** - An object similar to sync.Once having the Do method taking `f func() (T, error)` and returning `(T, error)`.
1819
* **Value** - An object similar to atomic.Value, but without the consistent type constraint.

examples/future/main.go

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,56 @@
11
package main
22

33
import (
4+
"context"
45
"log"
56
"time"
67

78
"github.com/reugn/async"
89
)
910

11+
const ok = "OK"
12+
1013
func main() {
11-
future := asyncAction()
12-
result, err := future.Join()
14+
// using a promise
15+
future1 := asyncAction()
16+
result1, err := future1.Join()
17+
if err != nil {
18+
log.Fatal(err)
19+
}
20+
log.Print(result1)
21+
22+
// using a task
23+
task := async.NewTask(func() (string, error) { return ok, nil })
24+
result2, err := task.Call().Join()
25+
if err != nil {
26+
log.Fatal(err)
27+
}
28+
log.Print(result2)
29+
30+
// using the executor
31+
ctx := context.Background()
32+
executor := async.NewExecutor[*string](ctx, async.NewExecutorConfig(2, 2))
33+
34+
future3, err := executor.Submit(func(_ context.Context) (*string, error) {
35+
value := ok
36+
return &value, nil
37+
})
38+
if err != nil {
39+
log.Fatal(err)
40+
}
41+
42+
result3, err := future3.Get(ctx)
1343
if err != nil {
1444
log.Fatal(err)
1545
}
16-
log.Print(result)
46+
log.Print(*result3)
1747
}
1848

1949
func asyncAction() async.Future[string] {
2050
promise := async.NewPromise[string]()
2151
go func() {
2252
time.Sleep(time.Second)
23-
promise.Success("OK")
53+
promise.Success(ok)
2454
}()
2555

2656
return promise.Future()

executor.go

Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
package async
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"sync/atomic"
8+
)
9+
10+
// ExecutorStatus represents the status of an [ExecutorService].
11+
type ExecutorStatus uint32
12+
13+
const (
14+
ExecutorStatusRunning ExecutorStatus = iota
15+
ExecutorStatusTerminating
16+
ExecutorStatusShutdown
17+
)
18+
19+
var (
20+
ErrExecutorQueueFull = errors.New("async: executor queue is full")
21+
ErrExecutorShutdown = errors.New("async: executor is shut down")
22+
)
23+
24+
// ExecutorService is an interface that defines a task executor.
25+
type ExecutorService[T any] interface {
26+
// Submit submits a function to the executor service.
27+
// The function will be executed asynchronously and the result will be
28+
// available via the returned future.
29+
Submit(func(context.Context) (T, error)) (Future[T], error)
30+
31+
// Shutdown shuts down the executor service.
32+
// Once the executor service is shut down, no new tasks can be submitted
33+
// and any pending tasks will be cancelled.
34+
Shutdown() error
35+
36+
// Status returns the current status of the executor service.
37+
Status() ExecutorStatus
38+
}
39+
40+
// ExecutorConfig represents the Executor configuration.
41+
type ExecutorConfig struct {
42+
WorkerPoolSize int
43+
QueueSize int
44+
}
45+
46+
// NewExecutorConfig returns a new [ExecutorConfig].
47+
func NewExecutorConfig(workerPoolSize, queueSize int) *ExecutorConfig {
48+
return &ExecutorConfig{
49+
WorkerPoolSize: workerPoolSize,
50+
QueueSize: queueSize,
51+
}
52+
}
53+
54+
// Executor implements the [ExecutorService] interface.
55+
type Executor[T any] struct {
56+
cancel context.CancelFunc
57+
queue chan job[T]
58+
status atomic.Uint32
59+
}
60+
61+
var _ ExecutorService[any] = (*Executor[any])(nil)
62+
63+
type job[T any] struct {
64+
promise Promise[T]
65+
task func(context.Context) (T, error)
66+
}
67+
68+
// NewExecutor returns a new [Executor].
69+
func NewExecutor[T any](ctx context.Context, config *ExecutorConfig) *Executor[T] {
70+
ctx, cancel := context.WithCancel(ctx)
71+
executor := &Executor[T]{
72+
cancel: cancel,
73+
queue: make(chan job[T], config.QueueSize),
74+
}
75+
// init the workers pool
76+
go executor.startWorkers(ctx, config.WorkerPoolSize)
77+
78+
// set status to terminating when ctx is done
79+
go executor.monitorCtx(ctx)
80+
81+
// set the executor status to running
82+
executor.status.Store(uint32(ExecutorStatusRunning))
83+
84+
return executor
85+
}
86+
87+
func (e *Executor[T]) monitorCtx(ctx context.Context) {
88+
<-ctx.Done()
89+
e.status.Store(uint32(ExecutorStatusTerminating))
90+
}
91+
92+
func (e *Executor[T]) startWorkers(ctx context.Context, poolSize int) {
93+
var wg sync.WaitGroup
94+
for i := 0; i < poolSize; i++ {
95+
wg.Add(1)
96+
go func() {
97+
defer wg.Done()
98+
loop:
99+
for ExecutorStatus(e.status.Load()) == ExecutorStatusRunning {
100+
select {
101+
case job := <-e.queue:
102+
result, err := job.task(ctx)
103+
if err != nil {
104+
job.promise.Failure(err)
105+
} else {
106+
job.promise.Success(result)
107+
}
108+
case <-ctx.Done():
109+
break loop
110+
}
111+
}
112+
}()
113+
}
114+
115+
// wait for all workers to exit
116+
wg.Wait()
117+
// close the queue and cancel all pending tasks
118+
close(e.queue)
119+
for job := range e.queue {
120+
job.promise.Failure(ErrExecutorShutdown)
121+
}
122+
// mark the executor as shut down
123+
e.status.Store(uint32(ExecutorStatusShutdown))
124+
}
125+
126+
// Submit submits a function to the executor.
127+
// The function will be executed asynchronously and the result will be
128+
// available via the returned future.
129+
func (e *Executor[T]) Submit(f func(context.Context) (T, error)) (Future[T], error) {
130+
promise := NewPromise[T]()
131+
if ExecutorStatus(e.status.Load()) == ExecutorStatusRunning {
132+
select {
133+
case e.queue <- job[T]{promise, f}:
134+
default:
135+
return nil, ErrExecutorQueueFull
136+
}
137+
} else {
138+
return nil, ErrExecutorShutdown
139+
}
140+
return promise.Future(), nil
141+
}
142+
143+
// Shutdown shuts down the executor.
144+
// Once the executor service is shut down, no new tasks can be submitted
145+
// and any pending tasks will be cancelled.
146+
func (e *Executor[T]) Shutdown() error {
147+
e.cancel()
148+
return nil
149+
}
150+
151+
// Status returns the current status of the executor.
152+
func (e *Executor[T]) Status() ExecutorStatus {
153+
return ExecutorStatus(e.status.Load())
154+
}

executor_test.go

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package async
2+
3+
import (
4+
"context"
5+
"runtime"
6+
"testing"
7+
"time"
8+
9+
"github.com/reugn/async/internal/assert"
10+
)
11+
12+
func TestExecutor(t *testing.T) {
13+
ctx := context.Background()
14+
executor := NewExecutor[int](ctx, NewExecutorConfig(2, 2))
15+
16+
job := func(_ context.Context) (int, error) {
17+
time.Sleep(time.Millisecond)
18+
return 1, nil
19+
}
20+
jobLong := func(_ context.Context) (int, error) {
21+
time.Sleep(10 * time.Millisecond)
22+
return 1, nil
23+
}
24+
25+
future1 := submitJob[int](t, executor, job)
26+
future2 := submitJob[int](t, executor, job)
27+
28+
// wait for the first two jobs to complete
29+
time.Sleep(3 * time.Millisecond)
30+
31+
// submit four more jobs
32+
future3 := submitJob[int](t, executor, jobLong)
33+
future4 := submitJob[int](t, executor, jobLong)
34+
future5 := submitJob[int](t, executor, jobLong)
35+
future6 := submitJob[int](t, executor, jobLong)
36+
37+
// the queue has reached its maximum capacity
38+
future7, err := executor.Submit(job)
39+
assert.ErrorIs(t, err, ErrExecutorQueueFull)
40+
assert.IsNil(t, future7)
41+
42+
assert.Equal(t, executor.Status(), ExecutorStatusRunning)
43+
44+
routines := runtime.NumGoroutine()
45+
46+
// shut down the executor
47+
executor.Shutdown()
48+
time.Sleep(time.Millisecond)
49+
50+
// verify that submit fails after the executor was shut down
51+
_, err = executor.Submit(job)
52+
assert.ErrorIs(t, err, ErrExecutorShutdown)
53+
54+
// validate the executor status
55+
assert.Equal(t, executor.Status(), ExecutorStatusTerminating)
56+
time.Sleep(10 * time.Millisecond)
57+
assert.Equal(t, executor.Status(), ExecutorStatusShutdown)
58+
59+
assert.Equal(t, routines, runtime.NumGoroutine()+4)
60+
61+
assertFutureResult(t, 1, future1, future2, future3, future4)
62+
assertFutureError(t, ErrExecutorShutdown, future5, future6)
63+
}
64+
65+
func submitJob[T any](t *testing.T, executor ExecutorService[T],
66+
f func(context.Context) (T, error)) Future[T] {
67+
future, err := executor.Submit(f)
68+
assert.IsNil(t, err)
69+
70+
time.Sleep(time.Millisecond) // switch context
71+
return future
72+
}
73+
74+
func assertFutureResult[T any](t *testing.T, expected T, futures ...Future[T]) {
75+
for _, future := range futures {
76+
result, err := future.Join()
77+
assert.IsNil(t, err)
78+
assert.Equal(t, expected, result)
79+
}
80+
}
81+
82+
func assertFutureError[T any](t *testing.T, expected error, futures ...Future[T]) {
83+
for _, future := range futures {
84+
result, err := future.Join()
85+
var zero T
86+
assert.Equal(t, zero, result)
87+
assert.ErrorIs(t, err, expected)
88+
}
89+
}

0 commit comments

Comments
 (0)