Commit 86898fa
workqueue: Implement disable/enable for (delayed) work items
While (delayed) work items could be flushed and canceled, there was no way
to prevent them from being queued in the future. While this didn't lead to
functional deficiencies, it sometimes required a bit more effort from
workqueue users, e.g. to sequence shutdown steps with more care.

Workqueue is currently in the process of replacing tasklet, which does
support disabling and enabling. The feature is used relatively widely to,
for example, temporarily suppress the main path while a control plane
operation (reset or config change) is in progress.

To enable easy conversion of tasklet users and as it seems like an
inherently useful feature, this patch implements disabling and enabling of
work items:

- A work item carries a 16bit disable count in work->data while not
  queued. Access to the count is synchronized by the PENDING bit like all
  other parts of work->data.

- If the count is non-zero, the work item cannot be queued. Any attempt to
  queue the work item fails and returns %false.

- disable_work[_sync](), enable_work(), disable_delayed_work[_sync]() and
  enable_delayed_work() are added.

v3: enable_work() was using local_irq_enable() instead of
    local_irq_restore() to undo IRQ-disable by work_grab_pending(). This
    is awkward now and will become incorrect as enable_work() will later
    be used from IRQ context too. (Lai)

v2: Lai noticed that queue_work_node() wasn't checking the disable count.
    Fixed. queue_rcu_work() is updated to trigger a warning if the inner
    work item is disabled.

Signed-off-by: Tejun Heo <[email protected]>
Reviewed-by: Lai Jiangshan <[email protected]>
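A minimal sketch of the pattern described above, as a tasklet conversion
might use it: quiescing the main path around a control plane operation.
The foo_* names are invented for illustration; disable_work_sync(),
enable_work() and queue_work() are the real API.

    #include <linux/workqueue.h>

    /* hypothetical driver state, not part of this patch */
    struct foo_dev {
            struct work_struct completion_work;
    };

    static void foo_reset(struct foo_dev *dev)
    {
            /*
             * Cancel a pending completion_work, wait for a running
             * instance to finish, and make all further queueing
             * attempts fail.
             */
            disable_work_sync(&dev->completion_work);

            /* ... perform the reset; the main path can't queue work ... */

            /*
             * Drop the disable count back to zero. queue_work() on the
             * item succeeds again from this point on.
             */
            enable_work(&dev->completion_work);
    }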
Parent: 1211f3b

File tree: 2 files changed, +182 -13 lines

include/linux/workqueue.h

Lines changed: 15 additions & 3 deletions
@@ -51,20 +51,23 @@ enum work_bits {
 	 * data contains off-queue information when !WORK_STRUCT_PWQ.
 	 *
 	 * MSB
-	 * [ pool ID ] [ OFFQ flags ] [ STRUCT flags ]
-	 *               1 bit        4 or 5 bits
+	 * [ pool ID ] [ disable depth ] [ OFFQ flags ] [ STRUCT flags ]
+	 *                16 bits          1 bit        4 or 5 bits
 	 */
 	WORK_OFFQ_FLAG_SHIFT	= WORK_STRUCT_FLAG_BITS,
 	WORK_OFFQ_CANCELING_BIT	= WORK_OFFQ_FLAG_SHIFT,
 	WORK_OFFQ_FLAG_END,
 	WORK_OFFQ_FLAG_BITS	= WORK_OFFQ_FLAG_END - WORK_OFFQ_FLAG_SHIFT,
 
+	WORK_OFFQ_DISABLE_SHIFT	= WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
+	WORK_OFFQ_DISABLE_BITS	= 16,
+
 	/*
 	 * When a work item is off queue, the high bits encode off-queue flags
 	 * and the last pool it was on. Cap pool ID to 31 bits and use the
 	 * highest number to indicate that no pool is associated.
 	 */
-	WORK_OFFQ_POOL_SHIFT	= WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
+	WORK_OFFQ_POOL_SHIFT	= WORK_OFFQ_DISABLE_SHIFT + WORK_OFFQ_DISABLE_BITS,
 	WORK_OFFQ_LEFT		= BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT,
 	WORK_OFFQ_POOL_BITS	= WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31,
 };
@@ -98,6 +101,7 @@ enum wq_misc_consts {
 /* Convenience constants - of type 'unsigned long', not 'enum'! */
 #define WORK_OFFQ_CANCELING	(1ul << WORK_OFFQ_CANCELING_BIT)
 #define WORK_OFFQ_FLAG_MASK	(((1ul << WORK_OFFQ_FLAG_BITS) - 1) << WORK_OFFQ_FLAG_SHIFT)
+#define WORK_OFFQ_DISABLE_MASK	(((1ul << WORK_OFFQ_DISABLE_BITS) - 1) << WORK_OFFQ_DISABLE_SHIFT)
 #define WORK_OFFQ_POOL_NONE	((1ul << WORK_OFFQ_POOL_BITS) - 1)
 #define WORK_STRUCT_NO_POOL	(WORK_OFFQ_POOL_NONE << WORK_OFFQ_POOL_SHIFT)
 #define WORK_STRUCT_PWQ_MASK	(~((1ul << WORK_STRUCT_PWQ_SHIFT) - 1))
@@ -560,6 +564,14 @@ extern bool flush_delayed_work(struct delayed_work *dwork);
 extern bool cancel_delayed_work(struct delayed_work *dwork);
 extern bool cancel_delayed_work_sync(struct delayed_work *dwork);
 
+extern bool disable_work(struct work_struct *work);
+extern bool disable_work_sync(struct work_struct *work);
+extern bool enable_work(struct work_struct *work);
+
+extern bool disable_delayed_work(struct delayed_work *dwork);
+extern bool disable_delayed_work_sync(struct delayed_work *dwork);
+extern bool enable_delayed_work(struct delayed_work *dwork);
+
 extern bool flush_rcu_work(struct rcu_work *rwork);
 
 extern void workqueue_set_max_active(struct workqueue_struct *wq,
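For reference, the resulting word layout can be sanity-checked with a
stand-alone user-space sketch that mirrors the enum arithmetic above. It
assumes a 64-bit long and WORK_STRUCT_FLAG_BITS == 4 (the "4 bits"
configuration in the comment); both assumptions are for illustration only.

    #include <assert.h>
    #include <stdio.h>

    #define BITS_PER_LONG		64	/* assumed: 64-bit kernel */
    #define WORK_STRUCT_FLAG_BITS	4	/* assumed: the "4 bits" config */

    enum {
            WORK_OFFQ_FLAG_SHIFT	= WORK_STRUCT_FLAG_BITS,
            WORK_OFFQ_FLAG_BITS	= 1,	/* only CANCELING at this point */
            WORK_OFFQ_DISABLE_SHIFT	= WORK_OFFQ_FLAG_SHIFT + WORK_OFFQ_FLAG_BITS,
            WORK_OFFQ_DISABLE_BITS	= 16,
            WORK_OFFQ_POOL_SHIFT	= WORK_OFFQ_DISABLE_SHIFT + WORK_OFFQ_DISABLE_BITS,
            WORK_OFFQ_LEFT		= BITS_PER_LONG - WORK_OFFQ_POOL_SHIFT,
            WORK_OFFQ_POOL_BITS	= WORK_OFFQ_LEFT <= 31 ? WORK_OFFQ_LEFT : 31,
    };

    int main(void)
    {
            /* the disable depth slots in between the OFFQ flags and the pool ID */
            assert(WORK_OFFQ_DISABLE_SHIFT == 5);
            assert(WORK_OFFQ_POOL_SHIFT == 21);
            assert(WORK_OFFQ_POOL_BITS == 31);
            printf("disable depth occupies bits %d-%d of work->data\n",
                   WORK_OFFQ_DISABLE_SHIFT,
                   WORK_OFFQ_DISABLE_SHIFT + WORK_OFFQ_DISABLE_BITS - 1);
            return 0;
    }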

kernel/workqueue.c

Lines changed: 167 additions & 10 deletions
@@ -99,6 +99,7 @@ enum worker_flags {
 
 enum work_cancel_flags {
 	WORK_CANCEL_DELAYED	= 1 << 0,	/* canceling a delayed_work */
+	WORK_CANCEL_DISABLE	= 1 << 1,	/* canceling to disable */
 };
 
 enum wq_internal_consts {
@@ -394,6 +395,7 @@ struct wq_pod_type {
 
 struct work_offq_data {
 	u32			pool_id;
+	u32			disable;
 	u32			flags;
 };
 
@@ -908,12 +910,15 @@ static void work_offqd_unpack(struct work_offq_data *offqd, unsigned long data)
 
 	offqd->pool_id = shift_and_mask(data, WORK_OFFQ_POOL_SHIFT,
 					WORK_OFFQ_POOL_BITS);
+	offqd->disable = shift_and_mask(data, WORK_OFFQ_DISABLE_SHIFT,
+					WORK_OFFQ_DISABLE_BITS);
 	offqd->flags = data & WORK_OFFQ_FLAG_MASK;
 }
 
 static unsigned long work_offqd_pack_flags(struct work_offq_data *offqd)
 {
-	return (unsigned long)offqd->flags;
+	return ((unsigned long)offqd->disable << WORK_OFFQ_DISABLE_SHIFT) |
+	       ((unsigned long)offqd->flags);
 }
 
 static bool work_is_canceling(struct work_struct *work)
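The unpack/pack pair is a plain shift-and-mask round trip over work->data.
A user-space sketch of the same idea, with shift_and_mask() re-implemented
here to match the shape of the kernel helper and the shift/width constants
taken from the header sketch above:

    #include <assert.h>

    #define WORK_OFFQ_DISABLE_SHIFT	5	/* as derived in the header sketch */
    #define WORK_OFFQ_DISABLE_BITS	16

    /* same shape as the kernel's shift_and_mask() helper */
    static unsigned long shift_and_mask(unsigned long v, int shift, int bits)
    {
            return (v >> shift) & ((1ul << bits) - 1);
    }

    int main(void)
    {
            unsigned long data = 0;
            unsigned long disable = 3;	/* disable depth to store */

            /* pack, as work_offqd_pack_flags() does */
            data |= disable << WORK_OFFQ_DISABLE_SHIFT;

            /* unpack, as work_offqd_unpack() does */
            assert(shift_and_mask(data, WORK_OFFQ_DISABLE_SHIFT,
                                  WORK_OFFQ_DISABLE_BITS) == disable);
            return 0;
    }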
@@ -2408,6 +2413,21 @@ static void __queue_work(int cpu, struct workqueue_struct *wq,
 	rcu_read_unlock();
 }
 
+static bool clear_pending_if_disabled(struct work_struct *work)
+{
+	unsigned long data = *work_data_bits(work);
+	struct work_offq_data offqd;
+
+	if (likely((data & WORK_STRUCT_PWQ) ||
+		   !(data & WORK_OFFQ_DISABLE_MASK)))
+		return false;
+
+	work_offqd_unpack(&offqd, data);
+	set_work_pool_and_clear_pending(work, offqd.pool_id,
+					work_offqd_pack_flags(&offqd));
+	return true;
+}
+
 /**
  * queue_work_on - queue work on specific cpu
  * @cpu: CPU number to execute work on
@@ -2430,7 +2450,8 @@ bool queue_work_on(int cpu, struct workqueue_struct *wq,
 
 	local_irq_save(irq_flags);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !clear_pending_if_disabled(work)) {
 		__queue_work(cpu, wq, work);
 		ret = true;
 	}
@@ -2508,7 +2529,8 @@ bool queue_work_node(int node, struct workqueue_struct *wq,
 
 	local_irq_save(irq_flags);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !clear_pending_if_disabled(work)) {
 		int cpu = select_numa_node_cpu(node);
 
 		__queue_work(cpu, wq, work);
@@ -2590,7 +2612,8 @@ bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
 	/* read the comment in __queue_work() */
 	local_irq_save(irq_flags);
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !clear_pending_if_disabled(work)) {
 		__queue_delayed_work(cpu, wq, dwork, delay);
 		ret = true;
 	}
@@ -2663,7 +2686,12 @@ bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
 {
 	struct work_struct *work = &rwork->work;
 
-	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
+	/*
+	 * rcu_work can't be canceled or disabled. Warn if the user reached
+	 * inside @rwork and disabled the inner work.
+	 */
+	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)) &&
+	    !WARN_ON_ONCE(clear_pending_if_disabled(work))) {
 		rwork->wq = wq;
 		call_rcu_hurry(&rwork->rcu, rcu_work_rcufn);
 		return true;
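Taken together, the queueing paths now behave as in this hypothetical
kernel-side snippet; foo_fn/foo_work are invented for illustration, the
calls are the API touched by this patch:

    #include <linux/workqueue.h>

    static void foo_fn(struct work_struct *work) { }
    static DECLARE_WORK(foo_work, foo_fn);

    static void foo_demo(void)
    {
            disable_work(&foo_work);

            /*
             * PENDING is grabbed as usual, then released again by
             * clear_pending_if_disabled(): the attempt reports failure.
             */
            WARN_ON(queue_work(system_wq, &foo_work));	/* returns false */

            enable_work(&foo_work);
            WARN_ON(!queue_work(system_wq, &foo_work));	/* returns true */
    }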
@@ -4268,20 +4296,46 @@ bool flush_rcu_work(struct rcu_work *rwork)
 }
 EXPORT_SYMBOL(flush_rcu_work);
 
+static void work_offqd_disable(struct work_offq_data *offqd)
+{
+	const unsigned long max = (1lu << WORK_OFFQ_DISABLE_BITS) - 1;
+
+	if (likely(offqd->disable < max))
+		offqd->disable++;
+	else
+		WARN_ONCE(true, "workqueue: work disable count overflowed\n");
+}
+
+static void work_offqd_enable(struct work_offq_data *offqd)
+{
+	if (likely(offqd->disable > 0))
+		offqd->disable--;
+	else
+		WARN_ONCE(true, "workqueue: work disable count underflowed\n");
+}
+
 static bool __cancel_work(struct work_struct *work, u32 cflags)
 {
 	struct work_offq_data offqd;
 	unsigned long irq_flags;
 	int ret;
 
-	do {
-		ret = try_to_grab_pending(work, cflags, &irq_flags);
-	} while (unlikely(ret == -EAGAIN));
+	if (cflags & WORK_CANCEL_DISABLE) {
+		ret = work_grab_pending(work, cflags, &irq_flags);
+	} else {
+		do {
+			ret = try_to_grab_pending(work, cflags, &irq_flags);
+		} while (unlikely(ret == -EAGAIN));
 
-	if (unlikely(ret < 0))
-		return false;
+		if (unlikely(ret < 0))
+			return false;
+	}
 
 	work_offqd_unpack(&offqd, *work_data_bits(work));
+
+	if (cflags & WORK_CANCEL_DISABLE)
+		work_offqd_disable(&offqd);
+
 	set_work_pool_and_clear_pending(work, offqd.pool_id,
 					work_offqd_pack_flags(&offqd));
 	local_irq_restore(irq_flags);
@@ -4298,6 +4352,10 @@ static bool __cancel_work_sync(struct work_struct *work, u32 cflags)
 	ret = work_grab_pending(work, cflags, &irq_flags);
 
 	work_offqd_unpack(&offqd, *work_data_bits(work));
+
+	if (cflags & WORK_CANCEL_DISABLE)
+		work_offqd_disable(&offqd);
+
 	offqd.flags |= WORK_OFFQ_CANCELING;
 	set_work_pool_and_keep_pending(work, offqd.pool_id,
 				       work_offqd_pack_flags(&offqd));
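The count saturates rather than wrapping, and a mismatched enable is
flagged, so disable/enable pairs nest across independent callers. A
user-space sketch of just that counter discipline (fprintf() standing in
for WARN_ONCE()):

    #include <stdint.h>
    #include <stdio.h>

    #define WORK_OFFQ_DISABLE_BITS	16

    static uint32_t depth;	/* stands in for offqd->disable */

    static void offqd_disable(void)
    {
            const uint32_t max = (1u << WORK_OFFQ_DISABLE_BITS) - 1;

            if (depth < max)
                    depth++;
            else
                    fprintf(stderr, "disable count overflowed\n");
    }

    static void offqd_enable(void)
    {
            if (depth > 0)
                    depth--;
            else
                    fprintf(stderr, "disable count underflowed\n");
    }

    int main(void)
    {
            offqd_disable();
            offqd_disable();	/* two independent disablers */
            offqd_enable();
            printf("queueable: %s\n", depth ? "no" : "yes");	/* no */
            offqd_enable();
            printf("queueable: %s\n", depth ? "no" : "yes");	/* yes */
            return 0;
    }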
@@ -4397,6 +4455,105 @@ bool cancel_delayed_work_sync(struct delayed_work *dwork)
 }
 EXPORT_SYMBOL(cancel_delayed_work_sync);
 
+/**
+ * disable_work - Disable and cancel a work item
+ * @work: work item to disable
+ *
+ * Disable @work by incrementing its disable count and cancel it if currently
+ * pending. As long as the disable count is non-zero, any attempt to queue
+ * @work will fail and return %false. The maximum supported disable depth is
+ * 2 to the power of %WORK_OFFQ_DISABLE_BITS, currently 65536.
+ *
+ * Must be called from a sleepable context. Returns %true if @work was pending,
+ * %false otherwise.
+ */
+bool disable_work(struct work_struct *work)
+{
+	return __cancel_work(work, WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_work);
+
+/**
+ * disable_work_sync - Disable, cancel and drain a work item
+ * @work: work item to disable
+ *
+ * Similar to disable_work() but also wait for @work to finish if currently
+ * executing.
+ *
+ * Must be called from a sleepable context. Returns %true if @work was pending,
+ * %false otherwise.
+ */
+bool disable_work_sync(struct work_struct *work)
+{
+	return __cancel_work_sync(work, WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_work_sync);
+
+/**
+ * enable_work - Enable a work item
+ * @work: work item to enable
+ *
+ * Undo disable_work[_sync]() by decrementing @work's disable count. @work can
+ * only be queued if its disable count is 0.
+ *
+ * Must be called from a sleepable context. Returns %true if the disable count
+ * reached 0. Otherwise, %false.
+ */
+bool enable_work(struct work_struct *work)
+{
+	struct work_offq_data offqd;
+	unsigned long irq_flags;
+
+	work_grab_pending(work, 0, &irq_flags);
+
+	work_offqd_unpack(&offqd, *work_data_bits(work));
+	work_offqd_enable(&offqd);
+	set_work_pool_and_clear_pending(work, offqd.pool_id,
+					work_offqd_pack_flags(&offqd));
+	local_irq_restore(irq_flags);
+
+	return !offqd.disable;
+}
+EXPORT_SYMBOL_GPL(enable_work);
+
+/**
+ * disable_delayed_work - Disable and cancel a delayed work item
+ * @dwork: delayed work item to disable
+ *
+ * disable_work() for delayed work items.
+ */
+bool disable_delayed_work(struct delayed_work *dwork)
+{
+	return __cancel_work(&dwork->work,
+			     WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_delayed_work);
+
+/**
+ * disable_delayed_work_sync - Disable, cancel and drain a delayed work item
+ * @dwork: delayed work item to disable
+ *
+ * disable_work_sync() for delayed work items.
+ */
+bool disable_delayed_work_sync(struct delayed_work *dwork)
+{
+	return __cancel_work_sync(&dwork->work,
+				  WORK_CANCEL_DELAYED | WORK_CANCEL_DISABLE);
+}
+EXPORT_SYMBOL_GPL(disable_delayed_work_sync);
+
+/**
+ * enable_delayed_work - Enable a delayed work item
+ * @dwork: delayed work item to enable
+ *
+ * enable_work() for delayed work items.
+ */
+bool enable_delayed_work(struct delayed_work *dwork)
+{
+	return enable_work(&dwork->work);
+}
+EXPORT_SYMBOL_GPL(enable_delayed_work);
+
 /**
  * schedule_on_each_cpu - execute a function synchronously on each online CPU
  * @func: the function to call
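Closing with a sketch for the delayed-work variants: a hypothetical driver
pausing a poll loop across a configuration change. The foo_* names are
invented; note how the enable_delayed_work() return value (%true once the
disable count reaches zero) gates the re-arm.

    #include <linux/workqueue.h>

    static void foo_poll_fn(struct work_struct *work)
    {
            /* ... poll the hardware, then re-arm ... */
    }
    static DECLARE_DELAYED_WORK(foo_poll, foo_poll_fn);

    static void foo_reconfigure(void)
    {
            /* cancel a pending poll (timer or queued) and block re-queueing */
            disable_delayed_work_sync(&foo_poll);

            /* ... apply the configuration change undisturbed ... */

            /* re-enable; kick the first poll only once no disabler remains */
            if (enable_delayed_work(&foo_poll))
                    queue_delayed_work(system_wq, &foo_poll, HZ);
    }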
