Skip to content

Commit d41c182

Browse files
teburdstephanosio
authored andcommitted
rtio: Make MPSC faster on non-smp systems
On non-smp systems where multiple cores aren't in play atomics aren't really necessary and volatile can be used in stead. Additionally marks the push function as ALWAYS_INLINE as I saw at times it was not being inlined. MPSC operation speed is crucial to the performance of rtio, these changes provided a 30% throughput improvmement in the throughput test. Signed-off-by: Tom Burdick <[email protected]>
1 parent fc32f1c commit d41c182

File tree

2 files changed

+54
-21
lines changed

2 files changed

+54
-21
lines changed

include/zephyr/rtio/rtio_mpsc.h

Lines changed: 47 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@
55
* SPDX-License-Identifier: Apache-2.0
66
*/
77

8-
98
#ifndef ZEPHYR_RTIO_MPSC_H_
109
#define ZEPHYR_RTIO_MPSC_H_
1110

1211
#include <stdint.h>
1312
#include <stdbool.h>
13+
#include <zephyr/toolchain/common.h>
1414
#include <zephyr/sys/atomic.h>
1515
#include <zephyr/kernel.h>
1616

@@ -25,6 +25,40 @@ extern "C" {
2525
* @{
2626
*/
2727

28+
/*
29+
* On single core systems atomics are unnecessary
30+
* and cause a lot of unnecessary cache invalidation
31+
*
32+
* Using volatile to at least ensure memory is read/written
33+
* by the compiler generated op codes is enough.
34+
*
35+
* On SMP atomics *must* be used to ensure the pointers
36+
* are updated in the correct order and the values are
37+
* updated core caches correctly.
38+
*/
39+
#if defined(CONFIG_SMP)
40+
41+
typedef atomic_ptr_t mpsc_ptr_t;
42+
43+
#define mpsc_ptr_get(ptr) atomic_ptr_get(&(ptr))
44+
#define mpsc_ptr_set(ptr, val) atomic_ptr_set(&(ptr), val)
45+
#define mpsc_ptr_set_get(ptr, val) atomic_ptr_set(&(ptr), val)
46+
47+
#else
48+
49+
typedef struct rtio_mpsc_node *mpsc_ptr_t;
50+
51+
#define mpsc_ptr_get(ptr) ptr
52+
#define mpsc_ptr_set(ptr, val) ptr = val
53+
#define mpsc_ptr_set_get(ptr, val) \
54+
({ \
55+
mpsc_ptr_t tmp = ptr; \
56+
ptr = val; \
57+
tmp; \
58+
})
59+
60+
#endif
61+
2862
/**
2963
* @file rtio_mpsc.h
3064
*
@@ -45,19 +79,18 @@ extern "C" {
4579
* @brief Queue member
4680
*/
4781
struct rtio_mpsc_node {
48-
atomic_ptr_t next;
82+
mpsc_ptr_t next;
4983
};
5084

5185
/**
5286
* @brief MPSC Queue
5387
*/
5488
struct rtio_mpsc {
55-
atomic_ptr_t head;
89+
mpsc_ptr_t head;
5690
struct rtio_mpsc_node *tail;
5791
struct rtio_mpsc_node stub;
5892
};
5993

60-
6194
/**
6295
* @brief Static initializer for a mpsc queue
6396
*
@@ -81,9 +114,9 @@ struct rtio_mpsc {
81114
*/
82115
static inline void rtio_mpsc_init(struct rtio_mpsc *q)
83116
{
84-
atomic_ptr_set(&q->head, &q->stub);
117+
mpsc_ptr_set(q->head, &q->stub);
85118
q->tail = &q->stub;
86-
atomic_ptr_set(&q->stub.next, NULL);
119+
mpsc_ptr_set(q->stub.next, NULL);
87120
}
88121

89122
/**
@@ -92,16 +125,16 @@ static inline void rtio_mpsc_init(struct rtio_mpsc *q)
92125
* @param q Queue to push the node to
93126
* @param n Node to push into the queue
94127
*/
95-
static inline void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
128+
static ALWAYS_INLINE void rtio_mpsc_push(struct rtio_mpsc *q, struct rtio_mpsc_node *n)
96129
{
97130
struct rtio_mpsc_node *prev;
98131
int key;
99132

100-
atomic_ptr_set(&n->next, NULL);
133+
mpsc_ptr_set(n->next, NULL);
101134

102135
key = arch_irq_lock();
103-
prev = (struct rtio_mpsc_node *)atomic_ptr_set(&q->head, n);
104-
atomic_ptr_set(&prev->next, n);
136+
prev = (struct rtio_mpsc_node *)mpsc_ptr_set_get(q->head, n);
137+
mpsc_ptr_set(prev->next, n);
105138
arch_irq_unlock(key);
106139
}
107140

@@ -115,7 +148,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
115148
{
116149
struct rtio_mpsc_node *head;
117150
struct rtio_mpsc_node *tail = q->tail;
118-
struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);
151+
struct rtio_mpsc_node *next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next);
119152

120153
/* Skip over the stub/sentinel */
121154
if (tail == &q->stub) {
@@ -125,7 +158,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
125158

126159
q->tail = next;
127160
tail = next;
128-
next = (struct rtio_mpsc_node *)atomic_ptr_get(&next->next);
161+
next = (struct rtio_mpsc_node *)mpsc_ptr_get(next->next);
129162
}
130163

131164
/* If next is non-NULL then a valid node is found, return it */
@@ -134,7 +167,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
134167
return tail;
135168
}
136169

