Skip to content

Commit 509b0b1

Browse files
committed
enable pub/sub with character scalars
1 parent 005915b commit 509b0b1

File tree

8 files changed

+49
-34
lines changed

8 files changed

+49
-34
lines changed

NEWS.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@
44

55
* `$context()` method added for creating new contexts from nano Objects using supported protocols (i.e. req, rep, sub, surveyor, respondent) - this replaces the `context()` function for nano Objects.
66
* `subscribe()` and `unsubscribe()` now accept a topic of any atomic type (not just character), allowing pub/sub to be used with integer, double, logical, complex, or raw vectors.
7+
* Sending via the "pub" protocol, the topic no longer needs to be separated from the rest of the message, allowing character scalars to be sent as well as vectors.
78
* Added convenience auxiliary functions `is_nano()` and `is_aio()`.
89

910
#### Updates
1011

1112
* Protocol-specific helpers `subscribe()`, `unsubscribe()`, and `survey_time()` gain nanoContext methods.
12-
* Default protocol is now 'bus' when opening a new Socket or nano Object - the choices are now ordered more logically.
13+
* Default protocol is now 'bus' when opening a new Socket or nano Object - the choices are ordered more logically.
1314
* Closing a stream now strips all attributes on the object rendering it a nil external pointer - this is for safety, eliminating a potential crash if attempting to re-use a closed stream.
1415
* For receives, if an error occurs in unserialisation or data conversion (e.g. mode was incorrectly specified), the received raw vector is now available at both `$raw` and `$data` if `keep.raw = TRUE`.
1516
* Setting 'NANONEXT_TLS=1' now allows the downloaded NNG library to be built against a system mbedtls installation.

