Skip to content

Commit f7038aa

Browse files
committed
add nanoContext methods for protocol helpers
1 parent 4757310 commit f7038aa

File tree

13 files changed

+191
-59
lines changed

13 files changed

+191
-59
lines changed

NAMESPACE

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,12 @@ S3method(setopt,nanoSocket)
4444
S3method(setopt,nanoStream)
4545
S3method(start,nanoDialer)
4646
S3method(start,nanoListener)
47+
S3method(subscribe,nanoContext)
48+
S3method(subscribe,nanoSocket)
49+
S3method(survey_time,nanoContext)
50+
S3method(survey_time,nanoSocket)
51+
S3method(unsubscribe,nanoContext)
52+
S3method(unsubscribe,nanoSocket)
4753
export(.mirai_scm)
4854
export(call_aio)
4955
export(context)

NEWS.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,17 @@
22

33
#### New Features
44

5-
* `$context()` method added for creating new contexts from nano Objects using supported protocols (i.e. req, rep, surveyor, respondent) - this replaces the `context()` function for nano Objects.
5+
* `$context()` method added for creating new contexts from nano Objects using supported protocols (i.e. req, rep, sub, surveyor, respondent) - this replaces the `context()` function for nano Objects.
66
* Added convenience auxiliary functions `is_nano()` and `is_aio()`.
7+
* `subscribe()` / `unsubscribe()` now accept a topic of any atomic type (not just character), allowing pub/sub to be used when sending integer, double, logical, complex, or raw vectors.
78

89
#### Updates
910

10-
* `subscribe()` / `unsubscribe()` now accept a topic in any of the standard types (not just character), allowing pub/sub to be used when sending integer, double, logical, complex, or raw vectors.
11+
* Protocol-specific helpers `subscribe()`, `unsubscribe()`, and `survey_time()` gain nanoContext methods.
1112
* For receives, if an error occurs in unserialisation or data conversion (e.g. mode was incorrectly specified), the received raw vector is now available at both `$raw` and `$data` if `keep.raw = TRUE`.
1213
* Setting 'NANONEXT_TLS=1' now allows the downloaded NNG library to be built against a system mbedtls installation.
1314
* Setting 'NANONEXT_ARM' is no longer required on platforms such as Raspberry Pi - the package configure script should now detect platforms that require the libatomic linker flag to be set automatically.
14-
* Deprecated `send_ctx()`, `recv_ctx()` and logging are removed.
15+
* Deprecated `send_ctx()`, `recv_ctx()` and logging removed.
1516
* All-round internal performance optimisations.
1617

1718
# nanonext 0.4.0

R/context.R

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
#' created on a rep socket can each receive requests, and send replies to
1616
#' them, without any regard to or interference with each other.
1717
#'
18-
#' Note: only certain protocols support creation of separate contexts -
19-
#' namely req, rep, surveyor, respondent.
18+
#' Only the following protocols support creation of contexts: req, rep, sub
19+
#' (in a pub/sub pattern), surveyor, respondent.
2020
#'
2121
#' To send and receive over a context use \code{\link{send}} and
2222
#' \code{\link{recv}} or their async counterparts \code{\link{send_aio}} and

