Skip to content

Commit c2f3d5b

Browse files
authored
test: improve code coverage (#38)
1 parent 56d4c67 commit c2f3d5b

File tree

3 files changed

+59
-18
lines changed

3 files changed

+59
-18
lines changed

executor.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,13 @@ func NewExecutorConfig(workerPoolSize, queueSize int) *ExecutorConfig {
5454
// Executor implements the [ExecutorService] interface.
5555
type Executor[T any] struct {
5656
cancel context.CancelFunc
57-
queue chan job[T]
57+
queue chan executorJob[T]
5858
status atomic.Uint32
5959
}
6060

6161
var _ ExecutorService[any] = (*Executor[any])(nil)
6262

63-
type job[T any] struct {
63+
type executorJob[T any] struct {
6464
promise Promise[T]
6565
task func(context.Context) (T, error)
6666
}
@@ -70,23 +70,24 @@ func NewExecutor[T any](ctx context.Context, config *ExecutorConfig) *Executor[T
7070
ctx, cancel := context.WithCancel(ctx)
7171
executor := &Executor[T]{
7272
cancel: cancel,
73-
queue: make(chan job[T], config.QueueSize),
73+
queue: make(chan executorJob[T], config.QueueSize),
7474
}
75+
// set the executor status to running explicitly
76+
executor.status.Store(uint32(ExecutorStatusRunning))
77+
7578
// init the workers pool
7679
go executor.startWorkers(ctx, config.WorkerPoolSize)
7780

7881
// set status to terminating when ctx is done
7982
go executor.monitorCtx(ctx)
8083

81-
// set the executor status to running
82-
executor.status.Store(uint32(ExecutorStatusRunning))
83-
8484
return executor
8585
}
8686

8787
func (e *Executor[T]) monitorCtx(ctx context.Context) {
8888
<-ctx.Done()
89-
e.status.Store(uint32(ExecutorStatusTerminating))
89+
_ = e.status.CompareAndSwap(uint32(ExecutorStatusRunning),
90+
uint32(ExecutorStatusTerminating))
9091
}
9192

9293
func (e *Executor[T]) startWorkers(ctx context.Context, poolSize int) {
@@ -130,7 +131,7 @@ func (e *Executor[T]) Submit(f func(context.Context) (T, error)) (Future[T], err
130131
promise := NewPromise[T]()
131132
if ExecutorStatus(e.status.Load()) == ExecutorStatusRunning {
132133
select {
133-
case e.queue <- job[T]{promise, f}:
134+
case e.queue <- executorJob[T]{promise, f}:
134135
default:
135136
return nil, ErrExecutorQueueFull
136137
}

executor_test.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package async
22

33
import (
44
"context"
5+
"errors"
56
"runtime"
67
"testing"
78
"time"
@@ -44,7 +45,7 @@ func TestExecutor(t *testing.T) {
4445
routines := runtime.NumGoroutine()
4546

4647
// shut down the executor
47-
executor.Shutdown()
48+
_ = executor.Shutdown()
4849
time.Sleep(time.Millisecond)
4950

5051
// verify that submit fails after the executor was shut down
@@ -62,6 +63,26 @@ func TestExecutor(t *testing.T) {
6263
assertFutureError(t, ErrExecutorShutdown, future5, future6)
6364
}
6465

66+
func TestExecutor_context(t *testing.T) {
67+
ctx, cancel := context.WithCancel(context.Background())
68+
executor := NewExecutor[int](ctx, NewExecutorConfig(2, 2))
69+
70+
job := func(_ context.Context) (int, error) {
71+
return 0, errors.New("error")
72+
}
73+
74+
future, err := executor.Submit(job)
75+
assert.IsNil(t, err)
76+
77+
result, err := future.Join()
78+
assert.Equal(t, result, 0)
79+
assert.ErrorContains(t, err, "error")
80+
81+
cancel()
82+
time.Sleep(5 * time.Millisecond)
83+
assert.Equal(t, executor.Status(), ExecutorStatusShutdown)
84+
}
85+
6586
func submitJob[T any](t *testing.T, executor ExecutorService[T],
6687
f func(context.Context) (T, error)) Future[T] {
6788
future, err := executor.Submit(f)

future_test.go

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func TestFuture(t *testing.T) {
1919
time.Sleep(100 * time.Millisecond)
2020
p.Success(true)
2121
}()
22+
2223
res, err := p.Future().Join()
2324

2425
assert.Equal(t, true, res)
@@ -42,6 +43,7 @@ func TestFuture_Utils(t *testing.T) {
4243
time.Sleep(300 * time.Millisecond)
4344
p3.Failure(err3)
4445
}()
46+
4547
arr := []Future[*int]{p1.Future(), p2.Future(), p3.Future()}
4648
res := []any{res1, res2, err3}
4749
futRes, _ := FutureSeq(arr).Join()
@@ -55,6 +57,7 @@ func TestFuture_FirstCompleted(t *testing.T) {
5557
time.Sleep(100 * time.Millisecond)
5658
p.Success(util.Ptr(true))
5759
}()
60+
5861
timeout := FutureTimer[*bool](10 * time.Millisecond)
5962
futRes, futErr := FutureFirstCompletedOf(p.Future(), timeout).Join()
6063

@@ -68,6 +71,7 @@ func TestFuture_Transform(t *testing.T) {
6871
time.Sleep(100 * time.Millisecond)
6972
p1.Success(util.Ptr(1))
7073
}()
74+
7175
future := p1.Future().Map(func(v *int) (*int, error) {
7276
inc := *v + 1
7377
return &inc, nil
@@ -96,6 +100,7 @@ func TestFuture_Recover(t *testing.T) {
96100
time.Sleep(10 * time.Millisecond)
97101
p2.Failure(errors.New("recover Future failure"))
98102
}()
103+
99104
future := p1.Future().Map(func(_ int) (int, error) {
100105
return 0, errors.New("map error")
101106
}).FlatMap(func(_ int) (Future[int], error) {
@@ -116,17 +121,30 @@ func TestFuture_Recover(t *testing.T) {
116121
}
117122

118123
func TestFuture_Failure(t *testing.T) {
119-
p1 := NewPromise[*int]()
120-
p2 := NewPromise[*int]()
124+
p1 := NewPromise[int]()
125+
p2 := NewPromise[int]()
126+
p3 := NewPromise[int]()
127+
err := errors.New("error")
121128
go func() {
122-
time.Sleep(10 * time.Millisecond)
123-
p1.Failure(errors.New("Future error"))
124-
time.Sleep(20 * time.Millisecond)
125-
p2.Success(util.Ptr(2))
129+
time.Sleep(5 * time.Millisecond)
130+
p1.Failure(err)
131+
time.Sleep(5 * time.Millisecond)
132+
p2.Failure(err)
133+
time.Sleep(5 * time.Millisecond)
134+
p3.Success(2)
126135
}()
127-
res, _ := p1.Future().RecoverWith(p2.Future()).Join()
128136

129-
assert.Equal(t, 2, *res)
137+
res, _ := p1.Future().
138+
Map(func(_ int) (int, error) { return 0, err }).
139+
FlatMap(func(_ int) (Future[int], error) { return p1.Future(), err }).
140+
RecoverWith(p2.Future()).
141+
RecoverWith(p3.Future()).
142+
FlatMap(func(_ int) (Future[int], error) { return p2.Future(), err }).
143+
RecoverWith(p3.Future()).
144+
RecoverWith(p3.Future()).
145+
Join()
146+
147+
assert.Equal(t, 2, res)
130148
}
131149

132150
func TestFuture_Timeout(t *testing.T) {
@@ -135,6 +153,7 @@ func TestFuture_Timeout(t *testing.T) {
135153
time.Sleep(100 * time.Millisecond)
136154
p.Success(true)
137155
}()
156+
138157
future := p.Future()
139158

140159
ctx, cancel := context.WithTimeout(context.Background(),
@@ -150,7 +169,6 @@ func TestFuture_Timeout(t *testing.T) {
150169

151170
func TestFuture_GoroutineLeak(t *testing.T) {
152171
var wg sync.WaitGroup
153-
154172
fmt.Println(runtime.NumGoroutine())
155173

156174
numFuture := 100
@@ -176,6 +194,7 @@ func TestFuture_GoroutineLeak(t *testing.T) {
176194
time.Sleep(10 * time.Millisecond)
177195
numGoroutine := runtime.NumGoroutine()
178196
fmt.Println(numGoroutine)
197+
179198
if numGoroutine > numFuture {
180199
t.Fatalf("numGoroutine is %d", numGoroutine)
181200
}

0 commit comments

Comments
 (0)