Skip to content

Commit 5dc478f

Browse files
committed
implementing full AIO
1 parent 2d9c928 commit 5dc478f

21 files changed

+495
-684
lines changed

NAMESPACE

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ S3method(close,nanoContext)
88
S3method(close,nanoDialer)
99
S3method(close,nanoListener)
1010
S3method(close,nanoSocket)
11+
S3method(print,nano)
1112
S3method(print,nanoContext)
1213
S3method(print,nanoDialer)
1314
S3method(print,nanoListener)
@@ -19,13 +20,13 @@ S3method(setopt,nanoListener)
1920
S3method(setopt,nanoSocket)
2021
S3method(start,nanoDialer)
2122
S3method(start,nanoListener)
23+
export(aio_call)
24+
export(aio_stop)
2225
export(context)
2326
export(ctx_recv)
24-
export(ctx_recv_vec)
2527
export(ctx_rep)
2628
export(ctx_req)
2729
export(ctx_send)
28-
export(ctx_send_vec)
2930
export(dial)
3031
export(listen)
3132
export(nano)
@@ -35,11 +36,9 @@ export(nng_version)
3536
export(recv)
3637
export(recv_aio)
3738
export(recv_vec)
38-
export(recv_vec_aio)
3939
export(send)
4040
export(send_aio)
4141
export(send_vec)
42-
export(send_vec_aio)
4342
export(setopt)
4443
export(socket)
4544
export(subscribe)

NEWS.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@
22

33
#### New Features
44

5-
* New `ctx_rep()` and `ctx_req()` functions implement the full logic of an RPC server/client. Designed to be run in separate processes, the server will await data and apply an arbitrary function to it before returning a result, whilst the client will send data to the server and await a response.
5+
* Implemented full async I/O capabilities allowing `send_aio()` and `recv_aio()` to return Aio objects, for which the results may be called using `aio_call()`.
6+
* New `ctx_rep()` and `ctx_req()` functions implement the full logic of an RPC server/client. Designed to be run in separate processes, the server will await data and apply an arbitrary function before returning a result, whilst the client will send data to the server and await a response.
67
* New `ncurl()` minimalistic http(s) client.
78
* Allow setting the environment variable 'NANONEXT_TLS' prior to package installation to enable TLS where the system NNG library has been built with TLS support (using Mbed TLS).
89

910
#### Updates
1011

1112
* All send and receive functions, e.g. `send()`/`recv()`, gain a revamped 'mode' argument. This now permits the choice of whether to use R serialization, consolidating the functionality of the '_vec' series of functions.
12-
* The '_vec' series of functions ('send_vec', 'recv_vec' etc.) is deprecated and will be removed in a future release.
13+
* Functions 'send_vec' and 'recv_vec' are deprecated and will be removed in a future release.
1314

1415
# nanonext 0.1.0
1516

R/context.R

Lines changed: 47 additions & 130 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@
2121
#'
2222
#' Note: not every protocol supports creation of separate contexts.
2323
#'
24-
#' To send and receive over a context use \code{\link{ctx_send}} or
25-
#' \code{\link{ctx_send_vec}} and \code{\link{ctx_recv}} or
26-
#' \code{\link{ctx_recv_vec}} respectively.
24+
#' To send and receive over a context use \code{\link{ctx_send}} and
25+
#' \code{\link{ctx_recv}}. It is also possible to perform async send and receive
26+
#' with a context using \code{\link{send_aio}} and \code{\link{recv_aio}}.
2727
#'
2828
#' @examples
2929
#' s <- socket("req", listen = "inproc://nanonext")
@@ -49,15 +49,15 @@ context <- function(socket) {
4949

5050
}
5151

