Skip to content

Commit 5892b34

Browse files
committed
Further Aio cleanups
1 parent cc8bd6f commit 5892b34

File tree

11 files changed

+161
-116
lines changed

11 files changed

+161
-116
lines changed

R/aio.R

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# nanonext - Core Functions - Aio Functions ------------------------------------
2+
3+
#' Call the Result of an Asynchronous AIO Operation
4+
#'
5+
#' Retrieve the result of an asynchronous AIO operation. Will wait for the AIO
6+
#' operation (blocking) if not yet complete. Once the result is retrieved,
7+
#' the Aio is deallocated and further actions cannot be performed on it.
8+
#'
9+
#' @param aio An Aio (object of class 'sendAio' or 'recvAio').
10+
#'
11+
#' @return The original Aio object (invisibly).
12+
#'
13+
#' @details For a 'sendAio', the send result will be attached to the Aio in
14+
#' \code{$result}. This will be zero on success.
15+
#'
16+
#' For a 'recvAio', the received raw vector will be attached in \code{$raw}
17+
#' (unless 'keep.raw' was set to FALSE when receiving), and the converted R
18+
#' object in \code{$data}.
19+
#'
20+
#' For a 'recvAio', in case of an error in unserialisation or data conversion,
21+
#' the received raw vector will always be saved in \code{$raw} to allow the
22+
#' data to be recovered.
23+
#'
24+
#' @examples
25+
#' s1 <- socket("pair", listen = "inproc://nanonext")
26+
#' s2 <- socket("pair", dial = "inproc://nanonext")
27+
#'
28+
#' res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
29+
#' res
30+
#' aio_call(res)
31+
#' res
32+
#' res$result
33+
#'
34+
#' res <- recv_aio(s2, timeout = 100)
35+
#' res
36+
#' aio_call(res)$data
37+
#' res
38+
#'
39+
#' close(s1)
40+
#' close(s2)
41+
#'
42+
#' @export
43+
#'
44+
aio_call <- function(aio) {
45+
46+
if (inherits(aio, "recvAio")) {
47+
48+
mode <- attr(aio, "callparams")[[1L]]
49+
keep.raw <- attr(aio, "callparams")[[2L]]
50+
res <- .Call(rnng_aio_get_msg, aio)
51+
if (keep.raw) attr(aio, "raw") <- res
52+
is.integer(res) && {
53+
message(res, " : ", nng_error(res))
54+
return(invisible(aio))
55+
}
56+
on.exit(expr = {
57+
attr(aio, "raw") <- res
58+
return(invisible(aio))
59+
})
60+
data <- switch(mode,
61+
serial = unserialize(connection = res),
62+
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
63+
raw = res,
64+
readBin(con = res, what = mode, n = length(res)))
65+
if (is.null(data)) data <- list(NULL)
66+
attr(aio, "data") <- data
67+
on.exit(expr = NULL)
68+
invisible(aio)
69+
70+
} else if (inherits(aio, "sendAio")) {
71+
72+
res <- .Call(rnng_aio_result, aio)
73+
attr(aio, "result") <- res
74+
if (res) {
75+
message(res, " : ", nng_error(res))
76+
}
77+
invisible(aio)
78+
79+
} else {
80+
stop("this function may only be used on an Aio")
81+
}
82+
83+
}
84+
85+
#' Stop Asynchronous AIO Operation
86+
#'
87+
#' Stop an asynchronous AIO operation.
88+
#'
89+
#' @param aio An Aio (object of class 'sendAio' or 'recvAio').
90+
#'
91+
#' @return Invisible NULL.
92+
#'
93+
#' @details Stops the asynchronous I/O operation associated with 'aio' by
94+
#' aborting, and then waits for it to complete or to be completely aborted.
95+
#' The Aio is then deallocated and no further operations may be performed on
96+
#' it.
97+
#'
98+
#' @export
99+
#'
100+
aio_stop <- function(aio) {
101+
102+
invisible(.Call(rnng_aio_stop, aio))
103+
104+
}
105+

