Skip to content

Commit 5f72e52

Browse files
jognesspmladek
authored andcommitted
printk: ringbuffer: Do not skip non-finalized records with prb_next_seq()
Commit f244b4d ("printk: ringbuffer: Improve prb_next_seq() performance") introduced an optimization for prb_next_seq() by using best-effort to track recently finalized records. However, the order of finalization does not necessarily match the order of the records. The optimization changed prb_next_seq() to return inconsistent results, possibly yielding sequence numbers that are not available to readers because they are preceded by non-finalized records or they are not yet visible to the reader CPU. Rather than simply best-effort tracking recently finalized records, force the committing writer to read records and increment the last "contiguous block" of finalized records. In order to do this, the sequence number instead of ID must be stored because ID's cannot be directly compared. A new memory barrier pair is introduced to guarantee that a reader can always read the records up until the sequence number returned by prb_next_seq() (unless the records have since been overwritten in the ringbuffer). This restores the original functionality of prb_next_seq() while also keeping the optimization. For 32bit systems, only the lower 32 bits of the sequence number are stored. When reading the value, it is expanded to the full 64bit sequence number using the 32bit seq macros, which fold in the value returned by prb_first_seq(). Fixes: f244b4d ("printk: ringbuffer: Improve prb_next_seq() performance") Signed-off-by: John Ogness <[email protected]> Reviewed-by: Petr Mladek <[email protected]> Link: https://lore.kernel.org/r/[email protected] Signed-off-by: Petr Mladek <[email protected]>
1 parent 90ad525 commit 5f72e52

File tree

2 files changed

+127
-41
lines changed

2 files changed

+127
-41
lines changed

kernel/printk/printk_ringbuffer.c

Lines changed: 125 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
#include <linux/errno.h>
77
#include <linux/bug.h>
88
#include "printk_ringbuffer.h"
9+
#include "internal.h"
910

1011
/**
1112
* DOC: printk_ringbuffer overview
@@ -303,6 +304,9 @@
303304
*
304305
* desc_push_tail:B / desc_reserve:D
305306
* set descriptor reusable (state), then push descriptor tail (id)
307+
*
308+
* desc_update_last_finalized:A / desc_last_finalized_seq:A
309+
* store finalized record, then set new highest finalized sequence number
306310
*/
307311

308312
#define DATA_SIZE(data_ring) _DATA_SIZE((data_ring)->size_bits)
@@ -1441,20 +1445,118 @@ bool prb_reserve_in_last(struct prb_reserved_entry *e, struct printk_ringbuffer
14411445
return false;
14421446
}
14431447

