Skip to content

Commit 03086a2

Browse files
committed
feat: add code example of using workerpool and ants package
1 parent 04f1072 commit 03086a2

File tree

3 files changed

+106
-0
lines changed

3 files changed

+106
-0
lines changed

go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/distribution/reference v0.5.0
2121
github.com/envoyproxy/protoc-gen-validate v1.0.2
2222
github.com/fatih/color v1.15.0
23+
github.com/gammazero/workerpool v1.1.3
2324
github.com/ghodss/yaml v1.0.0
2425
github.com/gin-contrib/pprof v1.4.0
2526
github.com/gin-gonic/gin v1.8.1
@@ -50,6 +51,7 @@ require (
5051
github.com/olekukonko/tablewriter v0.0.5
5152
github.com/onsi/ginkgo/v2 v2.13.0
5253
github.com/onsi/gomega v1.29.0
54+
github.com/panjf2000/ants/v2 v2.9.1
5355
github.com/parnurzeal/gorequest v0.2.16
5456
github.com/prometheus/client_golang v1.17.0
5557
github.com/redis/go-redis/extra/rediscensus/v9 v9.0.5
@@ -113,6 +115,7 @@ require (
113115
github.com/elazarl/goproxy v0.0.0-20210110162100-a92cc753f88e // indirect
114116
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
115117
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
118+
github.com/gammazero/deque v0.2.0 // indirect
116119
github.com/gin-contrib/sse v0.1.0 // indirect
117120
github.com/glebarez/go-sqlite v1.19.1 // indirect
118121
github.com/glebarez/sqlite v1.5.0 // indirect

go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
789789
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
790790
github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA=
791791
github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM=
792+
github.com/gammazero/deque v0.2.0 h1:SkieyNB4bg2/uZZLxvya0Pq6diUlwx7m2TeT7GAIWaA=
793+
github.com/gammazero/deque v0.2.0/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU=
794+
github.com/gammazero/workerpool v1.1.3 h1:WixN4xzukFoN0XSeXF6puqEqFTl2mECI9S6W44HWy9Q=
795+
github.com/gammazero/workerpool v1.1.3/go.mod h1:wPjyBLDbyKnUn2XwwyD3EEwo9dHutia9/fwNmSHWACc=
792796
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
793797
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
794798
github.com/gin-contrib/pprof v1.4.0 h1:XxiBSf5jWZ5i16lNOPbMTVdgHBdhfGRD5PZ1LWazzvg=
@@ -1308,6 +1312,8 @@ github.com/onsi/gomega v1.29.0 h1:KIA/t2t5UBzoirT4H9tsML45GEbo3ouUnBHsCfD2tVg=
13081312
github.com/onsi/gomega v1.29.0/go.mod h1:9sxs+SwGrKI0+PWe4Fxa9tFQQBG5xSsSbMXOI8PPpoQ=
13091313
github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=
13101314
github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
1315+
github.com/panjf2000/ants/v2 v2.9.1 h1:Q5vh5xohbsZXGcD6hhszzGqB7jSSc2/CRr3QKIga8Kw=
1316+
github.com/panjf2000/ants/v2 v2.9.1/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
13111317
github.com/parnurzeal/gorequest v0.2.16 h1:T/5x+/4BT+nj+3eSknXmCTnEVGSzFzPGdpqmUVVZXHQ=
13121318
github.com/parnurzeal/gorequest v0.2.16/go.mod h1:3Kh2QUMJoqw3icWAecsyzkpY7UzRfDhbRdTjtNwNiUE=
13131319
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
@@ -1464,6 +1470,7 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
14641470
github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals=
14651471
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
14661472
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
1473+
github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
14671474
github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
14681475
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
14691476
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
@@ -1781,6 +1788,7 @@ golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJ
17811788
golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
17821789
golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
17831790
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
1791+
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
17841792
golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE=
17851793
golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
17861794
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

internal/usercenter/biz/user/list.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ import (
1010
"context"
1111
"sync"
1212

13+
"github.com/gammazero/workerpool"
1314
"github.com/jinzhu/copier"
15+
"github.com/panjf2000/ants/v2"
1416
"golang.org/x/sync/errgroup"
1517
"google.golang.org/protobuf/types/known/timestamppb"
1618

@@ -68,6 +70,99 @@ func (b *userBiz) List(ctx context.Context, rq *v1.ListUserRequest) (*v1.ListUse
6870
return &v1.ListUserResponse{TotalCount: count, Users: users}, nil
6971
}
7072

73+
// ListWithWorkerPool retrieves a list of all users from the database use workerpool package.
74+
// Concurrency limits can effectively protect downstream services and control the resource
75+
// consumption of components.
76+
func (b *userBiz) ListWithWorkerPool(ctx context.Context, rq *v1.ListUserRequest) (*v1.ListUserResponse, error) {
77+
count, list, err := b.ds.Users().List(ctx, meta.WithOffset(rq.Offset), meta.WithLimit(rq.Limit))
78+
if err != nil {
79+
log.C(ctx).Errorw(err, "Failed to list users from storage")
80+
return nil, err
81+
}
82+
83+
var m sync.Map
84+
wp := workerpool.New(100)
85+
86+
// Use goroutine to improve interface performance
87+
for _, user := range list {
88+
wp.Submit(func() {
89+
count, _, err := b.ds.Secrets().List(ctx, user.UserID)
90+
if err != nil {
91+
log.C(ctx).Errorw(err, "Failed to list secrets")
92+
return
93+
}
94+
95+
u := ModelToReply(user)
96+
u.Secrets = count
97+
m.Store(user.ID, u)
98+
99+
return
100+
})
101+
}
102+
103+
wp.StopWait()
104+
105+
// The following code block is used to maintain the consistency of query order.
106+
users := make([]*v1.UserReply, 0, len(list))
107+
for _, item := range list {
108+
user, _ := m.Load(item.ID)
109+
users = append(users, user.(*v1.UserReply))
110+
}
111+
112+
log.C(ctx).Debugw("Get users from backend storage", "count", len(users))
113+
114+
return &v1.ListUserResponse{TotalCount: count, Users: users}, nil
115+
}
116+
117+
// ListWithAnts retrieves a list of all users from the database use ants package.
118+
// Concurrency limits can effectively protect downstream services and control the
119+
// resource consumption of components.
120+
func (b *userBiz) ListWithAnts(ctx context.Context, rq *v1.ListUserRequest) (*v1.ListUserResponse, error) {
121+
count, list, err := b.ds.Users().List(ctx, meta.WithOffset(rq.Offset), meta.WithLimit(rq.Limit))
122+
if err != nil {
123+
log.C(ctx).Errorw(err, "Failed to list users from storage")
124+
return nil, err
125+
}
126+
127+
var m sync.Map
128+
var wg sync.WaitGroup
129+
pool, _ := ants.NewPool(100)
130+
defer pool.Release()
131+
132+
// Use goroutine to improve interface performance
133+
for _, user := range list {
134+
wg.Add(1)
135+
_ = pool.Submit(func() {
136+
defer wg.Done()
137+
138+
count, _, err := b.ds.Secrets().List(ctx, user.UserID)
139+
if err != nil {
140+
log.C(ctx).Errorw(err, "Failed to list secrets")
141+
return
142+
}
143+
144+
u := ModelToReply(user)
145+
u.Secrets = count
146+
m.Store(user.ID, u)
147+
148+
return
149+
})
150+
}
151+
152+
wg.Wait()
153+
154+
// The following code block is used to maintain the consistency of query order.
155+
users := make([]*v1.UserReply, 0, len(list))
156+
for _, item := range list {
157+
user, _ := m.Load(item.ID)
158+
users = append(users, user.(*v1.UserReply))
159+
}
160+
161+
log.C(ctx).Debugw("Get users from backend storage", "count", len(users))
162+
163+
return &v1.ListUserResponse{TotalCount: count, Users: users}, nil
164+
}
165+
71166
// ListWithBadPerformance is a poor performance implementation of List.
72167
func (b *userBiz) ListWithBadPerformance(ctx context.Context, rq *v1.ListUserRequest) (*v1.ListUserResponse, error) {
73168
count, list, err := b.ds.Users().List(ctx, meta.WithOffset(rq.Offset), meta.WithLimit(rq.Limit))

0 commit comments

Comments
 (0)