R/context.R

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ ctx_recv <- function(context,
162162
#' to the caller/client.
163163
#'
164164
#' @param context a Context.
165+
#' @param execute a function which takes the received (converted) data as its
166+
#' first argument. Can be an anonymous function of the form \code{function(x) do(x)}.
167+
#' Additional arguments can also be passed in through '...'.
168+
#' @param ... additional arguments passed to the function specified by 'execute'.
165169
#' @param send_mode [default 'serial'] whether data will be sent serialized or
166170
#' as a raw vector. Use 'serial' for sending and receiving within R to ensure
167171
#' perfect reproducibility. Use 'raw' for sending vectors of any type (will be
@@ -171,14 +175,10 @@ ctx_recv <- function(context,
171175
#' 'character', 'complex', 'double', 'integer', 'logical', 'numeric', or 'raw'.
172176
#' The default 'serial' means a serialised R object, for the other modes,
173177
#' the raw vector received will be converted into the respective mode.
174-
#' @param execute a function which takes the received (converted) data as its
175-
#' first argument. Can be an anonymous function of the form \code{function(x) do(x)}.
176-
#' Additional arguments can also be passed in through '...'.
177178
#' @param timeout in ms. If unspecified, a socket-specific default timeout will
178179
#' be used. Note this applies to each of the receive and send legs, hence the
179180
#' total elapsed time could be up to twice this parameter plus the time to
180181
#' perform 'execute' on the received data.
181-
#' @param ... additional arguments passed to the function specified by 'execute'.
182182
#'
183183
#' @return Invisible NULL.
184184
#'
@@ -210,11 +210,11 @@ ctx_recv <- function(context,
210210
#' @export
211211
#'
212212
ctx_rep <- function(context,
213+
execute,
213214
...,
214215
recv_mode = c("serial", "character", "complex", "double",
215216
"integer", "logical", "numeric", "raw"),
216217
send_mode = c("serial", "raw"),
217-
execute,
218218
timeout) {
219219

220220
recv_mode <- match.arg(recv_mode)

R/methods.R

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ start.nanoDialer <- function(x, async = TRUE, ...) {
5353
#'
5454
#' @return Zero (invisibly) on success.
5555
#'
56+
#' @details Dialers and Listeners are implicitly closed when the socket they are
57+
#' associated with is closed.
58+
#'
59+
#' Closing a socket associated with a context also closes the context.
60+
#'
61+
#' When closing a socket or a context: messages that have been submitted for
62+
#' sending may be flushed or delivered, depending upon the transport. Closing
63+
#' the socket while data is in transmission will likely lead to loss of that
64+
#' data. There is no automatic linger or flush to ensure that the socket
65+
#' send buffers have completely transmitted.
66+
#'
5667
#' @name close
5768
#' @rdname close
5869
#'

R/nano.R

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,13 @@ nano <- function(protocol = c("pair", "bus", "push", "pull", "req", "rep",
9797
mode = mode,
9898
block = block,
9999
keep.raw = keep.raw)
100-
nano[["recv_aio"]] <- function(timeout) recv_aio(socket,
101-
timeout = timeout)
100+
nano[["recv_aio"]] <- function(mode = c("serial", "character", "complex", "double",
101+
"integer", "logical", "numeric", "raw"),
102+
timeout,
103+
keep.raw = TRUE) recv_aio(socket,
104+
mode = mode,
105+
timeout = timeout,
106+
keep.raw = keep.raw)
102107
nano[["send"]] <- function(data,
103108
mode = c("serial", "raw"),
104109
block = FALSE,

R/sendrecv.R

Lines changed: 0 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -217,108 +217,6 @@ recv_aio <- function(socket,
217217

218218
}
219219

220-
#' Call the Result of an Asynchronous AIO Operation
221-
#'
222-
#' Retrieve the result of an asynchronous AIO operation. Will wait for the AIO
223-
#' operation (blocking) if not yet complete. Once the result is retrieved,
224-
#' the Aio is deallocated and further actions cannot be performed on it.
225-
#'
226-
#' @param aio An Aio (object of class 'sendAio' or 'recvAio').
227-
#'
228-
#' @return The original Aio object (invisibly).
229-
#'
230-
#' @details For a 'sendAio', the send result will be attached to the Aio in
231-
#' \code{$result}. This will be zero on success.
232-
#'
233-
#' For a 'recvAio', the received raw vector will be attached in \code{$raw}
234-
#' (unless 'keep.raw' was set to FALSE when receiving), and the converted R
235-
#' object in \code{$data}.
236-
#'
237-
#' For a 'recvAio', in case of an error in unserialisation or data conversion,
238-
#' the received raw vector will always be saved in \code{$raw} to allow the
239-
#' data to be recovered.
240-
#'
241-
#' @examples
242-
#' s1 <- socket("pair", listen = "inproc://nanonext")
243-
#' s2 <- socket("pair", dial = "inproc://nanonext")
244-
#'
245-
#' res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
246-
#' res
247-
#' aio_call(res)
248-
#' res
249-
#' res$result
250-
#'
251-
#' res <- recv_aio(s2, timeout = 100)
252-
#' res
253-
#' aio_call(res)$data
254-
#' res
255-
#'
256-
#' close(s1)
257-
#' close(s2)
258-
#'
259-
#' @export
260-
#'
261-
aio_call <- function(aio) {
262-
263-
if (inherits(aio, "recvAio")) {
264-
265-
mode <- attr(aio, "callparams")[[1L]]
266-
keep.raw <- attr(aio, "callparams")[[2L]]
267-
res <- .Call(rnng_aio_get_msg, aio)
268-
if (keep.raw) attr(aio, "raw") <- res
269-
is.integer(res) && {
270-
message(res, " : ", nng_error(res))
271-
return(invisible(aio))
272-
}
273-
on.exit(expr = {
274-
attr(aio, "raw") <- res
275-
return(invisible(aio))
276-
})
277-
data <- switch(mode,
278-
serial = unserialize(connection = res),
279-
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
280-
raw = res,
281-
readBin(con = res, what = mode, n = length(res)))
282-
attr(aio, "data") <- data
283-
on.exit(expr = NULL)
284-
invisible(aio)
285-
286-
} else if (inherits(aio, "sendAio")) {
287-
288-
res <- .Call(rnng_aio_result, aio)
289-
attr(aio, "result") <- res
290-
if (res) {
291-
message(res, " : ", nng_error(res))
292-
}
293-
invisible(aio)
294-
295-
} else {
296-
stop("this function may only be used on an Aio")
297-
}
298-
299-
}
300-
301-
#' Stop Asynchronous AIO Operation
302-
#'
303-
#' Stop an asynchronous AIO operation.
304-
#'
305-
#' @param aio An Aio (object of class 'sendAio' or 'recvAio').
306-
#'
307-
#' @return Invisible NULL.
308-
#'
309-
#' @details Stops the asynchronous I/O operation associated with 'aio' by
310-
#' aborting, and then waits for it to complete or to be completely aborted.
311-
#' The Aio is then deallocated and no further operations may be performed on
312-
#' it.
313-
#'
314-
#' @export
315-
#'
316-
aio_stop <- function(aio) {
317-
318-
invisible(.Call(rnng_aio_stop, aio))
319-
320-
}
321-
322220
# Deprecated - may be removed at any time - do not use -------------------------
323221

324222
#' Send Vector

README.Rmd

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@ Designed for performance and reliability, the NNG library is written in C and {n
2727

2828
Can be used for sending data across networks, but equally as an interface for code and processes to communicate with each other. Receive data generated in Python, perform analysis in R, and send results to a C++ program – all on the same computer or on networks spanning the globe.
2929

30+
### Table of Contents
31+
32+
1. [Installation](#installation)
33+
2. [Interfaces](#interfaces)
34+
3. [Building from source](#building-from-source)
35+
4. [Links](#links)
36+
3037
### Installation
3138

3239
Install the latest release from CRAN:

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,13 @@ data generated in Python, perform analysis in R, and send results to a
2828
C++ program – all on the same computer or on networks spanning the
2929
globe.
3030

31+
### Table of Contents
32+
33+
1. [Installation](#installation)
34+
2. [Interfaces](#interfaces)
35+
3. [Building from source](#building-from-source)
36+
4. [Links](#links)
37+
3138
### Installation
3239

3340
Install the latest release from CRAN:

man/aio_call.Rd

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/aio_stop.Rd

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/close.Rd

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)