R/nano.R

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ nano <- function(protocol = c("pair", "bus", "req", "rep", "push", "pull",
134134
nano[["context"]] <- function() context(socket)
135135
},
136136
sub = {
137+
nano[["context"]] <- function() context(socket)
137138
nano[["subscribe"]] <- function(topic = NULL) subscribe(socket,
138139
topic = topic)
139140
nano[["unsubscribe"]] <- function(topic = NULL) unsubscribe(socket,

R/protocol.R

Lines changed: 78 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22

33
#' Subscribe Topic
44
#'
5-
#' For a socket using the sub protocol in a publisher/subscriber pattern. Set a
6-
#' topic to subscribe to.
5+
#' For a socket or context using the sub protocol in a publisher/subscriber
6+
#' pattern. Set a topic to subscribe to.
77
#'
8-
#' @param socket a Socket using the sub protocol.
9-
#' @param topic [default NULL] a topic (vector or NULL). The default NULL
8+
#' @param con a Socket or Context using the 'sub' protocol.
9+
#' @param topic [default NULL] an atomic type or NULL. The default NULL
1010
#' subscribes to all topics.
1111
#'
1212
#' @return Invisibly, an integer exit code (zero on success).
@@ -30,24 +30,45 @@
3030
#' send(pub, c("other", "this other topic will not be received"), mode = "raw")
3131
#' recv(sub, "character")
3232
#'
33+
#' subscribe(sub, 2)
34+
#' send(pub, c(2, 10, 10, 20), mode = "raw")
35+
#' recv(sub, "double", keep.raw = FALSE)
36+
#'
3337
#' close(pub)
3438
#' close(sub)
3539
#'
40+
#' @rdname subscribe
41+
#' @export
42+
#'
43+
subscribe <- function(con, topic = NULL) UseMethod("subscribe")
44+
45+
#' @rdname subscribe
46+
#' @method subscribe nanoSocket
3647
#' @export
3748
#'
38-
subscribe <- function(socket, topic = NULL) {
49+
subscribe.nanoSocket <- function(con, topic = NULL) {
3950

40-
invisible(.Call(rnng_socket_set, socket, 0L, "sub:subscribe", topic))
51+
invisible(.Call(rnng_socket_set, con, 0L, "sub:subscribe", topic))
52+
53+
}
54+
55+
#' @rdname subscribe
56+
#' @method subscribe nanoContext
57+
#' @export
58+
#'
59+
subscribe.nanoContext <- function(con, topic = NULL) {
60+
61+
invisible(.Call(rnng_ctx_set, con, 0L, "sub:subscribe", topic))
4162

4263
}
4364

4465
#' Unsubscribe Topic
4566
#'
46-
#' For a socket using the sub protocol in a publisher/subscriber pattern. Remove
47-
#' a topic from the subscription list.
67+
#' For a socket or context using the sub protocol in a publisher/subscriber
68+
#' pattern. Remove a topic from the subscription list.
4869
#'
49-
#' @param socket a Socket using the sub protocol.
50-
#' @param topic [default NULL] a topic (vector or NULL). The default NULL
70+
#' @param con a Socket or Context using the 'sub' protocol.
71+
#' @param topic [default NULL] an atomic type or NULL. The default NULL
5172
#' unsubscribes from all topics (if all topics were previously subscribed).
5273
#'
5374
#' @return Invisibly, an integer exit code (zero on success).
@@ -75,24 +96,46 @@ subscribe <- function(socket, topic = NULL) {
7596
#' send(pub, c("examples", "this example will not be received"), mode = "raw")
7697
#' recv(sub, "character")
7798
#'
99+
#' subscribe(sub, 2)
100+
#' send(pub, c(2, 10, 10, 20), mode = "raw")
101+
#' recv(sub, "double", keep.raw = FALSE)
102+
#'
78103
#' close(pub)
79104
#' close(sub)
80105
#'
106+
#' @rdname unsubscribe
81107
#' @export
82108
#'
83-
unsubscribe <- function(socket, topic = NULL) {
109+
unsubscribe <- function(con, topic = NULL) UseMethod("unsubscribe")
84110

85-
invisible(.Call(rnng_socket_set, socket, 0L, "sub:unsubscribe", topic))
111+
#' @rdname unsubscribe
112+
#' @method unsubscribe nanoSocket
113+
#' @export
114+
#'
115+
unsubscribe.nanoSocket <- function(con, topic = NULL) {
116+
117+
invisible(.Call(rnng_socket_set, con, 0L, "sub:unsubscribe", topic))
118+
119+
}
120+
121+
#' @rdname unsubscribe
122+
#' @method unsubscribe nanoContext
123+
#' @export
124+
#'
125+
unsubscribe.nanoContext <- function(con, topic = NULL) {
126+
127+
invisible(.Call(rnng_ctx_set, con, 0L, "sub:unsubscribe", topic))
86128

87129
}
88130

89131
#' Set Survey Time
90132
#'
91-
#' For a socket using the surveyor protocol in a surveyor/respondent pattern.
92-
#' Set a survey timeout in ms (remains valid for all subsequent surveys).
93-
#' Messages received by the surveyor after the timer has ended are discarded.
133+
#' For a socket or context using the surveyor protocol in a surveyor/respondent
134+
#' pattern. Set a survey timeout in ms (remains valid for all subsequent
135+
#' surveys). Messages received by the surveyor after the timer has ended are
136+
#' discarded.
94137
#'
95-
#' @param socket a Socket or Context using the surveyor protocol.
138+
#' @param con a Socket or Context using the 'surveyor' protocol.
96139
#' @param time the survey timeout in ms.
97140
#'
98141
#' @return Invisibly, an integer exit code (zero on success).
@@ -126,11 +169,28 @@ unsubscribe <- function(socket, topic = NULL) {
126169
#' close(sur)
127170
#' close(res)
128171
#'
172+
#' @rdname survey_time
173+
#' @export
174+
#'
175+
survey_time <- function(con, time) UseMethod("survey_time")
176+
177+
#' @rdname survey_time
178+
#' @method survey_time nanoSocket
179+
#' @export
180+
#'
181+
survey_time.nanoSocket <- function(con, time) {
182+
183+
invisible(.Call(rnng_socket_set, con, 3L, "surveyor:survey-time", time))
184+
185+
}
186+
187+
#' @rdname survey_time
188+
#' @method survey_time nanoContext
129189
#' @export
130190
#'
131-
survey_time <- function(socket, time) {
191+
survey_time.nanoContext <- function(con, time) {
132192

133-
invisible(.Call(rnng_socket_set, socket, 3L, "surveyor:survey-time", time))
193+
invisible(.Call(rnng_ctx_set, con, 3L, "surveyor:survey-time", time))
134194

135195
}
136196

README.Rmd

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,16 @@ sub |> recv("character", keep.raw = FALSE)
348348
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
349349
sub |> recv(mode = "character", keep.raw = FALSE)
350350
351+
```
352+
353+
The subscribed topic can be any atomic type (not just character), allowing integer, double, logical, complex and raw vectors to be sent as well.
354+
355+
```{r pub2}
356+
357+
sub |> subscribe(topic = 1)
358+
pub |> send(c(1, 10, 10, 20), mode = "raw", echo = FALSE)
359+
sub |> recv(mode = "double", keep.raw = FALSE)
360+
351361
close(pub)
352362
close(sub)
353363

README.md

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ Send message from ‘nano1’:
105105

106106
``` r
107107
nano1$send("hello world!")
108-
#> [1] 58 0a 00 00 00 03 00 04 01 02 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
108+
#> [1] 58 0a 00 00 00 03 00 04 02 00 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
109109
#> [26] 00 10 00 00 00 01 00 04 00 09 00 00 00 0c 68 65 6c 6c 6f 20 77 6f 72 6c 64
110110
#> [51] 21
111111
```
@@ -115,7 +115,7 @@ Receive message using ‘nano2’:
115115
``` r
116116
nano2$recv()
117117
#> $raw
118-
#> [1] 58 0a 00 00 00 03 00 04 01 02 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
118+
#> [1] 58 0a 00 00 00 03 00 04 02 00 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
119119
#> [26] 00 10 00 00 00 01 00 04 00 09 00 00 00 0c 68 65 6c 6c 6f 20 77 6f 72 6c 64
120120
#> [51] 21
121121
#>
@@ -146,7 +146,7 @@ Send message from ‘socket1’:
146146

147147
``` r
148148
send(socket1, "hello world!")
149-
#> [1] 58 0a 00 00 00 03 00 04 01 02 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
149+
#> [1] 58 0a 00 00 00 03 00 04 02 00 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
150150
#> [26] 00 10 00 00 00 01 00 04 00 09 00 00 00 0c 68 65 6c 6c 6f 20 77 6f 72 6c 64
151151
#> [51] 21
152152
```
@@ -156,7 +156,7 @@ Receive message using ‘socket2’:
156156
``` r
157157
recv(socket2)
158158
#> $raw
159-
#> [1] 58 0a 00 00 00 03 00 04 01 02 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
159+
#> [1] 58 0a 00 00 00 03 00 04 02 00 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
160160
#> [26] 00 10 00 00 00 01 00 04 00 09 00 00 00 0c 68 65 6c 6c 6f 20 77 6f 72 6c 64
161161
#> [51] 21
162162
#>
@@ -285,7 +285,7 @@ msg$data
285285
#> a b
286286
#> 1 1 2
287287
msg$raw
288-
#> [1] 58 0a 00 00 00 03 00 04 01 02 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
288+
#> [1] 58 0a 00 00 00 03 00 04 02 00 00 03 05 00 00 00 00 05 55 54 46 2d 38 00 00
289289
#> [26] 03 13 00 00 00 02 00 00 00 0e 00 00 00 01 3f f0 00 00 00 00 00 00 00 00 00
290290
#> [51] 0e 00 00 00 01 40 00 00 00 00 00 00 00 00 00 04 02 00 00 00 01 00 04 00 09
291291
#> [76] 00 00 00 05 6e 61 6d 65 73 00 00 00 10 00 00 00 02 00 04 00 09 00 00 00 01
@@ -388,7 +388,7 @@ aio
388388
#> < recvAio >
389389
#> - $data for message data
390390
aio$data |> str()
391-
#> num [1:100000000] -0.901 0.2851 -0.0205 0.1886 1.3892 ...
391+
#> num [1:100000000] 0.173 -0.103 -0.522 -0.812 0.532 ...
392392
```
393393

394394
As `call_aio()` is blocking and will wait for completion, an alternative
@@ -446,6 +446,18 @@ sub |> recv("character", keep.raw = FALSE)
446446
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
447447
sub |> recv(mode = "character", keep.raw = FALSE)
448448
#> [1] "examples" "this example will still be received"
449+
```
450+
451+
The subscribed topic can be any atomic type (not just character),
452+
allowing integer, double, logical, complex and raw vectors to be sent as
453+
well.
454+
455+
``` r
456+
457+
sub |> subscribe(topic = 1)
458+
pub |> send(c(1, 10, 10, 20), mode = "raw", echo = FALSE)
459+
sub |> recv(mode = "double", keep.raw = FALSE)
460+
#> [1] 1 10 10 20
449461

450462
close(pub)
451463
close(sub)
@@ -528,11 +540,11 @@ ncurl("http://httpbin.org/headers")
528540
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
529541
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
530542
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
531-
#> [76] 2d 36 32 37 33 61 65 38 65 2d 37 33 31 61 36 62 35 30 32 30 65 63 36 30 31
532-
#> [101] 32 35 32 62 64 66 32 39 38 22 0a 20 20 7d 0a 7d 0a
543+
#> [76] 2d 36 32 37 34 35 31 35 30 2d 32 61 62 61 34 61 31 61 36 39 63 37 36 61 66
544+
#> [101] 62 32 38 30 61 30 65 66 62 22 0a 20 20 7d 0a 7d 0a
533545
#>
534546
#> $data
535-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6273ae8e-731a6b5020ec601252bdf298\"\n }\n}\n"
547+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62745150-2aba4a1a69c76afb280a0efb\"\n }\n}\n"
536548
```
537549

538550
For advanced use, supports additional HTTP methods such as POST or PUT.
@@ -548,7 +560,7 @@ res
548560
#> - $raw for raw message
549561

550562
call_aio(res)$data
551-
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6273ae8f-5a31583a2f9f02b8619845b7\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"79.173.189.204\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
563+
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62745150-4a03a30d6fcd147d71c51fdf\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"78.145.225.121\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
552564
```
553565

554566
In this respect, it may be used as a performant and lightweight method

man/context.Rd

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/subscribe.Rd

Lines changed: 15 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)