Skip to content

Commit 563f05e

Browse files
jukkarfabiobaltieri
authored andcommitted
net: zperf: Add concurrency when doing upload tests
If user has enabled CONFIG_ZPERF_SESSION_PER_THREAD, then if user gives -a (async) option to upload command, then multiple uploads can be run simultaneously. Each upload will be run in a dedicated work queue. The work queue thread priority can be set by -t option. Signed-off-by: Jukka Rissanen <[email protected]>
1 parent 5e47efa commit 563f05e

File tree

10 files changed

+836
-40
lines changed

10 files changed

+836
-40
lines changed

doc/connectivity/networking/api/zperf.rst

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,3 +96,79 @@ and this if you are testing TCP:
9696
9797
iPerf output can be limited by using the -b option if Zephyr is not
9898
able to receive all the packets in orderly manner.
99+
100+
Session Management
101+
******************
102+
103+
If :kconfig:option:`CONFIG_ZPERF_SESSION_PER_THREAD` option is set, then
104+
multiple upload sessions can be done at the same time if user supplies ``-a``
105+
option when starting the upload. Each session will have their own work queue
106+
to run the test. The session test results can be viewed also after the tests
107+
have finished.
108+
109+
Following zperf shell commands are available for session management:
110+
111+
.. csv-table::
112+
:header: "zperf shell command", "Description"
113+
:widths: auto
114+
115+
"``jobs``", "Show currently active or finished sessions"
116+
"``jobs all``", "Show statistics of finished sessions"
117+
"``jobs clear``", "Clear finished session statistics"
118+
119+
Example:
120+
121+
.. code-block:: console
122+
123+
uart:~$ zperf udp upload -a -t 5 192.0.2.2 5001 10 1K 1M
124+
Remote port is 5001
125+
Connecting to 192.0.2.2
126+
Duration: 10.00 s
127+
Packet size: 1000 bytes
128+
Rate: 1000 kbps
129+
Starting...
130+
Rate: 1.00 Mbps
131+
Packet duration 7 ms
132+
133+
uart:~$ zperf jobs all
134+
No sessions sessions found
135+
uart:~$ zperf jobs
136+
Thread Remaining
137+
Id Proto Priority time (sec)
138+
[1] UDP 5 4
139+
140+
Active sessions have not yet finished
141+
-
142+
Upload completed!
143+
Statistics: server (client)
144+
Duration: 30.01 s (30.01 s)
145+
Num packets: 3799 (3799)
146+
Num packets out order: 0
147+
Num packets lost: 0
148+
Jitter: 63 us
149+
Rate: 1.01 Mbps (1.01 Mbps)
150+
Thread priority: 5
151+
Protocol: UDP
152+
Session id: 1
153+
154+
uart:~$ zperf jobs all
155+
-
156+
Upload completed!
157+
Statistics: server (client)
158+
Duration: 30.01 s (30.01 s)
159+
Num packets: 3799 (3799)
160+
Num packets out order: 0
161+
Num packets lost: 0
162+
Jitter: 63 us
163+
Rate: 1.01 Mbps (1.01 Mbps)
164+
Thread priority: 5
165+
Protocol: UDP
166+
Session id: 1
167+
Total 1 sessions done
168+
169+
uart:~$ zperf jobs clear
170+
Cleared data from 1 sessions
171+
172+
uart:~$ zperf jobs
173+
No active upload sessions
174+
No finished sessions found

include/zephyr/net/zperf.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ struct zperf_upload_params {
4545
uint8_t tos;
4646
int tcp_nodelay;
4747
int priority;
48+
#ifdef CONFIG_ZPERF_SESSION_PER_THREAD
49+
int thread_priority;
50+
#endif
4851
uint32_t report_interval_ms;
4952
} options;
5053
};

subsys/net/lib/zperf/Kconfig

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ config NET_ZPERF_LEGACY_HEADER_COMPAT
2222
detected. This option reverts the header format for use with
2323
iperf version 2.0.9 and earlier.
2424

