Skip to content

Commit bc21d8f

Browse files
lpereiraAnas Nashif
authored andcommitted
samples: net/nats: Fix parsing of MSG messages
This addresses the issues found by QA in ZEP-1012 and clarify the documentated behavior as described in ZEP-1859. Change-Id: I602e5749db7f6f44cf5be449b8e6f0d2ba66b69b Signed-off-by: Leandro Pereira <[email protected]>
1 parent 5eee248 commit bc21d8f

File tree

3 files changed

+87
-31
lines changed

3 files changed

+87
-31
lines changed

samples/net/nats/README.rst

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,15 +131,19 @@ If ``queue_group`` is NULL, it's not sent to the server.
131131

132132
``max_msgs`` specifies the number of messages that the server will
133133
send before actually unsubscribing the message. Can be 0 to
134-
immediately unsubscribe.
134+
immediately unsubscribe. (See note below.)
135135

136136
Both of these functions will return ``-ENOMEM`` if they couldn't build
137137
the message to transmit to the server. They can also return any error
138138
that ``net_context_send()`` can return.
139139

140-
In order to conserve resources, the Zephyr implementation will not make
141-
not of subscribed topics. This is left as a task for the user of the API
142-
to handle, for instance, when the ``on_message()`` callback is called.
140+
Note: In order to conserve resources, the library implementation will not
141+
make note of subscribed topics. Both ``nats_subscribe()`` and
142+
``nats_unsubscribe()`` functions will only notify the server that the client
143+
is either interested or uninterested in a particular topic. The
144+
``on_message()`` callback may be called to notify of changes in topics that
145+
have not been subscribed to (or have been recently unsubscribed). It's up
146+
to the application to decide to ignore the message.
143147

144148
Topics can be published by using the following function:
145149

@@ -155,5 +159,5 @@ which case, subscribers to this topic won't receive this information as
155159
well.
156160

157161
As ``net_subscribe()`` and ``net_unsubscribe()``, this function can
158-
return ``ENOMEM`` -or any other errors that ``net_context_send()``
162+
return ``-ENOMEM`` or any other errors that ``net_context_send()``
159163
returns.

samples/net/nats/src/main.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ static void write_led(const struct nats *nats,
203203
pubstate = state ? "on" : "off";
204204
nats_publish(nats, "led0", 0, msg->reply_to, 0,
205205
pubstate, strlen(pubstate));
206+
207+
printk("*** Turning LED %s\n", pubstate);
206208
}
207209