1448+
/*
1449+
* @last_finalized_seq value guarantees that all records up to and including
1450+
* this sequence number are finalized and can be read. The only exception are
1451+
* too old records which have already been overwritten.
1452+
*
1453+
* It is also guaranteed that @last_finalized_seq only increases.
1454+
*
1455+
* Be aware that finalized records following non-finalized records are not
1456+
* reported because they are not yet available to the reader. For example,
1457+
* a new record stored via printk() will not be available to a printer if
1458+
* it follows a record that has not been finalized yet. However, once that
1459+
* non-finalized record becomes finalized, @last_finalized_seq will be
1460+
* appropriately updated and the full set of finalized records will be
1461+
* available to the printer. And since each printk() caller will either
1462+
* directly print or trigger deferred printing of all available unprinted
1463+
* records, all printk() messages will get printed.
1464+
*/
1465+
static u64 desc_last_finalized_seq(struct printk_ringbuffer *rb)
1466+
{
1467+
struct prb_desc_ring *desc_ring = &rb->desc_ring;
1468+
unsigned long ulseq;
1469+
1470+
/*
1471+
* Guarantee the sequence number is loaded before loading the
1472+
* associated record in order to guarantee that the record can be
1473+
* seen by this CPU. This pairs with desc_update_last_finalized:A.
1474+
*/
1475+
ulseq = atomic_long_read_acquire(&desc_ring->last_finalized_seq
1476+
); /* LMM(desc_last_finalized_seq:A) */
1477+
1478+
return __ulseq_to_u64seq(rb, ulseq);
1479+
}
1480+
1481+
static bool _prb_read_valid(struct printk_ringbuffer *rb, u64 *seq,
1482+
struct printk_record *r, unsigned int *line_count);
1483+
1484+
/*
1485+
* Check if there are records directly following @last_finalized_seq that are
1486+
* finalized. If so, update @last_finalized_seq to the latest of these
1487+
* records. It is not allowed to skip over records that are not yet finalized.
1488+
*/
1489+
static void desc_update_last_finalized(struct printk_ringbuffer *rb)
1490+
{
1491+
struct prb_desc_ring *desc_ring = &rb->desc_ring;
1492+
u64 old_seq = desc_last_finalized_seq(rb);
1493+
unsigned long oldval;
1494+
unsigned long newval;
1495+
u64 finalized_seq;
1496+
u64 try_seq;
1497+
1498+
try_again:
1499+
finalized_seq = old_seq;
1500+
try_seq = finalized_seq + 1;
1501+
1502+
/* Try to find later finalized records. */
1503+
while (_prb_read_valid(rb, &try_seq, NULL, NULL)) {
1504+
finalized_seq = try_seq;
1505+
try_seq++;
1506+
}
1507+
1508+
/* No update needed if no later finalized record was found. */
1509+
if (finalized_seq == old_seq)
1510+
return;
1511+
1512+
oldval = __u64seq_to_ulseq(old_seq);
1513+
newval = __u64seq_to_ulseq(finalized_seq);
1514+
1515+
/*
1516+
* Set the sequence number of a later finalized record that has been
1517+
* seen.
1518+
*
1519+
* Guarantee the record data is visible to other CPUs before storing
1520+
* its sequence number. This pairs with desc_last_finalized_seq:A.
1521+
*
1522+
* Memory barrier involvement:
1523+
*
1524+
* If desc_last_finalized_seq:A reads from
1525+
* desc_update_last_finalized:A, then desc_read:A reads from
1526+
* _prb_commit:B.
1527+
*
1528+
* Relies on:
1529+
*
1530+
* RELEASE from _prb_commit:B to desc_update_last_finalized:A
1531+
* matching
1532+
* ACQUIRE from desc_last_finalized_seq:A to desc_read:A
1533+
*
1534+
* Note: _prb_commit:B and desc_update_last_finalized:A can be
1535+
* different CPUs. However, the desc_update_last_finalized:A
1536+
* CPU (which performs the release) must have previously seen
1537+
* _prb_commit:B.
1538+
*/
1539+
if (!atomic_long_try_cmpxchg_release(&desc_ring->last_finalized_seq,
1540+
&oldval, newval)) { /* LMM(desc_update_last_finalized:A) */
1541+
old_seq = __ulseq_to_u64seq(rb, oldval);
1542+
goto try_again;
1543+
}
1544+
}
1545+
14441546
/*
14451547
* Attempt to finalize a specified descriptor. If this fails, the descriptor
14461548
* is either already final or it will finalize itself when the writer commits.
14471549
*/
1448-
static void desc_make_final(struct prb_desc_ring *desc_ring, unsigned long id)
1550+
static void desc_make_final(struct printk_ringbuffer *rb, unsigned long id)
14491551
{
1552+
struct prb_desc_ring *desc_ring = &rb->desc_ring;
14501553
unsigned long prev_state_val = DESC_SV(id, desc_committed);
14511554
struct prb_desc *d = to_desc(desc_ring, id);
14521555

1453-
atomic_long_cmpxchg_relaxed(&d->state_var, prev_state_val,
1454-
DESC_SV(id, desc_finalized)); /* LMM(desc_make_final:A) */
1455-
1456-
/* Best effort to remember the last finalized @id. */
1457-
atomic_long_set(&desc_ring->last_finalized_id, id);
1556+
if (atomic_long_try_cmpxchg_relaxed(&d->state_var, &prev_state_val,
1557+
DESC_SV(id, desc_finalized))) { /* LMM(desc_make_final:A) */
1558+
desc_update_last_finalized(rb);
1559+
}
14581560
}
14591561

14601562
/**
@@ -1550,7 +1652,7 @@ bool prb_reserve(struct prb_reserved_entry *e, struct printk_ringbuffer *rb,
15501652
* readers. (For seq==0 there is no previous descriptor.)
15511653
*/
15521654
if (info->seq > 0)
1553-
desc_make_final(desc_ring, DESC_ID(id - 1));
1655+
desc_make_final(rb, DESC_ID(id - 1));
15541656

15551657
r->text_buf = data_alloc(rb, r->text_buf_size, &d->text_blk_lpos, id);
15561658
/* If text data allocation fails, a data-less record is committed. */
@@ -1643,7 +1745,7 @@ void prb_commit(struct prb_reserved_entry *e)
16431745
*/
16441746
head_id = atomic_long_read(&desc_ring->head_id); /* LMM(prb_commit:A) */
16451747
if (head_id != e->id)
1646-
desc_make_final(desc_ring, e->id);
1748+
desc_make_final(e->rb, e->id);
16471749
}
16481750

