Skip to content

Commit 5786df9

Browse files
committed
use active bindings for more intuitive Aio UI
1 parent a6fd572 commit 5786df9

File tree

17 files changed

+294
-108
lines changed

17 files changed

+294
-108
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 0.2.0.9002
4+
Version: 0.2.0.9003
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library providing high-performance scalability protocols,
77
implementing a cross-platform standard for messaging and communications.

NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ S3method(print,nanoObject)
1515
S3method(print,nanoSocket)
1616
S3method(print,recvAio)
1717
S3method(print,sendAio)
18+
S3method(print,unresolvedValue)
1819
S3method(setopt,nanoContext)
1920
S3method(setopt,nanoDialer)
2021
S3method(setopt,nanoListener)
@@ -26,6 +27,7 @@ export(call_aio)
2627
export(context)
2728
export(dial)
2829
export(is_nul_byte)
30+
export(is_resolved)
2931
export(listen)
3032
export(nano)
3133
export(ncurl)

NEWS.md

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1-
# nanonext 0.2.0.9002 (development)
1+
# nanonext 0.2.0.9003 (development)
22

33
#### New Features
44

5-
* `call_aio()` gains the argument 'block', offering a non-blocking method of calling an AIO, without waiting for completion if it is yet to resolve.
6-
* `survey_time()` added as a convenience function for surveyor/respondent patterns.
5+
* Aio fields `$result` for a 'sendAio', `$raw` and `$data` for a 'recvAio' may be queried directly, returning their values or else an NA 'unresolved value' if the Aio operation is yet to complete.
6+
* `is_resolved()` added as an auxiliary function to query whether an Aio has resolved in a non-blocking fashion.
77
* `is_nul_byte()` added as a helper function for request/reply setups.
8+
* `survey_time()` added as a convenience function for surveyor/respondent patterns.
89
* `ncurl()` adds a '...' argument. Support for HTTP methods other than GET.
910

1011
#### Updates
1112

12-
* 'quietly' argument added to functions that create or destroy objects. Set to FALSE to enable printing of informational messages for logging purposes.
13+
* Argument 'quietly' added to functions that create or destroy objects. Set to FALSE to enable printing of informational messages for logging purposes.
1314
* Common format for NNG errors now starts with a timestamp for easier logging.
1415
* `listen()` and `dial()` now return (invisible) zero rather than NULL upon success to better align with similar functions.
1516
* Allows setting the environment variable 'NANONEXT_ARM' prior to package installation

R/aio.R

Lines changed: 68 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,12 @@
22

