Skip to content

Commit e19d583

Browse files
committed
feat(stream): 实现流式处理基础功能及并行处理
添加流式处理核心功能,包括Stream结构体、Filter操作和并行处理能力 实现顺序和并行两种处理模式,支持保序和乱序收集 添加单元测试和性能基准测试 配置CI/CD工作流和项目基础文件
1 parent a2dfd67 commit e19d583

File tree

10 files changed

+344
-0
lines changed

10 files changed

+344
-0
lines changed

.github/workflows/test.yml

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
name: Test
2+
name: Go CI/CD Pipeline
3+
4+
on:
5+
push:
6+
branches: [ main, master ]
7+
pull_request:
8+
branches: [ main, master ]
9+
10+
jobs:
11+
test:
12+
name: Test on ${{ matrix.os }} with Go ${{ matrix.go-version }}
13+
runs-on: ${{ matrix.os }}
14+
strategy:
15+
matrix:
16+
os: [ ubuntu-latest, windows-latest, macos-latest ]
17+
go-version: [ '1.21', '1.22', '1.23' ]
18+
fail-fast: false
19+
20+
steps:
21+
- name: Check out code
22+
uses: actions/checkout@v4
23+
24+
- name: Set up Go ${{ matrix.go-version }}
25+
uses: actions/setup-go@v5
26+
with:
27+
go-version: ${{ matrix.go-version }}
28+
cache-dependency-path: go.sum
29+
30+
- name: Cache Go modules
31+
uses: actions/cache@v4
32+
with:
33+
path: |
34+
~/.cache/go-build
35+
~/go/pkg/mod
36+
key: ${{ matrix.os }}-go-${{ matrix.go-version }}-${{ hashFiles('**/go.sum') }}
37+
restore-keys: |
38+
${{ matrix.os }}-go-${{ matrix.go-version }}-
39+
40+
- name: Install dependencies
41+
run: |
42+
go mod tidy
43+
go mod download
44+
45+
- name: Run Unit tests with coverage
46+
run: |
47+
go test -v -race -coverprofile=covprofile -covermode=atomic -coverpkg=./... ./...
48+
49+
- name: Install golangci-lint
50+
if: runner.os == 'Linux'
51+
run: go install github.com/golangci/golangci-lint/cmd/golangci-lint@latest
52+
53+
- name: Run lint (Linux only)
54+
if: runner.os == 'Linux'
55+
run: golangci-lint run --timeout=5m
56+
57+
- name: Install goveralls
58+
if: runner.os == 'Linux'
59+
run: go install github.com/mattn/goveralls@latest
60+
61+
- name: Send coverage to Coveralls (Linux only)
62+
if: runner.os == 'Linux' && github.event_name == 'push' && github.ref == 'refs/heads/main' || github.ref == 'refs/heads/master'
63+
env:
64+
COVERALLS_TOKEN: ${{ secrets.GITHUB_TOKEN }}
65+
run: goveralls -coverprofile=covprofile -service=github -repotoken=${{ secrets.GITHUB_TOKEN }}
66+
67+
- name: Build
68+
run: go build -v ./...

Makefile

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
.PHONY: clean test
2+
3+
test:
4+
go test ./...
5+
go test -bench=. -benchmem ./...
6+
7+
clean:
8+
go clean ./...

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,9 @@
11
# goex
2+
23
Practical Go extensions for everyday programming: streams, conditions, options, and more.
4+
5+
<a title="Build Status" target="_blank" href="https://github.com/xbmlz/goex/actions/workflows/test.yml"><img src="https://img.shields.io/github/actions/workflow/status/xbmlz/goex/test.yml?style=flat-square"></a>
6+
<a title="GoDoc" target="_blank" href="https://godoc.org/github.com/xbmlz/goex"><img src="http://img.shields.io/badge/godoc-reference-5272B4.svg?style=flat-square"></a>
7+
<a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/xbmlz/goex"><img src="https://goreportcard.com/badge/github.com/xbmlz/goex?style=flat-square"></a>
8+
<a title="Coverage Status" target="_blank" href="https://coveralls.io/github/xbmlz/goex"><img src="https://img.shields.io/coveralls/github/xbmlz/goex.svg?style=flat-square&color=CC9933"></a>
9+
<a title="Code Size" target="_blank" href="https://github.com/xbmlz/goex"><img src="https://img.shields.io/github/languages/code-size/xbmlz/goex.svg?style=flat-square"></a>

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/xbmlz/goex
2+
3+
go 1.24.2

