@@ -130,6 +130,7 @@ struct io_cb_cancel_data {
 };
 
 static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index);
+static void io_wqe_dec_running(struct io_worker *worker);
 
 static bool io_worker_get(struct io_worker *worker)
 {
@@ -168,26 +169,21 @@ static void io_worker_exit(struct io_worker *worker)
 {
	struct io_wqe *wqe = worker->wqe;
	struct io_wqe_acct *acct = io_wqe_get_acct(worker);
-	unsigned flags;
 
	if (refcount_dec_and_test(&worker->ref))
		complete(&worker->ref_done);
	wait_for_completion(&worker->ref_done);
 
-	preempt_disable();
-	current->flags &= ~PF_IO_WORKER;
-	flags = worker->flags;
-	worker->flags = 0;
-	if (flags & IO_WORKER_F_RUNNING)
-		atomic_dec(&acct->nr_running);
-	worker->flags = 0;
-	preempt_enable();
-
	raw_spin_lock_irq(&wqe->lock);
-	if (flags & IO_WORKER_F_FREE)
+	if (worker->flags & IO_WORKER_F_FREE)
		hlist_nulls_del_rcu(&worker->nulls_node);
	list_del_rcu(&worker->all_list);
	acct->nr_workers--;
+	preempt_disable();
+	io_wqe_dec_running(worker);
+	worker->flags = 0;
+	current->flags &= ~PF_IO_WORKER;
+	preempt_enable();
	raw_spin_unlock_irq(&wqe->lock);
 
	kfree_rcu(worker, rcu);
@@ -214,15 +210,19 @@ static bool io_wqe_activate_free_worker(struct io_wqe *wqe)
	struct hlist_nulls_node *n;
	struct io_worker *worker;
 
-	n = rcu_dereference(hlist_nulls_first_rcu(&wqe->free_list));
-	if (is_a_nulls(n))
-		return false;
-
-	worker = hlist_nulls_entry(n, struct io_worker, nulls_node);
-	if (io_worker_get(worker)) {
-		wake_up_process(worker->task);
+	/*
+	 * Iterate free_list and see if we can find an idle worker to
+	 * activate. If a given worker is on the free_list but in the process
+	 * of exiting, keep trying.
+	 */
+	hlist_nulls_for_each_entry_rcu(worker, n, &wqe->free_list, nulls_node) {
+		if (!io_worker_get(worker))
+			continue;
+		if (wake_up_process(worker->task)) {
+			io_worker_release(worker);
+			return true;
+		}
		io_worker_release(worker);
-		return true;
	}
 
	return false;
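The hunk above makes io_wqe_activate_free_worker() keep scanning free_list instead of giving up after the first entry: a worker that is exiting may still be on the list, so the loop skips anyone whose reference can no longer be taken or who cannot actually be woken. Below is a rough userspace sketch of that skip-and-retry pattern using C11 atomics; struct worker, worker_get(), worker_wake() and the array-based free_list are illustrative stand-ins, not io-wq's types.

#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

struct worker {
	atomic_int ref;		/* 0 means the worker is exiting */
	atomic_bool sleeping;	/* true while parked on the free list */
};

/* Take a reference unless the worker already dropped its last one. */
static bool worker_get(struct worker *w)
{
	int old = atomic_load(&w->ref);

	while (old > 0) {
		if (atomic_compare_exchange_weak(&w->ref, &old, old + 1))
			return true;
	}
	return false;
}

static void worker_put(struct worker *w)
{
	atomic_fetch_sub(&w->ref, 1);
}

/* Analogue of wake_up_process(): only succeeds for a worker that was asleep. */
static bool worker_wake(struct worker *w)
{
	bool was_sleeping = true;

	return atomic_compare_exchange_strong(&w->sleeping, &was_sleeping, false);
}

/* Keep trying until some idle worker is actually activated. */
static bool activate_free_worker(struct worker **free_list, size_t n)
{
	for (size_t i = 0; i < n; i++) {
		struct worker *w = free_list[i];

		if (!worker_get(w))
			continue;	/* exiting, try the next one */
		if (worker_wake(w)) {
			worker_put(w);
			return true;	/* activated one, done */
		}
		worker_put(w);		/* already running, keep looking */
	}
	return false;
}

int main(void)
{
	struct worker w = { .ref = 1, .sleeping = true };
	struct worker *free_list[] = { &w };

	return activate_free_worker(free_list, 1) ? 0 : 1;
}

In the kernel code the role of worker_wake() is played by wake_up_process(), whose nonzero return value indicates the task was actually transitioned out of its sleep state.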
@@ -247,10 +247,19 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
	ret = io_wqe_activate_free_worker(wqe);
	rcu_read_unlock();
 
-	if (!ret && acct->nr_workers < acct->max_workers) {
-		atomic_inc(&acct->nr_running);
-		atomic_inc(&wqe->wq->worker_refs);
-		create_io_worker(wqe->wq, wqe, acct->index);
+	if (!ret) {
+		bool do_create = false;
+
+		raw_spin_lock_irq(&wqe->lock);
+		if (acct->nr_workers < acct->max_workers) {
+			atomic_inc(&acct->nr_running);
+			atomic_inc(&wqe->wq->worker_refs);
+			acct->nr_workers++;
+			do_create = true;
+		}
+		raw_spin_unlock_irq(&wqe->lock);
+		if (do_create)
+			create_io_worker(wqe->wq, wqe, acct->index);
	}
 }
 
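This hunk moves the nr_workers check and increment under wqe->lock, so a worker slot is reserved before the new worker is created and concurrent callers cannot both slip past the max_workers check; the create_io_worker() hunk further down hands the reservation back if creation fails. A minimal userspace sketch of that reserve-then-create pattern follows (pthreads; struct acct, try_create_worker() and worker_fn() are made-up names for illustration, not io-wq's).

#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>

struct acct {
	pthread_mutex_t lock;	/* stands in for wqe->lock */
	int nr_workers;
	int max_workers;
};

static void *worker_fn(void *arg)
{
	(void)arg;		/* a real worker would pull work off a queue here */
	return NULL;
}

static bool try_create_worker(struct acct *acct)
{
	bool do_create = false;
	pthread_t tid;

	/* Reserve the worker slot while holding the lock... */
	pthread_mutex_lock(&acct->lock);
	if (acct->nr_workers < acct->max_workers) {
		acct->nr_workers++;
		do_create = true;
	}
	pthread_mutex_unlock(&acct->lock);

	if (!do_create)
		return false;

	/* ...and do the (possibly slow) creation outside of it. */
	if (pthread_create(&tid, NULL, worker_fn, acct) != 0) {
		/* Creation failed: give the reserved slot back. */
		pthread_mutex_lock(&acct->lock);
		acct->nr_workers--;
		pthread_mutex_unlock(&acct->lock);
		return false;
	}
	pthread_detach(tid);
	return true;
}

int main(void)
{
	struct acct acct;

	pthread_mutex_init(&acct.lock, NULL);
	acct.nr_workers = 0;
	acct.max_workers = 4;

	printf("created: %d\n", try_create_worker(&acct));
	return 0;
}

Keeping pthread_create() (create_io_thread() in io-wq) outside the critical section keeps the lock hold time short, while the lock still guarantees nr_workers never exceeds max_workers.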
@@ -271,9 +280,17 @@ static void create_worker_cb(struct callback_head *cb)
 {
	struct create_worker_data *cwd;
	struct io_wq *wq;
+	struct io_wqe *wqe;
+	struct io_wqe_acct *acct;
 
	cwd = container_of(cb, struct create_worker_data, work);
-	wq = cwd->wqe->wq;
+	wqe = cwd->wqe;
+	wq = wqe->wq;
+	acct = &wqe->acct[cwd->index];
+	raw_spin_lock_irq(&wqe->lock);
+	if (acct->nr_workers < acct->max_workers)
+		acct->nr_workers++;
+	raw_spin_unlock_irq(&wqe->lock);
	create_io_worker(wq, cwd->wqe, cwd->index);
	kfree(cwd);
 }
@@ -635,6 +652,9 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
		kfree(worker);
 fail:
		atomic_dec(&acct->nr_running);
+		raw_spin_lock_irq(&wqe->lock);
+		acct->nr_workers--;
+		raw_spin_unlock_irq(&wqe->lock);
		io_worker_ref_put(wq);
		return;
	}
@@ -650,9 +670,8 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
	worker->flags |= IO_WORKER_F_FREE;
	if (index == IO_WQ_ACCT_BOUND)
		worker->flags |= IO_WORKER_F_BOUND;
-	if (!acct->nr_workers && (worker->flags & IO_WORKER_F_BOUND))
+	if ((acct->nr_workers == 1) && (worker->flags & IO_WORKER_F_BOUND))
		worker->flags |= IO_WORKER_F_FIXED;
-	acct->nr_workers++;
	raw_spin_unlock_irq(&wqe->lock);
	wake_up_new_task(tsk);
 }