33
#' Call the Result of an Asynchronous AIO Operation
44
#'
5-
#' Retrieve the result of an asynchronous AIO operation. Optionally wait for the
6-
#' AIO operation to complete if this is still in progress.
5+
#' Retrieve the result of an asynchronous AIO operation, waiting for the AIO
6+
#' operation to complete if still in progress.
77
#'
88
#' @param aio An Aio (object of class 'sendAio' or 'recvAio').
9-
#' @param block [default TRUE] whether to wait for completion of the AIO
10-
#' operation (blocking) or return immediately.
119
#'
12-
#' @return The passed Aio object (invisibly), or NULL if non-blocking and the
13-
#' Aio has yet to resolve.
10+
#' @return The passed Aio object (invisibly).
1411
#'
1512
#' @details For a 'recvAio', the received raw vector will be attached in \code{$raw}
1613
#' (unless 'keep.raw' was set to FALSE when receiving), and the converted R
@@ -23,23 +20,21 @@
2320
#' \code{call_aio(x)$result}.
2421
#'
2522
#' For a 'recvAio', in case of an error in unserialisation or data conversion,
26-
#' the received raw vector will always be saved in \code{$raw} to allow the
23+
#' the received raw vector will be stored in \code{$data} to allow for the
2724
#' data to be recovered.
2825
#'
2926
#' Once the result has been successfully retrieved, the Aio is deallocated
3027
#' and only the result is stored in the Aio object.
3128
#'
3229
#' @section Non-blocking:
3330
#'
34-
#' To query whether Aio \code{x} has resolved, test if \code{call_aio(x, block = FALSE)}
35-
#' returns NULL. When the Aio resolves, the Aio itself will be returned
36-
#' (invisibly) instead of NULL. The data may then be extracted from the Aio
37-
#' using \code{$result}, \code{$raw} or \code{$data} as the case may be.
31+
#' To query the value of an Aio without potentially waiting for the Aio
32+
#' operation to complete, call the values directly at \code{$result} for a 'sendAio', and
33+
#' \code{$raw} or \code{$data} for a 'recvAio'.
3834
#'
39-
#' It is not advisable to use, for example, \code{call_aio(x, block = FALSE)$data}.
40-
#' This is as \code{call_aio()} will return NULL if the Aio is unresolved and
41-
#' \code{NULL$data} is also \code{NULL}, hence it would be impossible to
42-
#' distinguish between an unresolved Aio and a NULL return value in this case.
35+
#' If the Aio operation is yet to complete, the result will be an
36+
#' 'unresolved value', which is a logical NA. Once complete, the resolved
37+
#' value will be returned instead.
4338
#'
4439
#' @examples
4540
#' s1 <- socket("pair", listen = "inproc://nanonext")
@@ -48,40 +43,40 @@
4843
#' res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
4944
#' res
5045
#' call_aio(res)
51-
#' res
5246
#' res$result
5347
#'
5448
#' res <- recv_aio(s2, timeout = 100)
5549
#' res
5650
#' call_aio(res)$data
57-
#' res
5851
#'
5952
#' close(s1)
6053
#' close(s2)
6154
#'
6255
#' @export
6356
#'
64-
call_aio <- function(aio, block = TRUE) {
57+
call_aio <- function(aio) {
6558

6659
is.null(.subset2(aio, "aio")) && return(invisible(aio))
6760

6861
if (inherits(aio, "recvAio")) {
6962

70-
if (missing(block) || isTRUE(block)) {
71-
res <- .Call(rnng_aio_wait_get_msg, .subset2(aio, "aio"))
72-
} else {
73-
res <- .Call(rnng_aio_get_msg, .subset2(aio, "aio"))
74-
missing(res) && return()
75-
}
63+
res <- .Call(rnng_aio_wait_get_msg, .subset2(aio, "aio"))
7664
mode <- .subset2(aio, "callparams")[[1L]]
7765
keep.raw <- .subset2(aio, "callparams")[[2L]]
78-
if (keep.raw) aio[["raw"]] <- res
7966
is.integer(res) && {
67+
if (keep.raw) {
68+
rm("raw", envir = aio)
69+
`[[<-`(aio, "raw", res)
70+
}
71+
rm("data", envir = aio)
72+
`[[<-`(aio, "data", res)
8073
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
8174
return(invisible(aio))
8275
}
8376
on.exit(expr = {
84-
aio[["raw"]] <- res
77+
if (keep.raw) rm("raw", envir = aio)
78+
rm("data", envir = aio)
79+
`[[<-`(aio, "data", res)
8580
rm("aio", envir = aio)
8681
rm("callparams", envir = aio)
8782
return(invisible(aio))
@@ -91,22 +86,23 @@ call_aio <- function(aio, block = TRUE) {
9186
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
9287
raw = res,
9388
readBin(con = res, what = mode, n = length(res)))
94-
aio[["data"]] <- data
9589
on.exit()
90+
if (keep.raw) {
91+
rm("raw", envir = aio)
92+
`[[<-`(aio, "raw", res)
93+
}
94+
rm("data", envir = aio)
95+
`[[<-`(aio, "data", data)
9696
rm("aio", envir = aio)
9797
rm("callparams", envir = aio)
9898

9999
} else if (inherits(aio, "sendAio")) {
100100

101-
if (missing(block) || isTRUE(block)) {
102-
res <- .Call(rnng_aio_wait_result, .subset2(aio, "aio"))
103-
} else {
104-
res <- .Call(rnng_aio_result, .subset2(aio, "aio"))
105-
missing(res) && return()
106-
}
107-
aio[["result"]] <- res
101+
res <- .Call(rnng_aio_wait_result, .subset2(aio, "aio"))
102+
rm("result", envir = aio)
103+
`[[<-`(aio, "result", res)
108104
rm("aio", envir = aio)
109-
if (res) message(Sys.time(), " | ", res, " : ", nng_error(res))
105+
if (res) message(Sys.time(), " [ ", res, " ] ", nng_error(res))
110106

111107
}
112108

@@ -135,3 +131,40 @@ stop_aio <- function(aio) {
135131

136132
}
137133

134+
#' Is Resolved (Asynchronous AIO Operation)
135+
#'
136+
#' Query whether an Aio has resolved. This function is non-blocking unlike
137+
#' \code{\link{call_aio}} which waits for completion.
138+
#'
139+
#' @param aio An Aio (object of class 'sendAio' or 'recvAio').
140+
#'
141+
#' @return Logical TRUE or FALSE. NA if 'aio' is not a 'sendAio' or 'recvAio'.
142+
#'
143+
#' @details Querying resolution will potentially cause the state of the Aio to
144+
#' update.
145+
#'
146+
#' @examples
147+
#' s1 <- socket("pair", listen = "inproc://nanonext")
148+
#' aio <- send_aio(s1, "test", timeout = 100)
149+
#' is_resolved(aio)
150+
#'
151+
#' s2 <- socket("pair", dial = "inproc://nanonext")
152+
#' is_resolved(aio)
153+
#'
154+
#' close(s1)
155+
#' close(s2)
156+
#'
157+
#' @export
158+
#'
159+
is_resolved <- function(aio) {
160+
161+
if (inherits(aio, "recvAio")) {
162+
!inherits(aio$data, "unresolvedValue")
163+
} else if (inherits(aio, "sendAio")) {
164+
!inherits(aio$result, "unresolvedValue")
165+
} else {
166+
NA
167+
}
168+
169+
}
170+

R/context.R

Lines changed: 63 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,7 @@ request <- function(context,
312312

313313
send_mode <- match.arg(send_mode)
314314
recv_mode <- match.arg(recv_mode)
315+
keep.raw <- missing(keep.raw) || isTRUE(keep.raw)
315316
if (missing(timeout)) timeout <- -2L
316317
force(data)
317318
data <- switch(send_mode,
@@ -322,15 +323,70 @@ request <- function(context,
322323
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
323324
return(invisible(res))
324325
}
325-
res <- .Call(rnng_recv_aio, context, timeout)
326-
is.integer(res) && {
327-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
328-
return(invisible(res))
326+
327+
aio <- .Call(rnng_recv_aio, context, timeout)
328+
is.integer(aio) && {
329+
message(Sys.time(), " [ ", aio, " ] ", nng_error(aio))
330+
return(invisible(aio))
329331
}
330332
env <- `class<-`(new.env(), "recvAio")
331-
env[["aio"]] <- res
332-
env[["callparams"]] <- list(recv_mode, missing(keep.raw) || isTRUE(keep.raw))
333-
env
333+
`[[<-`(env, "callparams", list(recv_mode, keep.raw))
334+
data <- raw <- resolv <- NULL
335+
if (keep.raw) {
336+
makeActiveBinding(sym = "raw", fun = function(x) {
337+
if (is.null(resolv)) {
338+
res <- .Call(rnng_aio_get_msg, aio)
339+
missing(res) && return(.Call(rnng_aio_unresolv))
340+
is.integer(res) && {
341+
res <<- raw <<- resolv <<- res
342+
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
343+
return(invisible(res))
344+
}
345+
on.exit(expr = {
346+
raw <<- res
347+
resolv <<- 0L
348+
return(res)
349+
})
350+
data <- switch(recv_mode,
351+
serial = unserialize(connection = res),
352+
character = (r <- readBin(con = res, what = recv_mode, n = length(res)))[r != ""],
353+
raw = res,
354+
readBin(con = res, what = recv_mode, n = length(res)))
355+
on.exit()
356+
raw <<- res
357+
data <<- data
358+
resolv <<- 0L
359+
}
360+
raw
361+
}, env = env)
362+
}
363+
makeActiveBinding(sym = "data", fun = function(x) {
364+
if (is.null(resolv)) {
365+
res <- .Call(rnng_aio_get_msg, aio)
366+
missing(res) && return(.Call(rnng_aio_unresolv))
367+
is.integer(res) && {
368+
res <<- raw <<- resolv <<- res
369+
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
370+
return(invisible(res))
371+
}
372+
on.exit(expr = {
373+
data <<- res
374+
resolv <<- 0L
375+
return(res)
376+
})
377+
data <- switch(recv_mode,
378+
serial = unserialize(connection = res),
379+
character = (r <- readBin(con = res, what = recv_mode, n = length(res)))[r != ""],
380+
raw = res,
381+
readBin(con = res, what = recv_mode, n = length(res)))
382+
on.exit()
383+
if (keep.raw) raw <<- res
384+
data <<- data
385+
resolv <<- 0L
386+
}
387+
data
388+
}, env = env)
389+
`[[<-`(env, "aio", aio)
334390

335391
}
336392

R/docs.R

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -349,10 +349,10 @@ NULL
349349
#'
350350
#' \item{'socket-name' [type 'string']}
351351
#'
352-
#' {This the socket name. By default this is a string corresponding to the
353-
#' value of the socket. The string must fit within 64-bytes, including the
354-
#' terminating NUL byte. The value is intended for application use, and is
355-
#' not used for anything in the library itself.}
352+
#' {This is the socket name. By default this is a string corresponding to
353+
#' the value of the socket. The string must fit within 64-bytes, including
354+
#' the terminating NUL byte. The value is intended for application use,
355+
#' and is not used for anything in the library itself.}
356356
#'
357357
#' }
358358
#'

R/nano.R

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -217,15 +217,7 @@ print.nanoListener <- function(x, ...) {
217217
#'
218218
print.recvAio <- function(x, ...) {
219219

220-
cat("< recvAio >\n")
221-
if (length(.subset2(x, "aio"))) {
222-
cat(" ~ use call_aio() to retrieve message\n")
223-
} else {
224-
if (length(.subset2(x, "raw")))
225-
cat(" - $raw for raw message\n")
226-
if (length(.subset2(x, "data")))
227-
cat(" - $data for message data\n")
228-
}
220+
cat("< recvAio >\n - $raw for raw message\n - $data for message data\n")
229221
invisible(x)
230222

231223
}
@@ -234,12 +226,7 @@ print.recvAio <- function(x, ...) {
234226
#'
235227
print.sendAio <- function(x, ...) {
236228

237-
cat("< sendAio >\n")
238-
if (length(.subset2(x, "aio"))) {
239-
cat(" ~ use call_aio() to retrieve result\n")
240-
} else {
241-
cat(" - $result for send result\n")
242-
}
229+
cat("< sendAio >\n - $result for send result\n")
243230
invisible(x)
244231

245232
}
@@ -276,3 +263,12 @@ print.sendAio <- function(x, ...) {
276263

277264
}
278265

266+
#' @export
267+
#'
268+
print.unresolvedValue <- function(x, ...) {
269+
270+
cat("< unresolved value >\n")
271+
invisible(x)
272+
273+
}
274+

R/nanonext-package.R

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,3 @@
122122
#'
123123
NULL
124124

125-
utils::globalVariables("eval_mirai")

0 commit comments

Comments
 (0)