25+
config ZPERF_SESSION_PER_THREAD
26+
bool "Run each session in a separate thread"
27+
help
28+
Each session is started in its own thread. This means
29+
that the system will use more memory because multiple
30+
stack frames are needed, so this is not enabled by default.
31+
User is also able to set each thread priority separately and
32+
the ZPERF_WORK_Q_THREAD_PRIORITY is a default value if thread
33+
priority is not set when starting the session.
34+
2535
config ZPERF_WORK_Q_THREAD_PRIORITY
2636
int "zperf work queue thread priority"
2737
default NUM_PREEMPT_PRIORITIES

subsys/net/lib/zperf/zperf_common.c

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,70 @@ struct sockaddr_in *zperf_get_sin(void)
4141
return &in4_addr_my;
4242
}
4343

44-
#define ZPERF_WORK_Q_THREAD_PRIORITY \
45-
CLAMP(CONFIG_ZPERF_WORK_Q_THREAD_PRIORITY, K_HIGHEST_APPLICATION_THREAD_PRIO, \
44+
#define ZPERF_WORK_Q_THREAD_PRIORITY \
45+
CLAMP(CONFIG_ZPERF_WORK_Q_THREAD_PRIORITY, \
46+
K_HIGHEST_APPLICATION_THREAD_PRIO, \
4647
K_LOWEST_APPLICATION_THREAD_PRIO)
47-
K_THREAD_STACK_DEFINE(zperf_work_q_stack, CONFIG_ZPERF_WORK_Q_STACK_SIZE);
4848

49+
#if defined(CONFIG_ZPERF_SESSION_PER_THREAD)
50+
struct zperf_work {
51+
struct k_work_q *queue;
52+
struct z_thread_stack_element *stack;
53+
size_t stack_size;
54+
};
55+
56+
#define CREATE_WORK_Q(i, _) \
57+
static struct k_work_q zperf_work_q_##i; \
58+
static K_KERNEL_STACK_DEFINE(zperf_work_q_stack_##i, \
59+
CONFIG_ZPERF_WORK_Q_STACK_SIZE)
60+
61+
/* Both UDP and TCP can have separate sessions so multiply by 2 */
62+
#if defined(CONFIG_NET_UDP) && defined(CONFIG_NET_TCP)
63+
#define MAX_SESSION_COUNT UTIL_X2(CONFIG_NET_ZPERF_MAX_SESSIONS)
64+
#define SESSION_INDEX CONFIG_NET_ZPERF_MAX_SESSIONS
65+
#else
66+
#define MAX_SESSION_COUNT CONFIG_NET_ZPERF_MAX_SESSIONS
67+
#define SESSION_INDEX 0
68+
#endif
69+
70+
LISTIFY(MAX_SESSION_COUNT, CREATE_WORK_Q, (;), _);
71+
72+
#define SET_WORK_Q(i, _) \
73+
[i] = { \
74+
.queue = &zperf_work_q_##i, \
75+
.stack = zperf_work_q_stack_##i, \
76+
.stack_size = K_THREAD_STACK_SIZEOF(zperf_work_q_stack_##i), \
77+
}
78+
79+
static struct zperf_work zperf_work_q[] = {
80+
LISTIFY(MAX_SESSION_COUNT, SET_WORK_Q, (,), _)
81+
};
82+
83+
struct k_work_q *get_queue(enum session_proto proto, int session_id)
84+
{
85+
if (session_id < 0 || session_id >= CONFIG_NET_ZPERF_MAX_SESSIONS) {
86+
return NULL;
87+
}
88+
89+
if (proto < 0 || proto >= SESSION_PROTO_END) {
90+
return NULL;
91+
}
92+
93+
NET_DBG("%s using queue %d for session %d\n",
94+
proto == SESSION_UDP ? "UDP" : "TCP",
95+
proto * SESSION_INDEX + session_id,
96+
session_id);
97+
98+
return zperf_work_q[proto * SESSION_INDEX + session_id].queue;
99+
}
100+
101+
#else /* CONFIG_ZPERF_SESSION_PER_THREAD */
102+
103+
K_THREAD_STACK_DEFINE(zperf_work_q_stack, CONFIG_ZPERF_WORK_Q_STACK_SIZE);
49104
static struct k_work_q zperf_work_q;
50105

106+
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
107+
51108
int zperf_get_ipv6_addr(char *host, char *prefix_str, struct in6_addr *addr)
52109
{
53110
struct net_if_ipv6_prefix *prefix;
@@ -220,28 +277,62 @@ uint32_t zperf_packet_duration(uint32_t packet_size, uint32_t rate_in_kbps)
220277
(rate_in_kbps * 1024U));
221278
}
222279

223-
void zperf_async_work_submit(struct k_work *work)
280+
void zperf_async_work_submit(enum session_proto proto, int session_id, struct k_work *work)
224281
{
282+
#if defined(CONFIG_ZPERF_SESSION_PER_THREAD)
283+
k_work_submit_to_queue(zperf_work_q[proto * SESSION_INDEX + session_id].queue, work);
284+
#else
285+
ARG_UNUSED(proto);
286+
ARG_UNUSED(session_id);
287+
225288
k_work_submit_to_queue(&zperf_work_q, work);
289+
#endif
226290
}
227291

228292
static int zperf_init(void)
229293
{
294+
#if defined(CONFIG_ZPERF_SESSION_PER_THREAD)
295+
296+
ARRAY_FOR_EACH(zperf_work_q, i) {
297+
struct k_work_queue_config cfg = {
298+
.no_yield = false,
299+
};
300+
301+
#define MAX_NAME_LEN sizeof("zperf_work_q[xxx]")
302+
char name[MAX_NAME_LEN];
303+
304+
snprintk(name, sizeof(name), "zperf_work_q[%d]", i);
305+
cfg.name = name;
306+
307+
k_work_queue_init(zperf_work_q[i].queue);
308+
309+
k_work_queue_start(zperf_work_q[i].queue,
310+
zperf_work_q[i].stack,
311+
zperf_work_q[i].stack_size,
312+
ZPERF_WORK_Q_THREAD_PRIORITY,
313+
&cfg);
314+
}
315+
316+
#else /* CONFIG_ZPERF_SESSION_PER_THREAD */
230317

231318
k_work_queue_init(&zperf_work_q);
232319
k_work_queue_start(&zperf_work_q, zperf_work_q_stack,
233-
K_THREAD_STACK_SIZEOF(zperf_work_q_stack), ZPERF_WORK_Q_THREAD_PRIORITY,
320+
K_THREAD_STACK_SIZEOF(zperf_work_q_stack),
321+
ZPERF_WORK_Q_THREAD_PRIORITY,
234322
NULL);
235323
k_thread_name_set(&zperf_work_q.thread, "zperf_work_q");
236324

325+
#endif /* CONFIG_ZPERF_SESSION_PER_THREAD */
326+
237327
if (IS_ENABLED(CONFIG_NET_UDP)) {
238328
zperf_udp_uploader_init();
239329
}
240330
if (IS_ENABLED(CONFIG_NET_TCP)) {
241331
zperf_tcp_uploader_init();
242332
}
243333

244-
if (IS_ENABLED(CONFIG_NET_ZPERF_SERVER)) {
334+
if (IS_ENABLED(CONFIG_NET_ZPERF_SERVER) ||
335+
IS_ENABLED(CONFIG_ZPERF_SESSION_PER_THREAD)) {
245336
zperf_session_init();
246337
}
247338

subsys/net/lib/zperf/zperf_internal.h

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@
5353

5454
#define ZPERF_VERSION "1.1"
5555

56+
enum session_proto {
57+
SESSION_UDP = 0,
58+
SESSION_TCP = 1,
59+
SESSION_PROTO_END
60+
};
61+
5662
struct zperf_udp_datagram {
5763
uint32_t id;
5864
uint32_t tv_sec;
@@ -110,13 +116,14 @@ int zperf_get_ipv4_addr(char *host, struct in_addr *addr);
110116
struct sockaddr_in *zperf_get_sin(void);
111117

112118
extern void connect_ap(char *ssid);
119+
extern struct k_work_q *get_queue(enum session_proto proto, int session_id);
113120

114121
int zperf_prepare_upload_sock(const struct sockaddr *peer_addr, uint8_t tos,
115122
int priority, int tcp_nodelay, int proto);
116123

117124
uint32_t zperf_packet_duration(uint32_t packet_size, uint32_t rate_in_kbps);
118125

119-
void zperf_async_work_submit(struct k_work *work);
126+
void zperf_async_work_submit(enum session_proto proto, int session_id, struct k_work *work);
120127
void zperf_udp_uploader_init(void);
121128
void zperf_tcp_uploader_init(void);
122129

subsys/net/lib/zperf/zperf_session.c

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,70 @@ LOG_MODULE_DECLARE(net_zperf, CONFIG_NET_ZPERF_LOG_LEVEL);
1919

2020
static struct session sessions[SESSION_PROTO_END][SESSION_MAX];
2121

22+
struct session *get_free_session(const struct sockaddr *addr,
23+
enum session_proto proto)
24+
{
25+
struct session *ptr;
26+
uint64_t oldest = 0ULL;
27+
int oldest_completed_index = -1, oldest_free_index = -1;
28+
int i = 0;
29+
30+
const struct sockaddr_in *addr4 = (const struct sockaddr_in *)addr;
31+
const struct sockaddr_in6 *addr6 = (const struct sockaddr_in6 *)addr;
32+
33+
/* Check whether we already have an active session */
34+
while (i < SESSION_MAX) {
35+
ptr = &sessions[proto][i];
36+
37+
if (ptr->state == STATE_NULL ||
38+
ptr->state == STATE_COMPLETED) {
39+
40+
if (oldest == 0ULL || ptr->last_time < oldest) {
41+
oldest = ptr->last_time;
42+
43+
if (ptr->state == STATE_COMPLETED) {
44+
if (oldest_completed_index < 0) {
45+
oldest_completed_index = i;
46+
}
47+
} else {
48+
/* Free session */
49+
if (oldest_free_index < 0) {
50+
oldest_free_index = i;
51+
}
52+
}
53+
}
54+
}
55+
56+
i++;
57+
}
58+
59+
ptr = NULL;
60+
61+
if (oldest_free_index >= 0) {
62+
ptr = &sessions[proto][oldest_free_index];
63+
} else if (oldest_completed_index >= 0) {
64+
ptr = &sessions[proto][oldest_completed_index];
65+
}
66+
67+
if (ptr != NULL) {
68+
if (IS_ENABLED(CONFIG_NET_IPV4) &&
69+
addr->sa_family == AF_INET) {
70+
ptr->port = addr4->sin_port;
71+
ptr->ip.family = AF_INET;
72+
net_ipaddr_copy(&ptr->ip.in_addr, &addr4->sin_addr);
73+
} else if (IS_ENABLED(CONFIG_NET_IPV6) &&
74+
addr->sa_family == AF_INET6) {
75+
ptr->port = addr6->sin6_port;
76+
ptr->ip.family = AF_INET6;
77+
net_ipaddr_copy(&ptr->ip.in6_addr, &addr6->sin6_addr);
78+
}
79+
80+
ptr->state = STATE_STARTING;
81+
}
82+
83+
return ptr;
84+
}
85+
2286
/* Get session from a given packet */
2387
struct session *get_session(const struct sockaddr *addr,
2488
enum session_proto proto)
@@ -102,6 +166,14 @@ void zperf_reset_session_stats(struct session *session)
102166
session->last_transit_time = 0;
103167
}
104168

169+
void zperf_session_foreach(enum session_proto proto, session_cb_t cb,
170+
void *user_data)
171+
{
172+
ARRAY_FOR_EACH(sessions[proto], i) {
173+
cb(&sessions[proto][i], proto, user_data);
174+
}
175+
}
176+
105177
void zperf_session_reset(enum session_proto proto)
106178
{
107179
int i, j;
@@ -114,6 +186,7 @@ void zperf_session_reset(enum session_proto proto)
114186

115187
for (j = 0; j < SESSION_MAX; j++) {
116188
sessions[i][j].state = STATE_NULL;
189+
sessions[i][j].id = j;
117190
zperf_reset_session_stats(&(sessions[i][j]));
118191
}
119192
}

0 commit comments

Comments
 (0)