137-
head = (struct rtio_mpsc_node *)atomic_ptr_get(&q->head);
170+
head = (struct rtio_mpsc_node *)mpsc_ptr_get(q->head);
138171

139172
/* If next is NULL, and the tail != HEAD then the queue has pending
140173
* updates that can't yet be accessed.
@@ -145,7 +178,7 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
145178

146179
rtio_mpsc_push(q, &q->stub);
147180

148-
next = (struct rtio_mpsc_node *)atomic_ptr_get(&tail->next);
181+
next = (struct rtio_mpsc_node *)mpsc_ptr_get(tail->next);
149182

150183
if (next != NULL) {
151184
q->tail = next;
@@ -163,5 +196,4 @@ static inline struct rtio_mpsc_node *rtio_mpsc_pop(struct rtio_mpsc *q)
163196
}
164197
#endif
165198

166-
167199
#endif /* ZEPHYR_RTIO_MPSC_H_ */

tests/subsys/rtio/rtio_api/src/test_rtio_mpsc.c

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ static struct rtio_mpsc_node push_pop_nodes[2];
2626
ZTEST(rtio_mpsc, test_push_pop)
2727
{
2828

29-
struct rtio_mpsc_node *node, *head, *stub, *next, *tail;
29+
mpsc_ptr_t node, head;
30+
struct rtio_mpsc_node *stub, *next, *tail;
3031

3132
rtio_mpsc_init(&push_pop_q);
3233

33-
head = atomic_ptr_get(&push_pop_q.head);
34+
head = mpsc_ptr_get(push_pop_q.head);
3435
tail = push_pop_q.tail;
3536
stub = &push_pop_q.stub;
36-
next = atomic_ptr_get(&stub->next);
37+
next = stub->next;
3738

3839
zassert_equal(head, stub, "Head should point at stub");
3940
zassert_equal(tail, stub, "Tail should point at stub");
@@ -44,12 +45,12 @@ ZTEST(rtio_mpsc, test_push_pop)
4445

4546
rtio_mpsc_push(&push_pop_q, &push_pop_nodes[0]);
4647

47-
head = atomic_ptr_get(&push_pop_q.head);
48+
head = mpsc_ptr_get(push_pop_q.head);
4849

4950
zassert_equal(head, &push_pop_nodes[0], "Queue head should point at push_pop_node");
50-
next = atomic_ptr_get(&push_pop_nodes[0].next);
51+
next = mpsc_ptr_get(push_pop_nodes[0].next);
5152
zassert_is_null(next, NULL, "push_pop_node next should point at null");
52-
next = atomic_ptr_get(&push_pop_q.stub.next);
53+
next = mpsc_ptr_get(push_pop_q.stub.next);
5354
zassert_equal(next, &push_pop_nodes[0], "Queue stub should point at push_pop_node");
5455
tail = push_pop_q.tail;
5556
stub = &push_pop_q.stub;

0 commit comments

Comments
 (0)