R/protocol.R

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,20 +13,21 @@
1313
#'
1414
#' @details To use pub/sub the publisher must:
1515
#' \itemize{
16-
#' \item{specify \code{mode = 'raw'} when sending to allow the topics to be
17-
#' recognised by the receiving party.}
18-
#' \item{when sending a character vector, the topic must be separated from
19-
#' the rest of the message using e.g.
20-
#' \code{send(socket, c("topic", "message"), mode = "raw")}}
16+
#' \item{specify \code{mode = 'raw'} when sending.}
17+
#' \item{ensure the sent vector starts with the topic.}
2118
#' }
19+
#' The subscriber should then receive specifying the correct mode.
2220
#'
2321
#' @examples
2422
#' pub <- socket("pub", listen = "inproc://nanonext")
2523
#' sub <- socket("sub", dial = "inproc://nanonext")
2624
#'
2725
#' subscribe(sub, "examples")
26+
#'
2827
#' send(pub, c("examples", "this is an example"), mode = "raw")
2928
#' recv(sub, "character")
29+
#' send(pub, "examples will also be received", mode = "raw")
30+
#' recv(sub, "character")
3031
#' send(pub, c("other", "this other topic will not be received"), mode = "raw")
3132
#' recv(sub, "character")
3233
#'
@@ -78,20 +79,21 @@ subscribe.nanoContext <- function(con, topic = NULL) {
7879
#'
7980
#' To use pub/sub the publisher must:
8081
#' \itemize{
81-
#' \item{specify \code{mode = 'raw'} when sending to allow the topics to be
82-
#' recognised by the receiving party.}
83-
#' \item{when sending a character vector, the topic must be separated from
84-
#' the rest of the message using e.g.
85-
#' \code{send(socket, c("topic", "message"), mode = "raw")}}
82+
#' \item{specify \code{mode = 'raw'} when sending.}
83+
#' \item{ensure the sent vector starts with the topic.}
8684
#' }
85+
#' The subscriber should then receive specifying the correct mode.
8786
#'
8887
#' @examples
8988
#' pub <- socket("pub", listen = "inproc://nanonext")
9089
#' sub <- socket("sub", dial = "inproc://nanonext")
9190
#'
9291
#' subscribe(sub, NULL)
92+
#'
9393
#' send(pub, c("examples", "this is an example"), mode = "raw")
9494
#' recv(sub, "character")
95+
#' send(pub, "examples will also be received", mode = "raw")
96+
#' recv(sub, "character")
9597
#' unsubscribe(sub, NULL)
9698
#' send(pub, c("examples", "this example will not be received"), mode = "raw")
9799
#' recv(sub, "character")

README.Rmd

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,9 +335,13 @@ pub <- socket("pub", listen = "inproc://nanobroadcast")
335335
sub <- socket("sub", dial = "inproc://nanobroadcast")
336336
337337
sub |> subscribe(topic = "examples")
338+
338339
pub |> send(c("examples", "this is an example"), mode = "raw", echo = FALSE)
339340
sub |> recv(mode = "character", keep.raw = FALSE)
340341
342+
pub |> send("examples at the start of a single text message", mode = "raw", echo = FALSE)
343+
sub |> recv(mode = "character", keep.raw = FALSE)
344+
341345
pub |> send(c("other", "this other topic will not be received"), mode = "raw", echo = FALSE)
342346
sub |> recv(mode = "character", keep.raw = FALSE)
343347
@@ -351,7 +355,7 @@ pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw",
351355
sub |> recv("character", keep.raw = FALSE)
352356
353357
# however the topics explicitly subscribed to are still received
354-
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
358+
pub |> send(c("examples will still be received"), mode = "raw", echo = FALSE)
355359
sub |> recv(mode = "character", keep.raw = FALSE)
356360
357361
```

README.md

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -395,7 +395,7 @@ aio
395395
#> < recvAio >
396396
#> - $data for message data
397397
aio$data |> str()
398-
#> num [1:100000000] 1.5004 -0.1919 -1.5241 0.9783 0.0144 ...
398+
#> num [1:100000000] 0.983 0.386 1.337 0.243 -0.119 ...
399399
```
400400

401401
As `call_aio()` is blocking and will wait for completion, an alternative
@@ -427,10 +427,15 @@ pub <- socket("pub", listen = "inproc://nanobroadcast")
427427
sub <- socket("sub", dial = "inproc://nanobroadcast")
428428

429429
sub |> subscribe(topic = "examples")
430+
430431
pub |> send(c("examples", "this is an example"), mode = "raw", echo = FALSE)
431432
sub |> recv(mode = "character", keep.raw = FALSE)
432433
#> [1] "examples" "this is an example"
433434

435+
pub |> send("examples at the start of a single text message", mode = "raw", echo = FALSE)
436+
sub |> recv(mode = "character", keep.raw = FALSE)
437+
#> [1] "examples at the start of a single text message"
438+
434439
pub |> send(c("other", "this other topic will not be received"), mode = "raw", echo = FALSE)
435440
sub |> recv(mode = "character", keep.raw = FALSE)
436441
#> Warning in recv.nanoSocket(sub, mode = "character", keep.raw = FALSE): 8 | Try
@@ -450,9 +455,9 @@ sub |> recv("character", keep.raw = FALSE)
450455
#> 'errorValue' int 8
451456

452457
# however the topics explicitly subscribed to are still received
453-
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
458+
pub |> send(c("examples will still be received"), mode = "raw", echo = FALSE)
454459
sub |> recv(mode = "character", keep.raw = FALSE)
455-
#> [1] "examples" "this example will still be received"
460+
#> [1] "examples will still be received"
456461
```
457462

458463
The subscribed topic can be of any atomic type (not just character),
@@ -547,11 +552,11 @@ ncurl("http://httpbin.org/headers")
547552
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
548553
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
549554
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
550-
#> [76] 2d 36 32 37 38 32 66 61 64 2d 37 30 63 35 63 31 61 30 37 39 63 66 62 66 32
551-
#> [101] 31 37 36 65 33 34 30 65 65 22 0a 20 20 7d 0a 7d 0a
555+
#> [76] 2d 36 32 37 38 63 63 63 30 2d 37 66 66 34 37 37 33 31 31 35 32 66 39 61 32
556+
#> [101] 34 30 65 63 37 38 30 66 37 22 0a 20 20 7d 0a 7d 0a
552557
#>
553558
#> $data
554-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62782fad-70c5c1a079cfbf2176e340ee\"\n }\n}\n"
559+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6278ccc0-7ff47731152f9a240ec780f7\"\n }\n}\n"
555560
```
556561

557562
For advanced use, supports additional HTTP methods such as POST or PUT.
@@ -567,7 +572,7 @@ res
567572
#> - $raw for raw message
568573

569574
call_aio(res)$data
570-
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62782fad-7472b2c32d5af7603e86ae7d\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"78.145.225.121\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
575+
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6278ccc0-476e7d46361255b56173915a\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"79.173.189.204\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
571576
```
572577

573578
In this respect, it may be used as a performant and lightweight method
@@ -597,10 +602,10 @@ s
597602
#> - textframes: TRUE
598603

599604
s |> recv(keep.raw = FALSE)
600-
#> [1] "{\"e\":\"kline\",\"E\":1652043696064,\"s\":\"BTCUSDT\",\"k\":{\"t\":1652043660000,\"T\":1652043719999,\"s\":\"BTCUSDT\",\"i\":\"1m\",\"f\":1351814911,\"L\":1351815351,\"o\":\"34253.08000000\",\"c\":\"34279.21000000\",\"h\":\"34285.51000000\",\"l\":\"34253.07000000\",\"v\":\"21.53329000\",\"n\":441,\"x\":false,\"q\":\"737928.59980930\",\"V\":\"15.55859000\",\"Q\":\"533152.23838480\",\"B\":\"0\"}}"
605+
#> [1] "{\"e\":\"kline\",\"E\":1652083907587,\"s\":\"BTCUSDT\",\"k\":{\"t\":1652083860000,\"T\":1652083919999,\"s\":\"BTCUSDT\",\"i\":\"1m\",\"f\":1352400783,\"L\":1352401304,\"o\":\"33509.32000000\",\"c\":\"33501.05000000\",\"h\":\"33509.33000000\",\"l\":\"33492.19000000\",\"v\":\"38.29280000\",\"n\":522,\"x\":false,\"q\":\"1282941.28760590\",\"V\":\"10.68340000\",\"Q\":\"357878.51734890\",\"B\":\"0\"}}"
601606

602607
s |> recv(keep.raw = FALSE)
603-
#> [1] "{\"e\":\"kline\",\"E\":1652043698106,\"s\":\"BTCUSDT\",\"k\":{\"t\":1652043660000,\"T\":1652043719999,\"s\":\"BTCUSDT\",\"i\":\"1m\",\"f\":1351814911,\"L\":1351815367,\"o\":\"34253.08000000\",\"c\":\"34281.51000000\",\"h\":\"34285.51000000\",\"l\":\"34253.07000000\",\"v\":\"21.61695000\",\"n\":457,\"x\":false,\"q\":\"740796.49099100\",\"V\":\"15.64225000\",\"Q\":\"536020.12956650\",\"B\":\"0\"}}"
608+
#> [1] "{\"e\":\"kline\",\"E\":1652083909700,\"s\":\"BTCUSDT\",\"k\":{\"t\":1652083860000,\"T\":1652083919999,\"s\":\"BTCUSDT\",\"i\":\"1m\",\"f\":1352400783,\"L\":1352401310,\"o\":\"33509.32000000\",\"c\":\"33501.05000000\",\"h\":\"33509.33000000\",\"l\":\"33492.19000000\",\"v\":\"38.43526000\",\"n\":528,\"x\":false,\"q\":\"1287713.84719200\",\"V\":\"10.68371000\",\"Q\":\"357888.90267750\",\"B\":\"0\"}}"
604609

605610
close(s)
606611
```

man/subscribe.Rd

Lines changed: 6 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/unsubscribe.Rd

Lines changed: 6 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/opts.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ SEXP rnng_socket_set(SEXP socket, SEXP type, SEXP opt, SEXP value) {
2020
xc = nng_socket_set(*sock, op, NULL, 0);
2121
} else {
2222
SEXP enc = nano_encode(value);
23-
xc = nng_socket_set(*sock, op, RAW(enc), Rf_xlength(enc));
23+
size_t sz = TYPEOF(value) == STRSXP ? Rf_xlength(enc) - 1 : Rf_xlength(enc);
24+
xc = nng_socket_set(*sock, op, RAW(enc), sz);
2425
}
2526
break;
2627
case 1:

tests/tests.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,8 @@ nanotest(subscribe(sub$socket, NULL) == 0L)
139139
nanotest(unsubscribe(sub$socket, NULL) == 0L)
140140
nanotest(sub$unsubscribe("test") == 0L)
141141
subctx <- sub$context()
142-
nanotest(subscribe(subctx, "test") == 0L)
143-
nanotest(unsubscribe(subctx, "test") == 0L)
142+
nanotest(subscribe(subctx, 12) == 0L)
143+
nanotest(unsubscribe(subctx, 12) == 0L)
144144
nanotest(close(subctx) == 0L)
145145
nanotest(sub$close() == 0L)
146146
nanotest(pub$close() == 0L)

0 commit comments

Comments
 (0)