Skip to content

Commit 1a0da5d

Browse files
committed
tidy Aio functions, introduce 'errorValue'
1 parent 27256bd commit 1a0da5d

File tree

13 files changed

+89
-133
lines changed

13 files changed

+89
-133
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 0.2.0.9004
4+
Version: 0.2.0.9005
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library providing high-performance scalability protocols,
77
implementing a cross-platform standard for messaging and communications.

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ S3method(close,nanoContext)
1010
S3method(close,nanoDialer)
1111
S3method(close,nanoListener)
1212
S3method(close,nanoSocket)
13+
S3method(print,errorValue)
1314
S3method(print,nanoContext)
1415
S3method(print,nanoDialer)
1516
S3method(print,nanoListener)

NEWS.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
# nanonext 0.2.0.9004 (development)
1+
# nanonext 0.2.0.9005 (development)
22

33
#### New Features
44

55
* Aio values `$result`, `$raw` or `$data` now resolve without requiring `call_aio()`. Access the values directly and an NA 'unresolved value' will be returned if the Aio operation is yet to complete.
6+
* Integer error values generated by all receive functions are now classed 'errorValue' to be immediately distinguishable from possible message values.
67
* `unresolved()` added as an auxiliary function to query whether an Aio is unresolved, for use in control flow statements.
78
* `is_nul_byte()` added as a helper function for request/reply setups.
89
* `survey_time()` added as a convenience function for surveyor/respondent patterns.

R/aio.R

Lines changed: 3 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -57,54 +57,12 @@
5757
#'
5858
call_aio <- function(aio) {
5959

60-
is.null(.subset2(aio, "aio")) && return(invisible(aio))
61-
60+
.Call(rnng_aio_call, .subset2(aio, "aio")) && return(invisible(aio))
6261
if (inherits(aio, "recvAio")) {
63-
64-
res <- .Call(rnng_aio_wait_get_msg, .subset2(aio, "aio"))
65-
mode <- .subset2(aio, "mode")
66-
keep.raw <- .subset2(aio, "keep.raw")
67-
is.integer(res) && {
68-
if (keep.raw) {
69-
rm("raw", envir = aio)
70-
`[[<-`(aio, "raw", res)
71-
}
72-
rm("data", envir = aio)
73-
`[[<-`(aio, "data", res)
74-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
75-
return(invisible(aio))
76-
}
77-
on.exit(expr = {
78-
if (keep.raw) rm("raw", envir = aio)
79-
rm("data", envir = aio)
80-
`[[<-`(aio, "data", res)
81-
rm("aio", envir = aio)
82-
return(invisible(aio))
83-
})
84-
data <- switch(mode,
85-
serial = unserialize(connection = res),
86-
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
87-
raw = res,
88-
readBin(con = res, what = mode, n = length(res)))
89-
on.exit()
90-
if (keep.raw) {
91-
rm("raw", envir = aio)
92-
`[[<-`(aio, "raw", res)
93-
}
94-
rm("data", envir = aio)
95-
`[[<-`(aio, "data", data)
96-
rm("aio", envir = aio)
97-
62+
.subset2(aio, "data")
9863
} else if (inherits(aio, "sendAio")) {
99-
100-
res <- .Call(rnng_aio_wait_result, .subset2(aio, "aio"))
101-
rm("result", envir = aio)
102-
`[[<-`(aio, "result", res)
103-
rm("aio", envir = aio)
104-
if (res) message(Sys.time(), " [ ", res, " ] ", nng_error(res))
105-
64+
.subset2(aio, "result")
10665
}
107-
10866
invisible(aio)
10967

11068
}

