Commit a35d169

Authored by Joel Fernandes (Google) and Byungchul Park; committed by Paul E. McKenney
rcu: Add basic support for kfree_rcu() batching
Recently a discussion about the stability and performance of a system involving a high rate of kfree_rcu() calls surfaced on the list [1], which led to another discussion about how to prepare for this situation.

This patch adds basic batching support for kfree_rcu(). It is "basic" because we do none of the slab management, dynamic allocation, code moving, or any of the other things that some previous attempts did [2]. These fancier improvements can be follow-up patches, and different ideas are being discussed in those regards. This is an effort to start simple and build up from there. In the future, an extension to use kfree_bulk and possibly per-slab batching could be done to further improve performance due to cache locality and slab-specific bulk-free optimizations. By using an array of pointers, the worker thread processing the work would need to read less data, since it would no longer need to deal with large rcu_head(s).

Torture tests follow in the next patch and show around a 5x reduction in the number of grace periods on a 16-CPU system. More details and test data are in that patch.

There is an implication for rcu_barrier() with this patch. Since kfree_rcu() calls can be batched and may not yet have been handed to the RCU machinery (in fact, the monitor may not even have run yet to do the queue_rcu_work()), there seems to be no easy way of implementing rcu_barrier() to wait for kfree_rcu()s that have already been issued. So this means that a kfree_rcu() followed by an rcu_barrier() does not imply that the memory will be freed once rcu_barrier() returns.

Another implication is higher active memory usage (although not runaway) until the kfree_rcu() flooding ends, in comparison to without batching. More details about this are in the second patch, which adds an rcuperf test.

Finally, in the near future we will get rid of the kfree_rcu() special casing within RCU, such as in rcu_do_batch, and switch everything to just batching. Currently we don't do that because the timer subsystem is not yet up, so we cannot schedule the kfree_rcu() monitor while the timer subsystem's locks are not initialized. That change would also mean getting rid of kfree_call_rcu_nobatch() entirely.

[1] http://lore.kernel.org/lkml/[email protected]
[2] https://lkml.org/lkml/2017/12/19/824

Cc: [email protected]
Cc: [email protected]
Co-developed-by: Byungchul Park <[email protected]>
Signed-off-by: Byungchul Park <[email protected]>
Signed-off-by: Joel Fernandes (Google) <[email protected]>
[ paulmck: Applied 0day and Paul Walmsley feedback on ->monitor_todo. ]
[ paulmck: Make it work during early boot. ]
[ paulmck: Add a crude early boot self-test. ]
[ paulmck: Style adjustments and experimental docbook structure header. ]
Link: https://lore.kernel.org/lkml/[email protected]/T/#me9956f66cb611b95d26ae92700e1d901f46e8c59
Signed-off-by: Paul E. McKenney <[email protected]>
1 parent e42617b commit a35d169
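
For readers who want the shape of the batching scheme without the RCU plumbing, here is a minimal user-space model of the two-list design described in the commit message. It is only a sketch under stated assumptions: the names (krc_model, krc_enqueue, krc_start_batch, krc_drain) are invented for illustration, there is no locking or per-CPU state, and the grace period, delayed monitor work, and workqueue are collapsed into plain function calls. The real implementation is in the kernel/rcu/tree.c hunk below.

/*
 * batch_model.c - minimal user-space model of the two-list batching scheme.
 * Illustrative sketch only, NOT kernel code: all names are invented, there
 * is no locking or per-CPU state, and the grace period, delayed monitor
 * work, and workqueue are collapsed into direct function calls.
 */
#include <stdio.h>
#include <stdlib.h>

struct obj {
	int id;
	struct obj *next;	/* plays the role of rcu_head->next */
};

/* Loosely mirrors struct kfree_rcu_cpu: one list filling, one list waiting. */
struct krc_model {
	struct obj *head;	/* objects not yet waiting for a grace period */
	struct obj *head_free;	/* objects handed to the (simulated) grace period */
};

/* kfree_rcu() analogue: link the object onto the current batch. */
static void krc_enqueue(struct krc_model *krc, struct obj *p)
{
	p->next = krc->head;
	krc->head = p;
}

/*
 * queue_kfree_rcu_work() analogue: promote ->head to ->head_free, but only
 * if the previous batch has already been drained; otherwise ask the caller
 * to retry later (the kernel re-arms the monitor work for this case).
 */
static int krc_start_batch(struct krc_model *krc)
{
	if (krc->head_free)
		return 0;
	krc->head_free = krc->head;
	krc->head = NULL;
	return 1;
}

/* kfree_rcu_work() analogue: free the now-private ->head_free list. */
static void krc_drain(struct krc_model *krc)
{
	struct obj *p = krc->head_free, *next;

	krc->head_free = NULL;
	for (; p; p = next) {
		next = p->next;
		printf("freeing obj %d\n", p->id);
		free(p);
	}
}

int main(void)
{
	struct krc_model krc = { NULL, NULL };
	int i;

	/* A burst of "kfree_rcu()" calls lands in a single batch... */
	for (i = 0; i < 5; i++) {
		struct obj *p = malloc(sizeof(*p));

		if (!p)
			return 1;
		p->id = i;
		krc_enqueue(&krc, p);
	}

	/* ...so one (simulated) grace period covers all five objects. */
	if (krc_start_batch(&krc))
		krc_drain(&krc);	/* kernel: workqueue handler after GP */
	return 0;
}

The key property this models is that many enqueues are amortized over a single drain, which is where the roughly 5x reduction in grace periods reported for the torture test comes from.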

File tree

4 files changed: +206 -6 lines changed

include/linux/rcutiny.h

Lines changed: 6 additions & 0 deletions
@@ -39,6 +39,11 @@ static inline void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
 	call_rcu(head, func);
 }
 
+static inline void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
+{
+	call_rcu(head, func);
+}
+
 void rcu_qs(void);
 
 static inline void rcu_softirq_qs(void)
@@ -85,6 +90,7 @@ static inline void rcu_scheduler_starting(void) { }
 static inline void rcu_end_inkernel_boot(void) { }
 static inline bool rcu_is_watching(void) { return true; }
 static inline void rcu_momentary_dyntick_idle(void) { }
+static inline void kfree_rcu_scheduler_running(void) { }
 
 /* Avoid RCU read-side critical sections leaking across. */
 static inline void rcu_all_qs(void) { barrier(); }

include/linux/rcutree.h

Lines changed: 2 additions & 0 deletions
@@ -34,10 +34,12 @@ static inline void rcu_virt_note_context_switch(int cpu)
 
 void synchronize_rcu_expedited(void);
 void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func);
+void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func);
 
 void rcu_barrier(void);
 bool rcu_eqs_special_set(int cpu);
 void rcu_momentary_dyntick_idle(void);
+void kfree_rcu_scheduler_running(void);
 unsigned long get_state_synchronize_rcu(void);
 void cond_synchronize_rcu(unsigned long oldstate);

kernel/rcu/tree.c

Lines changed: 188 additions & 6 deletions
@@ -2683,19 +2683,187 @@ void call_rcu(struct rcu_head *head, rcu_callback_t func)
 }
 EXPORT_SYMBOL_GPL(call_rcu);
 
+
+/* Maximum number of jiffies to wait before draining a batch. */
+#define KFREE_DRAIN_JIFFIES (HZ / 50)
+
+/**
+ * struct kfree_rcu_cpu - batch up kfree_rcu() requests for RCU grace period
+ * @rcu_work: Let queue_rcu_work() invoke workqueue handler after grace period
+ * @head: List of kfree_rcu() objects not yet waiting for a grace period
+ * @head_free: List of kfree_rcu() objects already waiting for a grace period
+ * @lock: Synchronize access to this structure
+ * @monitor_work: Promote @head to @head_free after KFREE_DRAIN_JIFFIES
+ * @monitor_todo: Tracks whether a @monitor_work delayed work is pending
+ * @initialized: The @lock and @rcu_work fields have been initialized
+ *
+ * This is a per-CPU structure.  The reason that it is not included in
+ * the rcu_data structure is to permit this code to be extracted from
+ * the RCU files.  Such extraction could allow further optimization of
+ * the interactions with the slab allocators.
+ */
+struct kfree_rcu_cpu {
+	struct rcu_work rcu_work;
+	struct rcu_head *head;
+	struct rcu_head *head_free;
+	spinlock_t lock;
+	struct delayed_work monitor_work;
+	int monitor_todo;
+	bool initialized;
+};
+
+static DEFINE_PER_CPU(struct kfree_rcu_cpu, krc);
+
 /*
- * Queue an RCU callback for lazy invocation after a grace period.
- * This will likely be later named something like "call_rcu_lazy()",
- * but this change will require some way of tagging the lazy RCU
- * callbacks in the list of pending callbacks. Until then, this
- * function may only be called from __kfree_rcu().
+ * This function is invoked in workqueue context after a grace period.
+ * It frees all the objects queued on ->head_free.
  */
-void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+static void kfree_rcu_work(struct work_struct *work)
+{
+	unsigned long flags;
+	struct rcu_head *head, *next;
+	struct kfree_rcu_cpu *krcp;
+
+	krcp = container_of(to_rcu_work(work), struct kfree_rcu_cpu, rcu_work);
+	spin_lock_irqsave(&krcp->lock, flags);
+	head = krcp->head_free;
+	krcp->head_free = NULL;
+	spin_unlock_irqrestore(&krcp->lock, flags);
+
+	// List "head" is now private, so traverse locklessly.
+	for (; head; head = next) {
+		next = head->next;
+		// Potentially optimize with kfree_bulk in future.
+		__rcu_reclaim(rcu_state.name, head);
+		cond_resched_tasks_rcu_qs();
+	}
+}
+
+/*
+ * Schedule the kfree batch RCU work to run in workqueue context after a GP.
+ *
+ * This function is invoked by kfree_rcu_monitor() when the KFREE_DRAIN_JIFFIES
+ * timeout has been reached.
+ */
+static inline bool queue_kfree_rcu_work(struct kfree_rcu_cpu *krcp)
+{
+	lockdep_assert_held(&krcp->lock);
+
+	// If a previous RCU batch is in progress, we cannot immediately
+	// queue another one, so return false to tell caller to retry.
+	if (krcp->head_free)
+		return false;
+
+	krcp->head_free = krcp->head;
+	krcp->head = NULL;
+	INIT_RCU_WORK(&krcp->rcu_work, kfree_rcu_work);
+	queue_rcu_work(system_wq, &krcp->rcu_work);
+	return true;
+}
+
+static inline void kfree_rcu_drain_unlock(struct kfree_rcu_cpu *krcp,
+					  unsigned long flags)
+{
+	// Attempt to start a new batch.
+	if (queue_kfree_rcu_work(krcp)) {
+		// Success! Our job is done here.
+		spin_unlock_irqrestore(&krcp->lock, flags);
+		return;
+	}
+
+	// Previous RCU batch still in progress, try again later.
+	if (!xchg(&krcp->monitor_todo, true))
+		schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+	spin_unlock_irqrestore(&krcp->lock, flags);
+}
+
+/*
+ * This function is invoked after the KFREE_DRAIN_JIFFIES timeout.
+ * It invokes kfree_rcu_drain_unlock() to attempt to start another batch.
+ */
+static void kfree_rcu_monitor(struct work_struct *work)
+{
+	unsigned long flags;
+	struct kfree_rcu_cpu *krcp = container_of(work, struct kfree_rcu_cpu,
+						  monitor_work.work);
+
+	spin_lock_irqsave(&krcp->lock, flags);
+	if (xchg(&krcp->monitor_todo, false))
+		kfree_rcu_drain_unlock(krcp, flags);
+	else
+		spin_unlock_irqrestore(&krcp->lock, flags);
+}
+
+/*
+ * This version of kfree_call_rcu does not do batching of kfree_rcu() requests.
+ * Used only by rcuperf torture test for comparison with kfree_rcu_batch().
+ */
+void kfree_call_rcu_nobatch(struct rcu_head *head, rcu_callback_t func)
 {
 	__call_rcu(head, func, 1);
 }
+EXPORT_SYMBOL_GPL(kfree_call_rcu_nobatch);
+
+/*
+ * Queue a request for lazy invocation of kfree() after a grace period.
+ *
+ * Each kfree_call_rcu() request is added to a batch. The batch will be drained
+ * every KFREE_DRAIN_JIFFIES number of jiffies. All the objects in the batch
+ * will be kfree'd in workqueue context. This allows us to:
+ *
+ * 1. Batch requests together to reduce the number of grace periods during
+ * heavy kfree_rcu() load.
+ *
+ * 2. It makes it possible to use kfree_bulk() on a large number of
+ * kfree_rcu() requests thus reducing cache misses and the per-object
+ * overhead of kfree().
+ */
+void kfree_call_rcu(struct rcu_head *head, rcu_callback_t func)
+{
+	unsigned long flags;
+	struct kfree_rcu_cpu *krcp;
+
+	head->func = func;
+
+	local_irq_save(flags);	// For safely calling this_cpu_ptr().
+	krcp = this_cpu_ptr(&krc);
+	if (krcp->initialized)
+		spin_lock(&krcp->lock);
+
+	// Queue the object but don't yet schedule the batch.
+	head->func = func;
+	head->next = krcp->head;
+	krcp->head = head;
+
+	// Set timer to drain after KFREE_DRAIN_JIFFIES.
+	if (rcu_scheduler_active == RCU_SCHEDULER_RUNNING &&
+	    !xchg(&krcp->monitor_todo, true))
+		schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+
+	if (krcp->initialized)
+		spin_unlock(&krcp->lock);
+	local_irq_restore(flags);
+}
 EXPORT_SYMBOL_GPL(kfree_call_rcu);
 
+void __init kfree_rcu_scheduler_running(void)
+{
+	int cpu;
+	unsigned long flags;
+
+	for_each_online_cpu(cpu) {
+		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+
+		spin_lock_irqsave(&krcp->lock, flags);
+		if (!krcp->head || xchg(&krcp->monitor_todo, true)) {
+			spin_unlock_irqrestore(&krcp->lock, flags);
+			continue;
+		}
+		schedule_delayed_work(&krcp->monitor_work, KFREE_DRAIN_JIFFIES);
+		spin_unlock_irqrestore(&krcp->lock, flags);
+	}
+}
+
 /*
  * During early boot, any blocking grace-period wait automatically
  * implies a grace period.  Later on, this is never the case for PREEMPT.
@@ -3557,12 +3725,26 @@ static void __init rcu_dump_rcu_node_tree(void)
 struct workqueue_struct *rcu_gp_wq;
 struct workqueue_struct *rcu_par_gp_wq;
 
+static void __init kfree_rcu_batch_init(void)
+{
+	int cpu;
+
+	for_each_possible_cpu(cpu) {
+		struct kfree_rcu_cpu *krcp = per_cpu_ptr(&krc, cpu);
+
+		spin_lock_init(&krcp->lock);
+		INIT_DELAYED_WORK(&krcp->monitor_work, kfree_rcu_monitor);
+		krcp->initialized = true;
+	}
+}
+
 void __init rcu_init(void)
 {
 	int cpu;
 
 	rcu_early_boot_tests();
 
+	kfree_rcu_batch_init();
 	rcu_bootup_announce();
 	rcu_init_geometry();
 	rcu_init_one();
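
As a usage note rather than part of this commit: a typical caller embeds a struct rcu_head in its own structure and frees it with kfree_rcu(), which expands to the kfree_call_rcu() shown above. The struct foo and foo_release() below are hypothetical names used only for illustration.

/* Hypothetical caller-side sketch; struct foo and foo_release() are
 * illustrative only and not part of this commit. */
struct foo {
	int data;
	struct rcu_head rh;	/* linkage reused while the object is batched */
};

static void foo_release(struct foo *fp)
{
	/*
	 * With this patch, the object is queued on this CPU's krc->head
	 * list.  After at most KFREE_DRAIN_JIFFIES, the monitor work
	 * promotes the batch to ->head_free and calls queue_rcu_work(),
	 * and a workqueue handler kfree()s the object after a grace period.
	 */
	kfree_rcu(fp, rh);
}

As the commit message notes, with batching an rcu_barrier() issued after foo_release() no longer guarantees that the object has actually been freed by the time it returns.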

kernel/rcu/update.c

Lines changed: 10 additions & 0 deletions
@@ -40,6 +40,7 @@
 #include <linux/rcupdate_wait.h>
 #include <linux/sched/isolation.h>
 #include <linux/kprobes.h>
+#include <linux/slab.h>
 
 #define CREATE_TRACE_POINTS
 
@@ -218,6 +219,7 @@ static int __init rcu_set_runtime_mode(void)
 {
 	rcu_test_sync_prims();
 	rcu_scheduler_active = RCU_SCHEDULER_RUNNING;
+	kfree_rcu_scheduler_running();
 	rcu_test_sync_prims();
 	return 0;
 }
@@ -853,14 +855,22 @@ static void test_callback(struct rcu_head *r)
 
 DEFINE_STATIC_SRCU(early_srcu);
 
+struct early_boot_kfree_rcu {
+	struct rcu_head rh;
+};
+
 static void early_boot_test_call_rcu(void)
 {
 	static struct rcu_head head;
 	static struct rcu_head shead;
+	struct early_boot_kfree_rcu *rhp;
 
 	call_rcu(&head, test_callback);
 	if (IS_ENABLED(CONFIG_SRCU))
 		call_srcu(&early_srcu, &shead, test_callback);
+	rhp = kmalloc(sizeof(*rhp), GFP_KERNEL);
+	if (!WARN_ON_ONCE(!rhp))
+		kfree_rcu(rhp, rh);
 }
 
 void rcu_early_boot_tests(void)