streams/intermediate.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package stream
2+
3+
func (s *Stream[T]) Filter(f func(T) bool) *Stream[T] {
4+
s.operations = append(s.operations, FilterOperation[T]{f: f})
5+
return s
6+
}

streams/operations.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package stream
2+
3+
type Operation[T any] interface {
4+
Apply(v T) (T, bool)
5+
IsOrdered() bool
6+
}
7+
8+
// FilterOperation filters the stream items.
9+
type FilterOperation[T any] struct {
10+
f func(T) bool
11+
}
12+
13+
// Apply implements Operation.
14+
func (op FilterOperation[T]) Apply(v T) (T, bool) {
15+
return v, op.f(v)
16+
}
17+
18+
func (op FilterOperation[T]) IsOrdered() bool {
19+
return true
20+
}

streams/stream.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package stream
2+
3+
type Stream[T any] struct {
4+
items []T
5+
operations []Operation[T]
6+
parallel bool
7+
ordered bool
8+
}
9+
10+
func Of[T any](items []T) *Stream[T] {
11+
return &Stream[T]{items: items, ordered: true}
12+
}
13+
14+
func (s *Stream[T]) Parallel() *Stream[T] {
15+
s.parallel = true
16+
return s
17+
}
18+
19+
func (s *Stream[T]) Unordered() *Stream[T] {
20+
s.ordered = false
21+
return s
22+
}

streams/stream_benchmark_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package stream
2+
3+
import (
4+
"testing"
5+
)
6+
7+
func BenchmarkSequentialFilter(b *testing.B) {
8+
// 构造大数据
9+
data := make([]int, 1_000_000)
10+
for i := 0; i < len(data); i++ {
11+
data[i] = i
12+
}
13+
14+
b.ResetTimer()
15+
for i := 0; i < b.N; i++ {
16+
_ = Of(data).
17+
Filter(func(x int) bool { return x%2 == 0 }).
18+
Collect()
19+
}
20+
}
21+
22+
func BenchmarkParallelFilterOrdered(b *testing.B) {
23+
data := make([]int, 1_000_000)
24+
for i := 0; i < len(data); i++ {
25+
data[i] = i
26+
}
27+
28+
b.ResetTimer()
29+
for i := 0; i < b.N; i++ {
30+
_ = Of(data).
31+
Parallel().
32+
Filter(func(x int) bool { return x%2 == 0 }).
33+
Collect()
34+
}
35+
}
36+
37+
func BenchmarkParallelFilterUnordered(b *testing.B) {
38+
data := make([]int, 1_000_000)
39+
for i := 0; i < len(data); i++ {
40+
data[i] = i
41+
}
42+
43+
b.ResetTimer()
44+
for i := 0; i < b.N; i++ {
45+
_ = Of(data).
46+
Parallel().
47+
Unordered().
48+
Filter(func(x int) bool { return x%2 == 0 }).
49+
Collect()
50+
}
51+
}