208210
static int on_msg_received(const struct nats *nats,

samples/net/nats/src/nats.c

Lines changed: 76 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@
66

77
#include <ctype.h>
88
#include <errno.h>
9+
#include <json.h>
910
#include <misc/printk.h>
11+
#include <misc/util.h>
1012
#include <net/nbuf.h>
1113
#include <net/net_context.h>
1214
#include <net/net_core.h>
@@ -17,10 +19,11 @@
1719
#include <stdlib.h>
1820
#include <string.h>
1921
#include <zephyr.h>
20-
#include <json.h>
2122

2223
#include "nats.h"
2324

25+
#define CMD_BUF_LEN 256
26+
2427
struct nats_info {
2528
const char *server_id;
2629
const char *version;
@@ -128,7 +131,8 @@ static inline int transmit(struct net_context *conn, const char buffer[],
128131
.offset = offsetof(struct_, member_), \
129132
.type = type_ \
130133
}
131-
static int handle_server_info(struct nats *nats, char *payload, size_t len)
134+
static int handle_server_info(struct nats *nats, char *payload, size_t len,
135+
struct net_buf *buf, uint16_t offset)
132136
{
133137
static const struct json_obj_descr descr[] = {
134138
FIELD(struct nats_info, server_id, JSON_TOK_STRING),
@@ -227,10 +231,42 @@ static char *strsep(char *strp, const char *delims)
227231
return NULL;
228232
}
229233

230-
static int handle_server_msg(struct nats *nats, char *payload, size_t len)
234+
static int copy_nbuf_to_buf(struct net_buf *src, uint16_t offset,
235+
char *dst, size_t dst_size, size_t n_bytes)
236+
{
237+
uint16_t to_copy;
238+
uint16_t copied;
239+
240+
if (dst_size < n_bytes) {
241+
return -ENOMEM;
242+
}
243+
244+
while (src && offset >= src->len) {
245+
offset -= src->len;
246+
src = src->frags;
247+
}
248+
249+
for (copied = 0; src && n_bytes > 0; offset = 0) {
250+
to_copy = min(n_bytes, src->len - offset);
251+
252+
memcpy(dst + copied, (char *)src->data + offset, to_copy);
253+
copied += to_copy;
254+
255+
n_bytes -= to_copy;
256+
src = src->frags;
257+
}
258+
259+
if (n_bytes > 0) {
260+
return -ENOMEM;
261+
}
262+
263+
return 0;
264+
}
265+
266+
static int handle_server_msg(struct nats *nats, char *payload, size_t len,
267+
struct net_buf *buf, uint16_t offset)
231268
{
232-
char *subject, *sid, *reply_to, *bytes;
233-
char *end_ptr;
269+
char *subject, *sid, *reply_to, *bytes, *end_ptr;
234270
char prev_end = payload[len];
235271
size_t payload_size;
236272

@@ -241,21 +277,21 @@ static int handle_server_msg(struct nats *nats, char *payload, size_t len)
241277
subject = payload;
242278
sid = strsep(subject, " \t");
243279
reply_to = strsep(sid, " \t");
244-
if (!reply_to) {
245-
bytes = strsep(sid, "\r");
246-
} else {
247-
bytes = strsep(reply_to, "\r");
248-
}
249-
250-
payload[len] = prev_end;
280+
bytes = strsep(reply_to, " \t");
251281

252282
if (!bytes) {
253-
return -EINVAL;
283+
if (!reply_to) {
284+
return -EINVAL;
285+
}
286+
287+
bytes = reply_to;
288+
reply_to = NULL;
254289
}
255290

256291
/* Parse the payload size */
257292
errno = 0;
258293
payload_size = strtoul(bytes, &end_ptr, 10);
294+
payload[len] = prev_end;
259295
if (errno != 0) {
260296
return -errno;
261297
}
@@ -264,29 +300,35 @@ static int handle_server_msg(struct nats *nats, char *payload, size_t len)
264300
return -EINVAL;
265301
}
266302

267-
if (payload_size >= payload + len - end_ptr) {
268-
return -EINVAL;
303+
if (payload_size >= CMD_BUF_LEN - len) {
304+
return -ENOMEM;
269305
}
270306

271-
payload = end_ptr + 2;
307+
if (copy_nbuf_to_buf(buf, offset, end_ptr, CMD_BUF_LEN - len,
308+
payload_size) < 0) {
309+
return -ENOMEM;
310+
}
311+
end_ptr[payload_size] = '\0';
272312

273313
return nats->on_message(nats, &(struct nats_msg) {
274314
.subject = subject,
275315
.sid = sid,
276316
.reply_to = reply_to,
277-
.payload = payload,
317+
.payload = end_ptr,
278318
.payload_len = payload_size,
279319
});
280320
}
281321

282-
static int handle_server_ping(struct nats *nats, char *payload, size_t len)
322+
static int handle_server_ping(struct nats *nats, char *payload, size_t len,
323+
struct net_buf *buf, uint16_t offset)
283324
{
284325
static const char pong[] = "PONG\r\n";
285326

286327
return transmit(nats->conn, pong, sizeof(pong) - 1);
287328
}
288329

289-
static int ignore(struct nats *nats, char *payload, size_t len)
330+
static int ignore(struct nats *nats, char *payload, size_t len,
331+
struct net_buf *buf, uint16_t offset)
290332
{
291333
/* FIXME: Notify user of success/errors. This would require
292334
* maintaining information of what was the last sent command in
@@ -302,12 +344,14 @@ static int ignore(struct nats *nats, char *payload, size_t len)
302344
.len = sizeof(cmd_) - 1, \
303345
.handle = handler_ \
304346
}
305-
static int handle_server_cmd(struct nats *nats, char *cmd, size_t len)
347+
static int handle_server_cmd(struct nats *nats, char *cmd, size_t len,
348+
struct net_buf *buf, uint16_t offset)
306349
{
307350
static const struct {
308351
const char *op;
309352
size_t len;
310-
int (*handle)(struct nats *nats, char *payload, size_t len);
353+
int (*handle)(struct nats *nats, char *payload, size_t len,
354+
struct net_buf *buf, uint16_t offset);
311355
} cmds[] = {
312356
CMD("INFO", handle_server_info),
313357
CMD("MSG", handle_server_msg),
@@ -327,15 +371,16 @@ static int handle_server_cmd(struct nats *nats, char *cmd, size_t len)
327371
}
328372
}
329373
payload_len = len - (size_t)(payload - cmd);
330-
len = (size_t)(payload - cmd);
374+
len = (size_t)(payload - cmd - 1);
331375

332376
for (i = 0; i < ARRAY_SIZE(cmds); i++) {
333377
if (len != cmds[i].len) {
334378
continue;
335379
}
336380

337381
if (!strncmp(cmds[i].op, cmd, len)) {
338-
return cmds[i].handle(nats, payload, payload_len);
382+
return cmds[i].handle(nats, payload, payload_len,
383+
buf, offset);
339384
}
340385
}
341386

@@ -490,7 +535,7 @@ static void receive_cb(struct net_context *ctx, struct net_buf *buf, int status,
490535
void *user_data)
491536
{
492537
struct nats *nats = user_data;
493-
char cmd_buf[256];
538+
char cmd_buf[CMD_BUF_LEN];
494539
struct net_buf *tmp;
495540
uint16_t pos = 0, cmd_len = 0;
496541
size_t len;
@@ -513,7 +558,7 @@ static void receive_cb(struct net_context *ctx, struct net_buf *buf, int status,
513558
while (tmp) {
514559
len = tmp->len - pos;
515560

516-
end_of_line = memchr((uint8_t *)tmp->data + pos, '\r', len);
561+
end_of_line = memchr((uint8_t *)tmp->data + pos, '\n', len);
517562
if (end_of_line) {
518563
len = end_of_line - ((uint8_t *)tmp->data + pos);
519564
}
@@ -526,12 +571,17 @@ static void receive_cb(struct net_context *ctx, struct net_buf *buf, int status,
526571
cmd_len += len;
527572

528573
if (end_of_line) {
574+
int ret;
575+
529576
if (tmp) {
530577
tmp = net_nbuf_read(tmp, pos, &pos, 1, NULL);
531578
}
532579

533580
cmd_buf[cmd_len] = '\0';
534-
if (handle_server_cmd(nats, cmd_buf, cmd_len) < 0) {
581+
582+
ret = handle_server_cmd(nats, cmd_buf, cmd_len,
583+
tmp, pos);
584+
if (ret < 0) {
535585
/* FIXME: What to do with unhandled messages? */
536586
break;
537587
}

0 commit comments

Comments
 (0)