Skip to content

Commit 27256bd

Browse files
committed
cleaner Aio structure and UI
1 parent ec22e8f commit 27256bd

File tree

10 files changed

+169
-49
lines changed

10 files changed

+169
-49
lines changed

NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ S3method("$",nano)
44
S3method("[",nanoSocket)
55
S3method("[[",nano)
66
S3method(.DollarNames,nano)
7+
S3method(.DollarNames,recvAio)
8+
S3method(.DollarNames,sendAio)
79
S3method(close,nanoContext)
810
S3method(close,nanoDialer)
911
S3method(close,nanoListener)

R/aio.R

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@
4646
#' call_aio(res)
4747
#' res$result
4848
#'
49-
#' res <- recv_aio(s2, timeout = 100)
50-
#' res
51-
#' call_aio(res)$data
49+
#' msg <- recv_aio(s2, timeout = 100)
50+
#' msg
51+
#' call_aio(msg)$data
5252
#'
5353
#' close(s1)
5454
#' close(s2)
@@ -62,8 +62,8 @@ call_aio <- function(aio) {
6262
if (inherits(aio, "recvAio")) {
6363

6464
res <- .Call(rnng_aio_wait_get_msg, .subset2(aio, "aio"))
65-
mode <- .subset2(aio, "callparams")[[1L]]
66-
keep.raw <- .subset2(aio, "callparams")[[2L]]
65+
mode <- .subset2(aio, "mode")
66+
keep.raw <- .subset2(aio, "keep.raw")
6767
is.integer(res) && {
6868
if (keep.raw) {
6969
rm("raw", envir = aio)
@@ -79,7 +79,6 @@ call_aio <- function(aio) {
7979
rm("data", envir = aio)
8080
`[[<-`(aio, "data", res)
8181
rm("aio", envir = aio)
82-
rm("callparams", envir = aio)
8382
return(invisible(aio))
8483
})
8584
data <- switch(mode,
@@ -95,7 +94,6 @@ call_aio <- function(aio) {
9594
rm("data", envir = aio)
9695
`[[<-`(aio, "data", data)
9796
rm("aio", envir = aio)
98-
rm("callparams", envir = aio)
9997

10098
} else if (inherits(aio, "sendAio")) {
10199

R/context.R

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,8 @@ request <- function(context,
324324
return(invisible(aio))
325325
}
326326
env <- `class<-`(new.env(), "recvAio")
327-
`[[<-`(env, "callparams", list(recv_mode, keep.raw))
327+
`[[<-`(env, "mode", recv_mode)
328+
`[[<-`(env, "keep.raw", keep.raw)
328329
data <- raw <- resolv <- NULL
329330
if (keep.raw) {
330331
makeActiveBinding(sym = "raw", fun = function(x) {

R/nano.R

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,8 @@ print.nanoListener <- function(x, ...) {
216216
#'
217217
print.recvAio <- function(x, ...) {
218218

219-
cat("< recvAio >\n - $raw for raw message\n - $data for message data\n", file = stdout())
219+
cat("< recvAio >\n - $data for message data\n",
220+
if (.subset2(x, "keep.raw")) "- $raw for raw message\n", file = stdout())
220221
invisible(x)
221222

222223
}
@@ -262,6 +263,22 @@ print.sendAio <- function(x, ...) {
262263

263264
}
264265

266+
#' @export
267+
#'
268+
.DollarNames.recvAio <- function(x, pattern = "") {
269+
270+
grep(pattern, c("data", if (.subset2(x, "keep.raw")) "raw"), value = TRUE, fixed = TRUE)
271+
272+
}
273+
274+
#' @export
275+
#'
276+
.DollarNames.sendAio <- function(x, pattern = "") {
277+
278+
grep(pattern, "result", value = TRUE, fixed = TRUE)
279+
280+
}
281+
265282
#' @export
266283
#'
267284
print.unresolvedValue <- function(x, ...) {

R/sendrecv.R

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,13 @@ recv <- function(socket,
200200
#' send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
201201
#' res <- recv_aio(s2, timeout = 100, keep.raw = FALSE)
202202
#' res
203-
#' call_aio(res)
204-
#' res
203+
#' res$data
205204
#'
206205
#' send_aio(s1, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
207206
#' res <- recv_aio(s2, mode = "double", timeout = 100)
208-
#' call_aio(res)
209207
#' res
208+
#' res$raw
209+
#' res$data
210210
#'
211211
#' send_aio(s1, "example message", mode = "raw", timeout = 100)
212212
#' res <- recv_aio(s2, mode = "character", timeout = 100)
@@ -234,7 +234,8 @@ recv_aio <- function(socket,
234234
return(invisible(aio))
235235
}
236236
env <- `class<-`(new.env(), "recvAio")
237-
`[[<-`(env, "callparams", list(mode, keep.raw))
237+
`[[<-`(env, "mode", mode)
238+
`[[<-`(env, "keep.raw", keep.raw)
238239
data <- raw <- resolv <- NULL
239240
if (keep.raw) {
240241
makeActiveBinding(sym = "raw", fun = function(x) {

README.Rmd

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ Implemented transports:
4949
4. [Async and Concurrency](#async-and-concurrency)
5050
5. [RPC and Distributed Computing](#rpc-and-distributed-computing)
5151
6. [Publisher / Subscriber Model](#publisher-subscriber-model)
52-
7. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
53-
8. [Building from source](#building-from-source)
54-
9. [Links](#links)
52+
7. [Surveyor / Repondent Model](#surveyor-respondent-model)
53+
8. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
54+
9. [Building from source](#building-from-source)
55+
10. [Links](#links)
5556

5657
### Installation
5758

@@ -275,26 +276,29 @@ In such a case, using `call_aio()` confirms that the operation has completed (or
275276

276277
### Publisher Subscriber Model
277278

278-
{nanonext} fully implements NNG's pub/sub protocol as per the below example.
279+
{nanonext} fully implements NNG's pub/sub protocol as per the below example.
279280

280281
The built-in logging levels are also demonstrated here. NNG errors are always output to stderr and operation is otherwise silent by default. To enable key information events to be sent to stdout, use `logging(level = "info")`.
281282

282283
The log level can also be set externally in production environments via an environment variable `NANONEXT_LOG`.
283284

284285
```{r pub}
286+
287+
# set logging level to include information events ------------------------------
285288
logging(level = "info")
286289
287290
pub <- socket("pub", listen = "inproc://nanobroadcast")
288291
sub <- socket("sub", dial = "inproc://nanobroadcast")
289292
293+
# subscribing to a specific topic 'examples' -----------------------------------
290294
sub |> subscribe(topic = "examples")
291295
pub |> send(c("examples", "this is an example"), mode = "raw", echo = FALSE)
292296
sub |> recv(mode = "character", keep.raw = FALSE)
293297
294298
pub |> send(c("other", "this other topic will not be received"), mode = "raw", echo = FALSE)
295299
sub |> recv(mode = "character", keep.raw = FALSE)
296300
297-
# specify NULL to subscribe to ALL topics
301+
# specify NULL to subscribe to ALL topics --------------------------------------
298302
sub |> subscribe(topic = NULL)
299303
pub |> send(c("newTopic", "this is a new topic"), mode = "raw", echo = FALSE)
300304
sub |> recv("character", keep.raw = FALSE)
@@ -303,17 +307,61 @@ sub |> unsubscribe(topic = NULL)
303307
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw", echo = FALSE)
304308
sub |> recv("character", keep.raw = FALSE)
305309
306-
# however the topics explicitly subscribed to are still received
310+
# however the topics explicitly subscribed to are still received ---------------
307311
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
308312
sub |> recv(mode = "character", keep.raw = FALSE)
309313
314+
# set logging level back to the default of errors only -------------------------
315+
logging(level = "error")
316+
310317
close(pub)
311318
close(sub)
312319
313320
```
314321

315322
[&laquo; Back to ToC](#table-of-contents)
316323

324+
### Surveyor Respondent Model
325+
326+
This type of topology is useful for applications such as service discovery.
327+
328+
```{r survey}
329+
330+
sur <- socket("surveyor", listen = "inproc://nanoservice")
331+
res1 <- socket("respondent", dial = "inproc://nanoservice")
332+
res2 <- socket("respondent", dial = "inproc://nanoservice")
333+
334+
# sur sets a survey timeout, applying to this and subsequent surveys -----------
335+
sur |> survey_time(500)
336+
337+
# sur sends a message and then requests 2 async receives -----------------------
338+
sur |> send("service check", echo = FALSE)
339+
aio1 <- sur |> recv_aio()
340+
aio2 <- sur |> recv_aio()
341+
342+
# res1 receives the message and replies using an aio send function -------------
343+
res1 |> recv(keep.raw = FALSE)
344+
res1 |> send_aio("res1")
345+
346+
# res2 receives the message but fails to reply ---------------------------------
347+
res2 |> recv(keep.raw = FALSE)
348+
349+
# checking the aio - only the first will have resolved -------------------------
350+
aio1$data
351+
aio2$data
352+
353+
# after the survey expires, the second resolves into a timeout error -----------
354+
Sys.sleep(0.5)
355+
aio2$data
356+
357+
close(sur)
358+
close(res1)
359+
close(res2)
360+
361+
```
362+
363+
[&laquo; Back to ToC](#table-of-contents)
364+
317365
### ncurl Minimalist http Client
318366

319367
`ncurl()` is a minimalistic http(s) client. In normal use, it takes only one argument, the URL. It can follow redirects.

README.md

Lines changed: 75 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ Implemented transports:
4949
4. [Async and Concurrency](#async-and-concurrency)
5050
5. [RPC and Distributed Computing](#rpc-and-distributed-computing)
5151
6. [Publisher / Subscriber Model](#publisher-subscriber-model)
52-
7. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
53-
8. [Building from source](#building-from-source)
54-
9. [Links](#links)
52+
7. [Surveyor / Repondent Model](#surveyor-respondent-model)
53+
8. [ncurl Minimalist http Client](#ncurl-minimalist-http-client)
54+
9. [Building from source](#building-from-source)
55+
10. [Links](#links)
5556

5657
### Installation
5758

@@ -253,8 +254,8 @@ msg <- recv_aio(s2)
253254
call_aio(msg)
254255
msg
255256
#> < recvAio >
256-
#> - $raw for raw message
257257
#> - $data for message data
258+
#> - $raw for raw message
258259
msg$data
259260
#> a b
260261
#> 1 1 2
@@ -325,10 +326,9 @@ call_aio(aio)
325326

326327
aio
327328
#> < recvAio >
328-
#> - $raw for raw message
329329
#> - $data for message data
330330
str(aio$data)
331-
#> num [1:100000000] 0.837 1.301 2.225 1.43 -1.543 ...
331+
#> num [1:100000000] -2.12 -0.152 -0.485 1.971 0.061 ...
332332
```
333333

334334
In this example the calculation is returned, but other operations may
@@ -354,48 +354,100 @@ The log level can also be set externally in production environments via
354354
an environment variable `NANONEXT_LOG`.
355355

356356
``` r
357+
# set logging level to include information events ------------------------------
357358
logging(level = "info")
358-
#> 2022-03-03 10:51:39 [ log level ] set to: info
359+
#> 2022-03-03 16:31:20 [ log level ] set to: info
359360

360361
pub <- socket("pub", listen = "inproc://nanobroadcast")
361-
#> 2022-03-03 10:51:39 [ sock open ] id: 9 | protocol: pub
362-
#> 2022-03-03 10:51:39 [ list start ] sock: 9 | url: inproc://nanobroadcast
362+
#> 2022-03-03 16:31:20 [ sock open ] id: 9 | protocol: pub
363+
#> 2022-03-03 16:31:20 [ list start ] sock: 9 | url: inproc://nanobroadcast
363364
sub <- socket("sub", dial = "inproc://nanobroadcast")
364-
#> 2022-03-03 10:51:39 [ sock open ] id: 10 | protocol: sub
365-
#> 2022-03-03 10:51:39 [ dial start ] sock: 10 | url: inproc://nanobroadcast
365+
#> 2022-03-03 16:31:20 [ sock open ] id: 10 | protocol: sub
366+
#> 2022-03-03 16:31:20 [ dial start ] sock: 10 | url: inproc://nanobroadcast
366367

368+
# subscribing to a specific topic 'examples' -----------------------------------
367369
sub |> subscribe(topic = "examples")
368-
#> 2022-03-03 10:51:39 [ subscribe ] sock: 10 | topic: examples
370+
#> 2022-03-03 16:31:20 [ subscribe ] sock: 10 | topic: examples
369371
pub |> send(c("examples", "this is an example"), mode = "raw", echo = FALSE)
370372
sub |> recv(mode = "character", keep.raw = FALSE)
371373
#> [1] "examples" "this is an example"
372374

373375
pub |> send(c("other", "this other topic will not be received"), mode = "raw", echo = FALSE)
374376
sub |> recv(mode = "character", keep.raw = FALSE)
375-
#> 2022-03-03 10:51:39 [ 8 ] Try again
377+
#> 2022-03-03 16:31:20 [ 8 ] Try again
376378

377-
# specify NULL to subscribe to ALL topics
379+
# specify NULL to subscribe to ALL topics --------------------------------------
378380
sub |> subscribe(topic = NULL)
379-
#> 2022-03-03 10:51:39 [ subscribe ] sock: 10 | topic: ALL
381+
#> 2022-03-03 16:31:20 [ subscribe ] sock: 10 | topic: ALL
380382
pub |> send(c("newTopic", "this is a new topic"), mode = "raw", echo = FALSE)
381383
sub |> recv("character", keep.raw = FALSE)
382384
#> [1] "newTopic" "this is a new topic"
383385

384386
sub |> unsubscribe(topic = NULL)
385-
#> 2022-03-03 10:51:39 [ unsubscribe ] sock: 10 | topic: ALL
387+
#> 2022-03-03 16:31:20 [ unsubscribe ] sock: 10 | topic: ALL
386388
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw", echo = FALSE)
387389
sub |> recv("character", keep.raw = FALSE)
388-
#> 2022-03-03 10:51:39 [ 8 ] Try again
390+
#> 2022-03-03 16:31:20 [ 8 ] Try again
389391

390-
# however the topics explicitly subscribed to are still received
392+
# however the topics explicitly subscribed to are still received ---------------
391393
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
392394
sub |> recv(mode = "character", keep.raw = FALSE)
393395
#> [1] "examples" "this example will still be received"
394396

397+
# set logging level back to the default of errors only -------------------------
398+
logging(level = "error")
399+
#> 2022-03-03 16:31:20 [ log level ] set to: error
400+
395401
close(pub)
396-
#> 2022-03-03 10:51:39 [ sock close ] id: 9 | protocol: pub
397402
close(sub)
398-
#> 2022-03-03 10:51:39 [ sock close ] id: 10 | protocol: sub
403+
```
404+
405+
[« Back to ToC](#table-of-contents)
406+
407+
### Surveyor Respondent Model
408+
409+
This type of topology is useful for applications such as service
410+
discovery.
411+
412+
``` r
413+
sur <- socket("surveyor", listen = "inproc://nanoservice")
414+
res1 <- socket("respondent", dial = "inproc://nanoservice")
415+
res2 <- socket("respondent", dial = "inproc://nanoservice")
416+
417+
# sur sets a survey timeout, applying to this and subsequent surveys -----------
418+
sur |> survey_time(500)
419+
420+
# sur sends a message and then requests 2 async receives -----------------------
421+
sur |> send("service check", echo = FALSE)
422+
aio1 <- sur |> recv_aio()
423+
aio2 <- sur |> recv_aio()
424+
425+
# res1 receives the message and replies using an aio send function -------------
426+
res1 |> recv(keep.raw = FALSE)
427+
#> [1] "service check"
428+
res1 |> send_aio("res1")
429+
#> < sendAio >
430+
#> - $result for send result
431+
432+
# res2 receives the message but fails to reply ---------------------------------
433+
res2 |> recv(keep.raw = FALSE)
434+
#> [1] "service check"
435+
436+
# checking the aio - only the first will have resolved -------------------------
437+
aio1$data
438+
#> [1] "res1"
439+
aio2$data
440+
#> < unresolved value >
441+
442+
# after the survey expires, the second resolves into a timeout error -----------
443+
Sys.sleep(0.5)
444+
aio2$data
445+
#> 2022-03-03 16:31:21 [ 5 ] Timed out
446+
#> [1] 5
447+
448+
close(sur)
449+
close(res1)
450+
close(res2)
399451
```
400452

401453
[« Back to ToC](#table-of-contents)
@@ -411,11 +463,11 @@ ncurl("http://httpbin.org/headers")
411463
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
412464
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
413465
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
414-
#> [76] 2d 36 32 32 30 39 64 62 62 2d 34 61 61 66 66 65 31 37 32 61 64 64 35 39 35
415-
#> [101] 35 37 33 31 62 35 34 30 30 22 0a 20 20 7d 0a 7d 0a
466+
#> [76] 2d 36 32 32 30 65 64 35 39 2d 32 31 64 36 39 31 30 32 31 32 62 30 63 39 34
467+
#> [101] 64 30 65 61 36 32 63 31 30 22 0a 20 20 7d 0a 7d 0a
416468
#>
417469
#> $data
418-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62209dbb-4aaffe172add5955731b5400\"\n }\n}\n"
470+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6220ed59-21d6910212b0c94d0ea62c10\"\n }\n}\n"
419471
```
420472

421473
For advanced use, supports additional HTTP methods such as POST or PUT.

0 commit comments

Comments
 (0)