16491751
/**
@@ -1663,12 +1765,9 @@ void prb_commit(struct prb_reserved_entry *e)
16631765
*/
16641766
void prb_final_commit(struct prb_reserved_entry *e)
16651767
{
1666-
struct prb_desc_ring *desc_ring = &e->rb->desc_ring;
1667-
16681768
_prb_commit(e, desc_finalized);
16691769

1670-
/* Best effort to remember the last finalized @id. */
1671-
atomic_long_set(&desc_ring->last_finalized_id, e->id);
1770+
desc_update_last_finalized(e->rb);
16721771
}
16731772

16741773
/*
@@ -2008,42 +2107,29 @@ u64 prb_first_valid_seq(struct printk_ringbuffer *rb)
20082107
* newest sequence number available to readers will be.
20092108
*
20102109
* This provides readers a sequence number to jump to if all currently
2011-
* available records should be skipped.
2110+
* available records should be skipped. It is guaranteed that all records
2111+
* previous to the returned value have been finalized and are (or were)
2112+
* available to the reader.
20122113
*
20132114
* Context: Any context.
20142115
* Return: The sequence number of the next newest (not yet available) record
20152116
* for readers.
20162117
*/
20172118
u64 prb_next_seq(struct printk_ringbuffer *rb)
20182119
{
2019-
struct prb_desc_ring *desc_ring = &rb->desc_ring;
2020-
enum desc_state d_state;
2021-
unsigned long id;
20222120
u64 seq;
20232121

2024-
/* Check if the cached @id still points to a valid @seq. */
2025-
id = atomic_long_read(&desc_ring->last_finalized_id);
2026-
d_state = desc_read(desc_ring, id, NULL, &seq, NULL);
2122+
seq = desc_last_finalized_seq(rb);
20272123

2028-
if (d_state == desc_finalized || d_state == desc_reusable) {
2029-
/*
2030-
* Begin searching after the last finalized record.
2031-
*
2032-
* On 0, the search must begin at 0 because of hack#2
2033-
* of the bootstrapping phase it is not known if a
2034-
* record at index 0 exists.
2035-
*/
2036-
if (seq != 0)
2037-
seq++;
2038-
} else {
2039-
/*
2040-
* The information about the last finalized sequence number
2041-
* has gone. It should happen only when there is a flood of
2042-
* new messages and the ringbuffer is rapidly recycled.
2043-
* Give up and start from the beginning.
2044-
*/
2045-
seq = 0;
2046-
}
2124+
/*
2125+
* Begin searching after the last finalized record.
2126+
*
2127+
* On 0, the search must begin at 0 because of hack#2
2128+
* of the bootstrapping phase it is not known if a
2129+
* record at index 0 exists.
2130+
*/
2131+
if (seq != 0)
2132+
seq++;
20472133

20482134
/*
20492135
* The information about the last finalized @seq might be inaccurate.
@@ -2085,7 +2171,7 @@ void prb_init(struct printk_ringbuffer *rb,
20852171
rb->desc_ring.infos = infos;
20862172
atomic_long_set(&rb->desc_ring.head_id, DESC0_ID(descbits));
20872173
atomic_long_set(&rb->desc_ring.tail_id, DESC0_ID(descbits));
2088-
atomic_long_set(&rb->desc_ring.last_finalized_id, DESC0_ID(descbits));
2174+
atomic_long_set(&rb->desc_ring.last_finalized_seq, 0);
20892175

20902176
rb->text_data_ring.size_bits = textbits;
20912177
rb->text_data_ring.data = text_buf;

kernel/printk/printk_ringbuffer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ struct prb_desc_ring {
7575
struct printk_info *infos;
7676
atomic_long_t head_id;
7777
atomic_long_t tail_id;
78-
atomic_long_t last_finalized_id;
78+
atomic_long_t last_finalized_seq;
7979
};
8080

8181
/*
@@ -259,7 +259,7 @@ static struct printk_ringbuffer name = { \
259259
.infos = &_##name##_infos[0], \
260260
.head_id = ATOMIC_INIT(DESC0_ID(descbits)), \
261261
.tail_id = ATOMIC_INIT(DESC0_ID(descbits)), \
262-
.last_finalized_id = ATOMIC_INIT(DESC0_ID(descbits)), \
262+
.last_finalized_seq = ATOMIC_INIT(0), \
263263
}, \
264264
.text_data_ring = { \
265265
.size_bits = (avgtextbits) + (descbits), \

0 commit comments

Comments
 (0)