R/context.R

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ recv_ctx <- function(context,
145145
res <- .Call(rnng_ctx_recv, context, timeout)
146146
is.integer(res) && {
147147
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
148-
return(invisible(res))
148+
return(invisible(`class<-`(res, "errorValue")))
149149
}
150150
on.exit(expr = return(res))
151151
data <- switch(mode,
@@ -231,7 +231,7 @@ reply <- function(context,
231231
res <- .Call(rnng_ctx_recv, context, timeout)
232232
is.integer(res) && {
233233
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
234-
return(invisible(res))
234+
return(invisible(`class<-`(res, "errorValue")))
235235
}
236236
on.exit(expr = send_aio(context, as.raw(0L), mode = send_mode))
237237
data <- switch(recv_mode,
@@ -321,21 +321,19 @@ request <- function(context,
321321
aio <- .Call(rnng_recv_aio, context, timeout)
322322
is.integer(aio) && {
323323
message(Sys.time(), " [ ", aio, " ] ", nng_error(aio))
324-
return(invisible(aio))
324+
return(invisible(`class<-`(aio, "errorValue")))
325325
}
326326
env <- `class<-`(new.env(), "recvAio")
327-
`[[<-`(env, "mode", recv_mode)
328-
`[[<-`(env, "keep.raw", keep.raw)
329327
data <- raw <- resolv <- NULL
330328
if (keep.raw) {
331329
makeActiveBinding(sym = "raw", fun = function(x) {
332330
if (is.null(resolv)) {
333331
res <- .Call(rnng_aio_get_msg, aio)
334332
missing(res) && return(.Call(rnng_aio_unresolv))
335333
is.integer(res) && {
336-
res <<- raw <<- resolv <<- res
334+
data <<- raw <<- resolv <<- `class<-`(res, "errorValue")
337335
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
338-
return(invisible(res))
336+
return(invisible(resolv))
339337
}
340338
on.exit(expr = {
341339
raw <<- res
@@ -360,9 +358,9 @@ request <- function(context,
360358
res <- .Call(rnng_aio_get_msg, aio)
361359
missing(res) && return(.Call(rnng_aio_unresolv))
362360
is.integer(res) && {
363-
res <<- raw <<- resolv <<- res
361+
data <<- raw <<- resolv <<- `class<-`(res, "errorValue")
364362
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
365-
return(invisible(res))
363+
return(invisible(resolv))
366364
}
367365
on.exit(expr = {
368366
data <<- res
@@ -381,6 +379,7 @@ request <- function(context,
381379
}
382380
data
383381
}, env = env)
382+
`[[<-`(env, "keep.raw", keep.raw)
384383
`[[<-`(env, "aio", aio)
385384

386385
}

R/nano.R

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,3 +288,12 @@ print.unresolvedValue <- function(x, ...) {
288288

289289
}
290290

291+
#' @export
292+
#'
293+
print.errorValue <- function(x, ...) {
294+
295+
cat("'errorValue' int", x, "\n", file = stdout())
296+
invisible(x)
297+
298+
}
299+

R/sendrecv.R

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -95,13 +95,13 @@ send_aio <- function(socket, data, mode = c("serial", "raw"), timeout) {
9595
return(invisible(aio))
9696
}
9797
env <- `class<-`(new.env(), "sendAio")
98-
result <- resolv <- NULL
98+
result <- NULL
9999
makeActiveBinding(sym = "result", fun = function(x) {
100-
if (is.null(resolv)) {
101-
result <- .Call(rnng_aio_result, aio)
102-
missing(result) && return(.Call(rnng_aio_unresolv))
103-
result <<- result
104-
resolv <<- 0L
100+
if (is.null(result)) {
101+
res <- .Call(rnng_aio_result, aio)
102+
missing(res) && return(.Call(rnng_aio_unresolv))
103+
if (res) message(Sys.time(), " [ ", res, " ] ", nng_error(res))
104+
result <<- res
105105
}
106106
result
107107
}, env = env)
@@ -163,7 +163,7 @@ recv <- function(socket,
163163
res <- .Call(rnng_recv, socket, block)
164164
is.integer(res) && {
165165
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
166-
return(invisible(res))
166+
return(invisible(`class<-`(res, "errorValue")))
167167
}
168168
on.exit(expr = return(res))
169169
data <- switch(mode,
@@ -231,21 +231,19 @@ recv_aio <- function(socket,
231231
aio <- .Call(rnng_recv_aio, socket, timeout)
232232
is.integer(aio) && {
233233
message(Sys.time(), " [ ", aio, " ] ", nng_error(aio))
234-
return(invisible(aio))
234+
return(invisible(`class<-`(aio, "errorValue")))
235235
}
236236
env <- `class<-`(new.env(), "recvAio")
237-
`[[<-`(env, "mode", mode)
238-
`[[<-`(env, "keep.raw", keep.raw)
239237
data <- raw <- resolv <- NULL
240238
if (keep.raw) {
241239
makeActiveBinding(sym = "raw", fun = function(x) {
242240
if (is.null(resolv)) {
243241
res <- .Call(rnng_aio_get_msg, aio)
244242
missing(res) && return(.Call(rnng_aio_unresolv))
245243
is.integer(res) && {
246-
res <<- raw <<- resolv <<- res
244+
data <<- raw <<- resolv <<- `class<-`(res, "errorValue")
247245
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
248-
return(invisible(res))
246+
return(invisible(resolv))
249247
}
250248
on.exit(expr = {
251249
raw <<- res
@@ -270,9 +268,9 @@ recv_aio <- function(socket,
270268
res <- .Call(rnng_aio_get_msg, aio)
271269
missing(res) && return(.Call(rnng_aio_unresolv))
272270
is.integer(res) && {
273-
res <<- raw <<- resolv <<- res
271+
data <<- raw <<- resolv <<- `class<-`(res, "errorValue")
274272
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
275-
return(invisible(res))
273+
return(invisible(resolv))
276274
}
277275
on.exit(expr = {
278276
data <<- res
@@ -291,6 +289,7 @@ recv_aio <- function(socket,
291289
}
292290
data
293291
}, env = env)
292+
`[[<-`(env, "keep.raw", keep.raw)
294293
`[[<-`(env, "aio", aio)
295294

296295
}

README.Rmd

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,9 @@ close(sub)
323323

324324
### Surveyor Respondent Model
325325

326-
This type of topology is useful for applications such as service discovery.
326+
This type of pattern is useful for applications such as service discovery.
327+
328+
A surveyor sends a survey, which is broadcast to all peer respondents. The respondents then have a chance to reply (but are not obliged to). The survey itself is a timed event, such that responses received after the timeout are discarded.
327329

328330
```{r survey}
329331

README.md

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ aio
328328
#> < recvAio >
329329
#> - $data for message data
330330
str(aio$data)
331-
#> num [1:100000000] -2.12 -0.152 -0.485 1.971 0.061 ...
331+
#> num [1:100000000] -1.211 -0.218 0.105 -0.608 0.256 ...
332332
```
333333

334334
In this example the calculation is returned, but other operations may
@@ -356,38 +356,38 @@ an environment variable `NANONEXT_LOG`.
356356
``` r
357357
# set logging level to include information events ------------------------------
358358
logging(level = "info")
359-
#> 2022-03-03 16:31:20 [ log level ] set to: info
359+
#> 2022-03-03 23:42:57 [ log level ] set to: info
360360

361361
pub <- socket("pub", listen = "inproc://nanobroadcast")
362-
#> 2022-03-03 16:31:20 [ sock open ] id: 9 | protocol: pub
363-
#> 2022-03-03 16:31:20 [ list start ] sock: 9 | url: inproc://nanobroadcast
362+
#> 2022-03-03 23:42:57 [ sock open ] id: 9 | protocol: pub
363+
#> 2022-03-03 23:42:57 [ list start ] sock: 9 | url: inproc://nanobroadcast
364364
sub <- socket("sub", dial = "inproc://nanobroadcast")
365-
#> 2022-03-03 16:31:20 [ sock open ] id: 10 | protocol: sub
366-
#> 2022-03-03 16:31:20 [ dial start ] sock: 10 | url: inproc://nanobroadcast
365+
#> 2022-03-03 23:42:57 [ sock open ] id: 10 | protocol: sub
366+
#> 2022-03-03 23:42:57 [ dial start ] sock: 10 | url: inproc://nanobroadcast
367367

368368
# subscribing to a specific topic 'examples' -----------------------------------
369369
sub |> subscribe(topic = "examples")
370-
#> 2022-03-03 16:31:20 [ subscribe ] sock: 10 | topic: examples
370+
#> 2022-03-03 23:42:57 [ subscribe ] sock: 10 | topic: examples
371371
pub |> send(c("examples", "this is an example"), mode = "raw", echo = FALSE)
372372
sub |> recv(mode = "character", keep.raw = FALSE)
373373
#> [1] "examples" "this is an example"
374374

375375
pub |> send(c("other", "this other topic will not be received"), mode = "raw", echo = FALSE)
376376
sub |> recv(mode = "character", keep.raw = FALSE)
377-
#> 2022-03-03 16:31:20 [ 8 ] Try again
377+
#> 2022-03-03 23:42:57 [ 8 ] Try again
378378

379379
# specify NULL to subscribe to ALL topics --------------------------------------
380380
sub |> subscribe(topic = NULL)
381-
#> 2022-03-03 16:31:20 [ subscribe ] sock: 10 | topic: ALL
381+
#> 2022-03-03 23:42:57 [ subscribe ] sock: 10 | topic: ALL
382382
pub |> send(c("newTopic", "this is a new topic"), mode = "raw", echo = FALSE)
383383
sub |> recv("character", keep.raw = FALSE)
384384
#> [1] "newTopic" "this is a new topic"
385385

386386
sub |> unsubscribe(topic = NULL)
387-
#> 2022-03-03 16:31:20 [ unsubscribe ] sock: 10 | topic: ALL
387+
#> 2022-03-03 23:42:57 [ unsubscribe ] sock: 10 | topic: ALL
388388
pub |> send(c("newTopic", "this topic will now not be received"), mode = "raw", echo = FALSE)
389389
sub |> recv("character", keep.raw = FALSE)
390-
#> 2022-03-03 16:31:20 [ 8 ] Try again
390+
#> 2022-03-03 23:42:57 [ 8 ] Try again
391391

392392
# however the topics explicitly subscribed to are still received ---------------
393393
pub |> send(c("examples", "this example will still be received"), mode = "raw", echo = FALSE)
@@ -396,7 +396,7 @@ sub |> recv(mode = "character", keep.raw = FALSE)
396396

397397
# set logging level back to the default of errors only -------------------------
398398
logging(level = "error")
399-
#> 2022-03-03 16:31:20 [ log level ] set to: error
399+
#> 2022-03-03 23:42:57 [ log level ] set to: error
400400

401401
close(pub)
402402
close(sub)
@@ -406,9 +406,14 @@ close(sub)
406406

407407
### Surveyor Respondent Model
408408

409-
This type of topology is useful for applications such as service
409+
This type of pattern is useful for applications such as service
410410
discovery.
411411

412+
A surveyor sends a survey, which is broadcast to all peer respondents.
413+
The respondents then have a chance to reply (but are not obliged to).
414+
The survey itself is a timed event, such that responses received after
415+
the timeout are discarded.
416+
412417
``` r
413418
sur <- socket("surveyor", listen = "inproc://nanoservice")
414419
res1 <- socket("respondent", dial = "inproc://nanoservice")
@@ -442,8 +447,8 @@ aio2$data
442447
# after the survey expires, the second resolves into a timeout error -----------
443448
Sys.sleep(0.5)
444449
aio2$data
445-
#> 2022-03-03 16:31:21 [ 5 ] Timed out
446-
#> [1] 5
450+
#> 2022-03-03 23:42:57 [ 5 ] Timed out
451+
#> 'errorValue' int 5
447452

448453
close(sur)
449454
close(res1)
@@ -463,11 +468,11 @@ ncurl("http://httpbin.org/headers")
463468
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
464469
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
465470
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
466-
#> [76] 2d 36 32 32 30 65 64 35 39 2d 32 31 64 36 39 31 30 32 31 32 62 30 63 39 34
467-
#> [101] 64 30 65 61 36 32 63 31 30 22 0a 20 20 7d 0a 7d 0a
471+
#> [76] 2d 36 32 32 31 35 32 38 31 2d 37 65 32 32 39 65 31 62 36 65 30 31 62 63 38
472+
#> [101] 33 36 31 37 61 61 64 31 64 22 0a 20 20 7d 0a 7d 0a
468473
#>
469474
#> $data
470-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6220ed59-21d6910212b0c94d0ea62c10\"\n }\n}\n"
475+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62215281-7e229e1b6e01bc83617aad1d\"\n }\n}\n"
471476
```
472477

473478
For advanced use, supports additional HTTP methods such as POST or PUT.

0 commit comments

Comments
 (0)