52-
#' Send over Context (Async)
52+
#' Send over Context
5353
#'
54-
#' Send any number of R objects asynchronously over a Context, with the ability
55-
#' to set send timeouts.
54+
#' Send data over a Context.
5655
#'
5756
#' @param context a Context.
57+
#' @inheritParams send
5858
#' @inheritParams send_aio
5959
#'
60-
#' @return A vector of zeros (invisibly) on success.
60+
#' @return Raw vector of sent data, or zero (invisibly) if 'echo' is set to FALSE.
6161
#'
6262
#' @details Will block if the send is in progress and has not yet completed -
6363
#' certain protocol / transport combinations may limit the number of messages
@@ -69,41 +69,40 @@ context <- function(socket) {
6969
#' rep <- socket("rep", dial = "inproc://nanonext")
7070
#'
7171
#' ctx <- context(req)
72-
#' out <- ctx_send(ctx, data.frame(a = 1, b = 2), data.frame(c = 3, d = 4), timeout = 100)
73-
#' out
72+
#' ctx_send(ctx, data.frame(a = 1, b = 2), timeout = 100)
7473
#'
75-
#' msg <- recv_aio(rep, n = 2L, timeout = 100)
76-
#'
77-
#' out <- ctx_send(ctx, c(1.1, 2.2), c(3.3, 4.4), mode = "raw", timeout = 100)
78-
#' out
74+
#' msg <- recv_aio(rep, timeout = 100)
75+
#' ctx_send(ctx, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
7976
#'
8077
#' close(req)
8178
#' close(rep)
8279
#'
8380
#' @export
8481
#'
85-
ctx_send <- function(context, ..., mode = c("serial", "raw"), timeout) {
82+
ctx_send <- function(context, data, mode = c("serial", "raw"), timeout, echo = TRUE) {
8683

8784
mode <- match.arg(mode)
88-
if (missing(timeout)) timeout <- -2L
85+
force(data)
8986
data <- switch(mode,
90-
serial = lapply(list(...), serialize, connection = NULL),
91-
raw = lapply(list(...), writeBin, con = raw()))
87+
serial = serialize(object = data, connection = NULL),
88+
raw = if (is.raw(data)) data else writeBin(object = data, con = raw()))
9289
res <- .Call(rnng_ctx_send, context, data, timeout)
93-
for (i in seq_along(res)) {
94-
if (res[i]) message("[", i, "] ", res[i], " : ", nng_error(res[i]))
90+
is.integer(res) && {
91+
message(res, " : ", nng_error(res))
92+
return(invisible(res))
9593
}
96-
invisible(res)
94+
if (missing(echo) || isTRUE(echo)) res else invisible(0L)
9795

9896
}
9997

100-
#' Receive over Context (Async)
98+
#' Receive over Context
10199
#'
102100
#' Receive any number of R objects asynchronously over a Context, with the
103101
#' ability to set receive timeouts.
104102
#'
105103
#' @param context a Context.
106-
#' @inheritParams recv_aio
104+
#' @inheritParams recv
105+
#' @inheritParams send_aio
107106
#'
108107
#' @return Named list of 2 elements: 'raw' containing a list of received raw
109108
#' vectors and 'data' containing a list of converted R objects, or else a
@@ -126,37 +125,36 @@ ctx_send <- function(context, ..., mode = c("serial", "raw"), timeout) {
126125
#'
127126
#' ctxq <- context(req)
128127
#' ctxp <- context(rep)
129-
#' ctx_send(ctxq, data.frame(a = 1, b = 2), data.frame(c = 3, d = 4), timeout = 100)
130-
#' ctx_recv(ctxp, 2L, timeout = 100)
128+
#' ctx_send(ctxq, data.frame(a = 1, b = 2), timeout = 100)
129+
#' ctx_recv(ctxp, timeout = 100)
131130
#'
132-
#' ctx_send(ctxq, c(1.1, 2.2), c(3.3, 4.4), mode = "raw", timeout = 100)
133-
#' ctx_recv(ctxp, n = 2L, mode = "double", timeout = 100)
131+
#' ctx_send(ctxq, c(1.1, 2.2, 3.3), mode = "raw", timeout = 100)
132+
#' ctx_recv(ctxp, mode = "double", timeout = 100)
134133
#'
135134
#' close(req)
136135
#' close(rep)
137136
#'
138137
#' @export
139138
#'
140139
ctx_recv <- function(context,
141-
n = 1L,
142140
mode = c("serial", "character", "complex", "double",
143141
"integer", "logical", "numeric", "raw"),
144142
timeout,
145143
keep.raw = TRUE) {
146144

147145
mode <- match.arg(mode)
148146
if (missing(timeout)) timeout <- -2L
149-
res <- .Call(rnng_ctx_recv, context, n, timeout)
150-
on.exit(expr = return(res))
151-
data <- vector(mode = "list", length = length(res))
152-
for (i in seq_along(res)) {
153-
if (is.integer(res[[i]])) message("[", i, "] ", res[[i]], " : ", nng_error(res[[i]])) else
154-
data[[i]] <- switch(mode,
155-
serial = unserialize(res[[i]]),
156-
character = (r <- readBin(con = res[[i]], what = mode, n = length(res[[i]])))[r != ""],
157-
raw = res[[i]],
158-
readBin(con = res[[i]], what = mode, n = length(res[[i]])))
147+
res <- .Call(rnng_ctx_recv, context, timeout)
148+
is.integer(res) && {
149+
message(res, " : ", nng_error(res))
150+
return(invisible(res))
159151
}
152+
on.exit(expr = return(res))
153+
data <- switch(mode,
154+
serial = unserialize(connection = res),
155+
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
156+
raw = res,
157+
readBin(con = res, what = mode, n = length(res)))
160158
on.exit(expr = NULL)
161159
if (missing(keep.raw) || isTRUE(keep.raw)) list(raw = res, data = data) else data
162160

@@ -203,11 +201,11 @@ ctx_recv <- function(context,
203201
#' ctxq <- context(req)
204202
#' ctxp <- context(rep)
205203
#'
206-
#' ctx_send(ctxq, 2022, timeout = 100)
204+
#' ctx_send(ctxq, 2022, timeout = 100, echo = FALSE)
207205
#' ctx_rep(ctxp, execute = function(x) x + 1, send_mode = "raw", timeout = 100)
208206
#' ctx_recv(ctxq, mode = "double", timeout = 100, keep.raw = FALSE)
209207
#'
210-
#' ctx_send(ctxq, 100, mode = "raw", timeout = 100)
208+
#' ctx_send(ctxq, 100, mode = "raw", timeout = 100, echo = FALSE)
211209
#' ctx_rep(ctxp, recv_mode = "double", execute = log, base = 10, timeout = 100)
212210
#' ctx_recv(ctxq, timeout = 100, keep.raw = FALSE)
213211
#'
@@ -227,7 +225,7 @@ ctx_rep <- function(context,
227225
recv_mode <- match.arg(recv_mode)
228226
send_mode <- match.arg(send_mode)
229227
if (missing(timeout)) timeout <- -2L
230-
res <- .Call(rnng_ctx_recv, context, 1L, timeout)[[1L]]
228+
res <- .Call(rnng_ctx_recv, context, timeout)
231229
is.integer(res) && {
232230
message(res, " : ", nng_error(res))
233231
return(invisible(res))
@@ -240,7 +238,7 @@ ctx_rep <- function(context,
240238
readBin(con = res, what = recv_mode, n = length(res)))
241239
on.exit(expr = NULL)
242240
msg <- execute(data, ...)
243-
ctx_send(context, msg, mode = send_mode, timeout = timeout)
241+
ctx_send(context, data = msg, mode = send_mode, timeout = timeout, echo = FALSE)
244242

245243
}
246244

@@ -250,7 +248,7 @@ ctx_rep <- function(context,
250248
#' data to the rep node (executor/server) and awaits the result to be returned.
251249
#'
252250
#' @inheritParams ctx_rep
253-
#' @inheritParams ctx_recv
251+
#' @inheritParams recv
254252
#' @param data an R object (if send_mode = 'raw', an R vector).
255253
#' @param timeout in ms. If unspecified, a socket-specific default timeout will
256254
#' be used. Note this applies to each of the send and receive legs, hence the
@@ -294,15 +292,16 @@ ctx_req <- function(context,
294292
send_mode <- match.arg(send_mode)
295293
recv_mode <- match.arg(recv_mode)
296294
if (missing(timeout)) timeout <- -2L
297-
data <- list(switch(send_mode,
298-
serial = serialize(object = data, connection = NULL),
299-
if (is.raw(data)) data else writeBin(object = data, con = raw())))
300-
res <- .Call(rnng_ctx_send, context, data, timeout)[[1L]]
301-
res == 0L || {
295+
force(data)
296+
data <- switch(send_mode,
297+
serial = serialize(object = data, connection = NULL),
298+
if (is.raw(data)) data else writeBin(object = data, con = raw()))
299+
res <- .Call(rnng_ctx_send, context, data, timeout)
300+
is.integer(res) && {
302301
message(res, " : ", nng_error(res))
303302
return(invisible(res))
304303
}
305-
res <- .Call(rnng_ctx_recv, context, 1L, timeout)[[1L]]
304+
res <- .Call(rnng_ctx_recv, context, timeout)
306305
is.integer(res) && {
307306
message(res, " : ", nng_error(res))
308307
return(invisible(res))
@@ -318,85 +317,3 @@ ctx_req <- function(context,
318317

319318
}
320319

321-
# Deprecated - may be removed at any time - do not use -------------------------
322-
323-
#' Send Vector over Context (Async)
324-
#'
325-
#' DEPRECATED [Use ctx_send specifying mode = 'raw'] Send any number of R vectors
326-
#' asynchronously over a Context, with the ability to set (optional) send
327-
#' timeouts. Data will be sent as binary without R serialisation, hence
328-
#' appropriate for interfacing with external programs.
329-
#'
330-
#' @inheritParams ctx_send
331-
#' @inheritParams send_vec_aio
332-
#'
333-
#' @return A vector of zeros (invisibly) on success.
334-
#'
335-
#' @details Will block if the send is in progress and has not yet completed -
336-
#' certain protocol / transport combinations may limit the number of messages
337-
#' that can be queued if they have yet to be received. Set a timeout to
338-
#' ensure the function returns under all conditions.
339-
#'
340-
#' @keywords internal
341-
#' @export
342-
#'
343-
ctx_send_vec <- function(context, ..., timeout) {
344-
345-
if (missing(timeout)) timeout <- -2L
346-
data <- lapply(list(...), writeBin, con = raw())
347-
res <- .Call(rnng_ctx_send, context, data, timeout)
348-
for (i in seq_along(res)) {
349-
if (res[i]) message("[", i, "] ", res[i], " : ", nng_error(res[i]))
350-
}
351-
invisible(res)
352-
353-
}
354-
355-
#' Receive Vector over Context (Async)
356-
#'
357-
#' DEPRECATED [Use ctx_recv specifying mode] Receive vector data asynchronously
358-
#' over a Context (with ability to set a timeout). The counterpart to
359-
#' \code{\link{ctx_send_vec}}, data will be re-created from the raw vector
360-
#' according to the specified mode. Can be used when interfacing with
361-
#' external programs.
362-
#'
363-
#' @inheritParams ctx_recv
364-
#' @inheritParams recv_vec
365-
#'
366-
#' @return Named list of 2 elements: 'raw' containing a list of received raw
367-
#' vectors and 'data' containing a list of vectors decoded to the type 'mode',
368-
#' or else a list of vectors decoded to type 'mode' if keep.raw is set to
369-
#' FALSE.
370-
#'
371-
#' Note: a list of lists is always returned even when n = 1. To access the
372-
#' first raw element, for example, use \code{$raw[[1]]} and the first data
373-
#' element use \code{$data[[1]]}.
374-
#'
375-
#' @details Async recv will block while awaiting all 'n' messages to arrive. Set
376-
#' a timeout to ensure that the function returns under all conditions.
377-
#'
378-
#' @keywords internal
379-
#' @export
380-
#'
381-
ctx_recv_vec <- function(context,
382-
mode = c("character", "complex", "double", "integer",
383-
"logical", "numeric", "raw"),
384-
n = 1L,
385-
timeout,
386-
keep.raw = TRUE) {
387-
388-
mode <- match.arg(mode)
389-
if (missing(timeout)) timeout <- -2L
390-
res <- .Call(rnng_ctx_recv, context, n, timeout)
391-
data <- vector(mode = "list", length = length(res))
392-
for (i in seq_along(res)) {
393-
if (is.integer(res[[i]])) message("[", i, "] ", res[[i]], " : ", nng_error(res[[i]])) else
394-
data[[i]] <- switch(mode,
395-
character = (r <- readBin(con = res[[i]], what = mode, n = length(res[[i]])))[r != ""],
396-
raw = res[[i]],
397-
readBin(con = res[[i]], what = mode, n = length(res[[i]])))
398-
}
399-
if (missing(keep.raw) || isTRUE(keep.raw)) list(raw = res, data = data) else data
400-
401-
}
402-

R/nano.R

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -97,15 +97,8 @@ nano <- function(protocol = c("pair", "bus", "push", "pull", "req", "rep",
9797
mode = mode,
9898
block = block,
9999
keep.raw = keep.raw)
100-
nano[["recv_aio"]] <- function(n = 1L,
101-
mode = c("serial", "character", "complex", "double",
102-
"integer", "logical", "numeric", "raw"),
103-
timeout,
104-
keep.raw = TRUE) recv_aio(socket,
105-
n = n,
106-
mode = mode,
107-
timeout = timeout,
108-
keep.raw = keep.raw)
100+
nano[["recv_aio"]] <- function(timeout) recv_aio(socket,
101+
timeout = timeout)
109102
nano[["send"]] <- function(data,
110103
mode = c("serial", "raw"),
111104
block = FALSE,
@@ -114,10 +107,10 @@ nano <- function(protocol = c("pair", "bus", "push", "pull", "req", "rep",
114107
mode = mode,
115108
block = block,
116109
echo = echo)
117-
nano[["send_aio"]] <- function(...,
110+
nano[["send_aio"]] <- function(data,
118111
mode = c("serial", "raw"),
119112
timeout) send_aio(socket,
120-
...,
113+
data = data,
121114
mode = mode,
122115
timeout = timeout)
123116
nano[["socket_close"]] <- function() close(socket)
@@ -206,6 +199,18 @@ print.nanoListener <- function(x, ...) {
206199

207200
}
208201

202+
#' @export
203+
#'
204+
print.nano <- function(x, ...) {
205+
206+
klass <- attr(x, "class")[1L]
207+
switch(klass,
208+
recvAio = cat("< recvAio > - use aio_call() to retrieve message\n"),
209+
sendAio = cat("< sendAio > - use aio_call() to retrieve result\n"))
210+
invisible(x)
211+
212+
}
213+
209214
#' @export
210215
#'
211216
`[[.nano` <- function(x, i, exact = FALSE) {

0 commit comments

Comments
 (0)