-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathserial.go
More file actions
151 lines (124 loc) · 3.37 KB
/
serial.go
File metadata and controls
151 lines (124 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package parallel
import (
"context"
"runtime"
"sync"
)
// SerialOption tunes how Serial distributes tasks across goroutines.
type SerialOption struct {
	// Routines is the number of worker goroutines.
	//
	// A value <= 0 is replaced by runtime.GOMAXPROCS(0) when normalized.
	Routines int

	// Window caps how many task results may be outstanding (cached while
	// waiting to be handled in order) at any time.
	//
	// A value <= 0 disables the limit, so tasks are scheduled as fast as
	// results come back.
	//
	// A positive value is useful to, e.g.
	//   - bound the memory used for cached results, or
	//   - satisfy business logic that requires a bounded look-ahead.
	Window int
}

// Normalize adjusts the option in place for a run of the given number of tasks.
//
// After the call Routines is GOMAXPROCS-defaulted and capped at tasks, and a
// positive Window is forced into the range [Routines, tasks]. A non-positive
// Window is left untouched.
func (opt *SerialOption) Normalize(tasks int) {
	// default the worker count, then never use more workers than tasks
	routines := opt.Routines
	if routines <= 0 {
		routines = runtime.GOMAXPROCS(0)
	}
	opt.Routines = min(routines, tasks)

	// window disabled: nothing more to clamp
	if opt.Window <= 0 {
		return
	}

	// routines <= window <= tasks (the upper bound wins on conflict)
	opt.Window = min(max(opt.Window, opt.Routines), tasks)
}
// Serial schedules tasks in sequence and handles the execution result in sequence as well.
// If any task execution failed, it will terminate and return error immediately.
//
// tasks is the number of tasks, identified by the indices 0..tasks-1; nothing
// is done and nil is returned when tasks <= 0. Only the first element of
// option, if any, is used, and it is normalized against tasks before use.
//
// The returned error is whatever the result collection loop returns: the
// context error if ctx is done, or the first error reported while collecting.
// Serial does not return before every worker goroutine it started has exited,
// including when the collection loop panics.
func Serial[T any](ctx context.Context, parallelizable Interface[T], tasks int, option ...SerialOption) error {
	if tasks <= 0 {
		return nil
	}

	// normalize option
	var opt SerialOption
	if len(option) > 0 {
		opt = option[0]
	}
	opt.Normalize(tasks)

	// create channels, buffered so that the initial fill below cannot block.
	// They are deliberately never closed: no receiver relies on a close
	// signal, and closing them while a worker is still running would make
	// that worker panic on send.
	chLen := max(opt.Routines, opt.Window)
	taskCh := make(chan int, chLen)
	resultCh := make(chan *Result[T], chLen)

	var wg sync.WaitGroup
	ctx, cancel := context.WithCancel(ctx)
	// on every exit path (return or panic): notify all goroutines to
	// terminate, then wait for their terminations
	defer func() {
		cancel()
		wg.Wait()
	}()

	// start goroutines to execute tasks
	for i := 0; i < opt.Routines; i++ {
		wg.Add(1)
		go doWork(ctx, parallelizable, i, taskCh, resultCh, &wg)
	}

	// fill task channel at first
	for i := 0; i < chLen; i++ {
		taskCh <- i
	}

	// collect execution results
	return serialCollect(ctx, parallelizable, taskCh, resultCh, tasks, opt)
}
// doWork is the loop run by a single worker goroutine.
//
// It repeatedly takes a task index from taskCh, runs ParallelDo for it with
// this worker's routine number, and sends the outcome (value and error) to
// resultCh. It returns, marking wg as done, once ctx is cancelled or taskCh
// is closed. No new task is started after ctx has been cancelled.
func doWork[T any](ctx context.Context, parallelizable Interface[T], routine int, taskCh <-chan int, resultCh chan<- *Result[T], wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case task, ok := <-taskCh:
			if !ok {
				// closed channel: no task will ever arrive again, and the
				// zero value received here is not a real task
				return
			}
			// select picks at random among ready cases, so a queued task
			// may win over a cancellation that already happened; drop it
			// instead of doing work nobody will collect
			if ctx.Err() != nil {
				return
			}
			val, err := parallelizable.ParallelDo(ctx, routine, task)
			// NOTE(review): this send is unconditional; it relies on the
			// caller sizing resultCh so that it cannot block - confirm if
			// doWork is ever used outside Serial
			resultCh <- &Result[T]{routine, task, val, err}
		}
	}
}
// serialCollect receives task results from resultCh and hands them to
// ParallelCollect strictly in task order, caching results that arrive early.
//
// It also feeds taskCh with the tasks that were not part of the initial fill
// (indices max(opt.Routines, opt.Window) and above):
//   - window disabled (opt.Window <= 0): one new task per received result;
//   - window enabled: one new task per result handled in order.
//
// It returns nil once all tasks have been handled, ctx.Err() if ctx is done
// first, or the first error returned by ParallelCollect.
func serialCollect[T any](ctx context.Context, parallelizable Interface[T], taskCh chan<- int, resultCh <-chan *Result[T], tasks int, opt SerialOption) error {
	windowed := opt.Window > 0
	pending := make(map[int]*Result[T])

	// index of the next task to schedule; everything below was pre-filled
	scheduled := max(opt.Routines, opt.Window)
	// dispatch schedules the next task, if any is left
	dispatch := func() {
		if scheduled < tasks {
			taskCh <- scheduled
			scheduled++
		}
	}

	// index of the next result to hand over in sequence
	handled := 0
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case res := <-resultCh:
			// without a window, keep the workers busy right away
			if !windowed {
				dispatch()
			}
			pending[res.Task] = res

			// drain every result that is now next in line
			for ready := pending[handled]; ready != nil; ready = pending[handled] {
				// with a window, a handled slot frees room for one more task;
				// schedule it before collecting so workers are not kept idle
				if windowed {
					dispatch()
				}
				if err := parallelizable.ParallelCollect(ctx, ready); err != nil {
					return err
				}
				// release the cached result
				delete(pending, handled)
				handled++
			}

			// every task has been handled
			if handled >= tasks {
				return nil
			}
		}
	}
}