streams/stream_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package stream
2+
3+
import (
4+
"reflect"
5+
"testing"
6+
)
7+
8+
func TestFilter(t *testing.T) {
9+
nums := []int{1, 2, 3, 4, 5, 6}
10+
11+
// 顺序流
12+
expected := []int{2, 4, 6}
13+
result := Of(nums).
14+
Filter(func(x int) bool { return x%2 == 0 }).
15+
Collect()
16+
if !reflect.DeepEqual(result, expected) {
17+
t.Errorf("Sequential Filter failed. Got %v, want %v", result, expected)
18+
}
19+
20+
// 并行保序流
21+
resultParallel := Of(nums).
22+
Parallel().
23+
Filter(func(x int) bool { return x%2 == 0 }).
24+
Collect()
25+
if !reflect.DeepEqual(resultParallel, expected) {
26+
t.Errorf("Parallel Ordered Filter failed. Got %v, want %v", resultParallel, expected)
27+
}
28+
29+
// 并行乱序流
30+
resultUnordered := Of(nums).
31+
Parallel().
32+
Unordered().
33+
Filter(func(x int) bool { return x%2 == 0 }).
34+
Collect()
35+
// 乱序流只保证元素存在,不保证顺序
36+
if len(resultUnordered) != len(expected) {
37+
t.Errorf("Parallel Unordered Filter length mismatch. Got %v, want %v", resultUnordered, expected)
38+
}
39+
for _, v := range expected {
40+
found := false
41+
for _, u := range resultUnordered {
42+
if u == v {
43+
found = true
44+
break
45+
}
46+
}
47+
if !found {
48+
t.Errorf("Parallel Unordered Filter missing element %v", v)
49+
}
50+
}
51+
}

streams/terminal.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
package stream
2+
3+
import (
4+
"runtime"
5+
"sync"
6+
)
7+
8+
// processItem 处理单个元素,应用所有操作
9+
func processItem[T any](item T, operations []Operation[T]) (T, bool) {
10+
res := item
11+
ok := true
12+
for _, op := range operations {
13+
res, ok = op.Apply(res)
14+
if !ok {
15+
break
16+
}
17+
}
18+
return res, ok
19+
}
20+
21+
func CollectParallel[T any](s *Stream[T]) []T {
22+
if !s.ordered {
23+
// 乱序收集
24+
var wg sync.WaitGroup
25+
ch := make(chan T, len(s.items))
26+
27+
for _, item := range s.items {
28+
wg.Add(1)
29+
go func(v T) {
30+
defer wg.Done()
31+
res, ok := processItem(v, s.operations)
32+
if ok {
33+
ch <- res
34+
}
35+
}(item)
36+
}
37+
wg.Wait()
38+
close(ch)
39+
40+
result := make([]T, 0, len(s.items))
41+
for v := range ch {
42+
result = append(result, v)
43+
}
44+
return result
45+
}
46+
// 保存处理结果和是否保留的信息
47+
results := make([]T, len(s.items))
48+
keep := make([]bool, len(s.items))
49+
var wg sync.WaitGroup
50+
51+
// 确定工作协程数量 - 不超过CPU核心数或元素数量
52+
numWorkers := min(len(s.items), runtime.NumCPU())
53+
jobs := make(chan int, len(s.items))
54+
55+
for w := 0; w < numWorkers; w++ {
56+
wg.Add(1)
57+
go func() {
58+
defer wg.Done()
59+
for idx := range jobs {
60+
// 处理单个元素
61+
res, ok := processItem(s.items[idx], s.operations)
62+
// 保存结果并标记为保留
63+
if ok {
64+
results[idx] = res
65+
keep[idx] = true
66+
}
67+
}
68+
}()
69+
}
70+
71+
// 发送任务
72+
for i := range s.items {
73+
jobs <- i
74+
}
75+
close(jobs)
76+
wg.Wait()
77+
78+
// 收集保留的结果
79+
finalResult := make([]T, 0, len(s.items))
80+
for i, ok := range keep {
81+
if ok {
82+
finalResult = append(finalResult, results[i])
83+
}
84+
}
85+
return finalResult
86+
}
87+
88+
func (s *Stream[T]) Collect() []T {
89+
var result []T
90+
if !s.parallel {
91+
for _, item := range s.items {
92+
res := item
93+
ok := true
94+
for _, op := range s.operations {
95+
res, ok = op.Apply(res)
96+
if !ok {
97+
break
98+
}
99+
}
100+
if ok {
101+
result = append(result, res)
102+
}
103+
}
104+
} else {
105+
result = CollectParallel(s)
106+
}
107+
return result
108+
}

0 commit comments

Comments
 (0)