Skip to content

Commit adde66f

Browse files
committed
Add 'mode' to send/recv, deprecate _vec series of functions
1 parent 2f98a5d commit adde66f

23 files changed

+538
-496
lines changed

NEWS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
* Add `ncurl()` minimalistic http(s) client.
44
* Allow setting the environment variable 'NANONEXT_TLS' prior to package installation to enable TLS where the system NNG library has been built with TLS support (using Mbed TLS).
5+
* All send and receive functions, e.g. `send()`/`recv()`, gain the argument 'mode' for choosing whether or not to serialize R objects, consolidating the functionality of the '_vec' series of functions, e.g. send_vec()/recv_vec(). The '_vec' series of functions is consequently deprecated.
56

67
# nanonext 0.1.0
78

R/context.R

Lines changed: 72 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,7 @@ context <- function(socket) {
5252
#' Send over Context (Async)
5353
#'
5454
#' Send any number of R objects asynchronously over a Context, with the ability
55-
#' to set (optional) send timeouts. For sending and receiving within R -
56-
#' objects are sent serialized to ensure perfect reproducibility.
55+
#' to set send timeouts.
5756
#'
5857
#' @param context a Context.
5958
#' @inheritParams send_aio
@@ -63,7 +62,7 @@ context <- function(socket) {
6362
#' @details Will block if the send is in progress and has not yet completed -
6463
#' certain protocol / transport combinations may limit the number of messages
6564
#' that can be queued if they have yet to be received. Set a timeout to
66-
#' ensure the function returns under all conditions.
65+
#' ensure the function returns under all scenarios.
6766
#'
6867
#' @examples
6968
#' req <- socket("req", listen = "inproc://nanonext")
@@ -73,56 +72,23 @@ context <- function(socket) {
7372
#' out <- ctx_send(ctx, data.frame(a = 1, b = 2), data.frame(c = 3, d = 4), timeout = 100)
7473
#' out
7574
#'
76-
#' close(req)
77-
#' close(rep)
78-
#'
79-
#' @export
80-
#'
81-
ctx_send <- function(context, ..., timeout) {
82-
83-
if (missing(timeout)) timeout <- -2L
84-
data <- lapply(list(...), serialize, connection = NULL)
85-
res <- .Call(rnng_ctx_send, context, data, timeout)
86-
for (i in seq_along(res)) {
87-
if (res[i]) message("[", i, "] ", res[i], " : ", nng_error(res[i]))
88-
}
89-
invisible(res)
90-
91-
}
92-
93-
#' Send Vector over Context (Async)
94-
#'
95-
#' Send any number of R vectors asynchronously over a Context, with the ability
96-
#' to set (optional) send timeouts. Data will be sent as binary without R
97-
#' serialisation, hence appropriate for interfacing with external programs.
98-
#'
99-
#' @inheritParams ctx_send
100-
#' @inheritParams send_vec_aio
75+
#' msg <- recv_aio(rep, n = 2L, timeout = 100)
10176
#'
102-
#' @return A vector of zeros (invisibly) on success.
103-
#'
104-
#' @details Will block if the send is in progress and has not yet completed -
105-
#' certain protocol / transport combinations may limit the number of messages
106-
#' that can be queued if they have yet to be received. Set a timeout to
107-
#' ensure the function returns under all conditions.
108-
#'
109-
#' @examples
110-
#' req <- socket("req", listen = "inproc://nanonext")
111-
#' rep <- socket("rep", dial = "inproc://nanonext")
112-
#'
113-
#' ctx <- context(req)
114-
#' out <- ctx_send_vec(ctx, c(1.1, 2.2), c(3.3, 4.4), timeout = 1000)
77+
#' out <- ctx_send(ctx, c(1.1, 2.2), c(3.3, 4.4), mode = "raw", timeout = 100)
11578
#' out
11679
#'
11780
#' close(req)
11881
#' close(rep)
11982
#'
12083
#' @export
12184
#'
122-
ctx_send_vec <- function(context, ..., timeout) {
85+
ctx_send <- function(context, ..., mode = c("serial", "raw"), timeout) {
12386

87+
mode <- match.arg(mode)
12488
if (missing(timeout)) timeout <- -2L
125-
data <- lapply(list(...), writeBin, con = raw())
89+
data <- switch(mode,
90+
serial = lapply(list(...), serialize, connection = NULL),
91+
raw = lapply(list(...), writeBin, con = raw()))
12692
res <- .Call(rnng_ctx_send, context, data, timeout)
12793
for (i in seq_along(res)) {
12894
if (res[i]) message("[", i, "] ", res[i], " : ", nng_error(res[i]))
@@ -133,26 +99,26 @@ ctx_send_vec <- function(context, ..., timeout) {
13399

134100
#' Receive over Context (Async)
135101
#'
136-
#' Receive serialised data asynchronously over a Context (with ability to set a
137-
#' timeout). For sending and receiving within R.
102+
#' Receive any number of R objects asynchronously over a Context, with the
103+
#' ability to set receive timeouts.
138104
#'
139105
#' @inheritParams ctx_send
140106
#' @inheritParams recv_aio
141107
#'
142108
#' @return Named list of 2 elements: 'raw' containing a list of received raw
143-
#' vectors and 'data' containing a list of unserialised R objects, or else a
144-
#' list of unserialised R objects if keep.raw is set to FALSE.
109+
#' vectors and 'data' containing a list of converted R objects, or else a
110+
#' list of converted R objects if keep.raw is set to FALSE.
145111
#'
146112
#' Note: a list of lists is always returned even when n = 1. To access the
147113
#' first raw element, for example, use \code{$raw[[1]]} and the first data
148114
#' element use \code{$data[[1]]}.
149115
#'
150116
#' @details Async recv will block while awaiting all 'n' messages to arrive. Set
151-
#' a timeout to ensure that the function returns under all conditions.
117+
#' a timeout to ensure that the function returns under all scenarios.
152118
#'
153-
#' In case of an error in unserialisation (e.g. the data was not sent
154-
#' serialised), the function will still return a list of received raw vectors
155-
#' to allow the data to be recovered.
119+
#' In case of an error in unserialisation or data conversion, the function
120+
#' will still return a list of received raw vectors to allow the data to be
121+
#' recovered.
156122
#'
157123
#' @examples
158124
#' req <- socket("req", listen = "inproc://nanonext")
@@ -163,32 +129,80 @@ ctx_send_vec <- function(context, ..., timeout) {
163129
#' ctx_send(ctxq, data.frame(a = 1, b = 2), data.frame(c = 3, d = 4), timeout = 100)
164130
#' ctx_recv(ctxp, 2L, timeout = 100)
165131
#'
132+
#' ctx_send(ctxq, c(1.1, 2.2), c(3.3, 4.4), mode = "raw", timeout = 100)
133+
#' ctx_recv(ctxp, n = 2L, mode = "double", timeout = 100)
134+
#'
166135
#' close(req)
167136
#' close(rep)
168137
#'
169138
#' @export
170139
#'
171-
ctx_recv <- function(context, n = 1L, timeout, keep.raw = TRUE) {
140+
ctx_recv <- function(context,
141+
n = 1L,
142+
mode = c("serial", "character", "complex", "double",
143+
"integer", "logical", "numeric", "raw"),
144+
timeout,
145+
keep.raw = TRUE) {
172146

147+
mode <- match.arg(mode)
173148
if (missing(timeout)) timeout <- -2L
174149
res <- .Call(rnng_ctx_recv, context, n, timeout)
175150
on.exit(expr = return(res))
176151
data <- vector(mode = "list", length = length(res))
177152
for (i in seq_along(res)) {
178153
if (is.integer(res[[i]])) message("[", i, "] ", res[[i]], " : ", nng_error(res[[i]])) else
179-
data[[i]] <- unserialize(res[[i]])
154+
data[[i]] <- switch(mode,
155+
serial = unserialize(res[[i]]),
156+
character = (r <- readBin(con = res[[i]], what = mode, n = length(res[[i]])))[r != ""],
157+
raw = res[[i]],
158+
readBin(con = res[[i]], what = mode, n = length(res[[i]])))
180159
}
181160
on.exit(expr = NULL)
182161
if (missing(keep.raw) || isTRUE(keep.raw)) list(raw = res, data = data) else data
183162

184163
}
185164

165+
# Deprecated - may be removed at any time - do not use -------------------------
166+
167+
#' Send Vector over Context (Async)
168+
#'
169+
#' DEPRECATED [Use ctx_send specifying mode = 'raw'] Send any number of R vectors
170+
#' asynchronously over a Context, with the ability to set (optional) send
171+
#' timeouts. Data will be sent as binary without R serialisation, hence
172+
#' appropriate for interfacing with external programs.
173+
#'
174+
#' @inheritParams ctx_send
175+
#' @inheritParams send_vec_aio
176+
#'
177+
#' @return A vector of zeros (invisibly) on success.
178+
#'
179+
#' @details Will block if the send is in progress and has not yet completed -
180+
#' certain protocol / transport combinations may limit the number of messages
181+
#' that can be queued if they have yet to be received. Set a timeout to
182+
#' ensure the function returns under all conditions.
183+
#'
184+
#' @keywords internal
185+
#' @export
186+
#'
187+
ctx_send_vec <- function(context, ..., timeout) {
188+
189+
if (missing(timeout)) timeout <- -2L
190+
data <- lapply(list(...), writeBin, con = raw())
191+
res <- .Call(rnng_ctx_send, context, data, timeout)
192+
for (i in seq_along(res)) {
193+
if (res[i]) message("[", i, "] ", res[i], " : ", nng_error(res[i]))
194+
}
195+
invisible(res)
196+
197+
}
198+
186199
#' Receive Vector over Context (Async)
187200
#'
188-
#' Receive vector data asynchronously over a Context (with ability to set a
189-
#' timeout). The counterpart to \code{\link{ctx_send_vec}}, data will be
190-
#' re-created from the raw vector according to the specified mode. Can be
191-
#' used when interfacing with external programs.
201+
#' DEPRECATED [Use ctx_recv specifying mode] Receive vector data asynchronously
202+
#' over a Context (with ability to set a timeout). The counterpart to
203+
#' \code{\link{ctx_send_vec}}, data will be re-created from the raw vector
204+
#' according to the specified mode. Can be used when interfacing with
205+
#' external programs.
192206
#'
193207
#' @inheritParams ctx_recv
194208
#' @inheritParams recv_vec
@@ -205,18 +219,7 @@ ctx_recv <- function(context, n = 1L, timeout, keep.raw = TRUE) {
205219
#' @details Async recv will block while awaiting all 'n' messages to arrive. Set
206220
#' a timeout to ensure that the function returns under all conditions.
207221
#'
208-
#' @examples
209-
#' req <- socket("req", listen = "inproc://nanonext")
210-
#' rep <- socket("rep", dial = "inproc://nanonext")
211-
#'
212-
#' ctxq <- context(req)
213-
#' ctxp <- context(rep)
214-
#' ctx_send_vec(ctxq, c(1.1, 2.2), c(3.3, 4.4), timeout = 100)
215-
#' ctx_recv_vec(ctxp, "double", 2L, timeout = 100)
216-
#'
217-
#' close(req)
218-
#' close(rep)
219-
#'
222+
#' @keywords internal
220223
#' @export
221224
#'
222225
ctx_recv_vec <- function(context,

R/nano.R

Lines changed: 16 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@
4343
#' nano$listener
4444
#'
4545
#' nano1 <- nano("bus", dial = "inproc://nanonext")
46-
#' nano$send_vec("example test")
47-
#' nano1$recv_vec("character")
46+
#' nano$send("example test", mode = "raw")
47+
#' nano1$recv("character")
4848
#'
4949
#' nano$socket_close()
5050
#' nano1$socket_close()
@@ -90,51 +90,36 @@ nano <- function(protocol = c("pair", "bus", "push", "pull", "req", "rep",
9090
autostart = TRUE) listen(nano,
9191
url = url,
9292
autostart = autostart)
93-
nano[["recv"]] <- function(block = FALSE, keep.raw = TRUE) recv(socket,
94-
block = block,
95-
keep.raw = keep.raw)
93+
nano[["recv"]] <- function(mode = c("serial", "character", "complex", "double",
94+
"integer", "logical", "numeric", "raw"),
95+
block = FALSE,
96+
keep.raw = TRUE) recv(socket,
97+
mode = mode,
98+
block = block,
99+
keep.raw = keep.raw)
96100
nano[["recv_aio"]] <- function(n = 1L,
101+
mode = c("serial", "character", "complex", "double",
102+
"integer", "logical", "numeric", "raw"),
97103
timeout,
98104
keep.raw = TRUE) recv_aio(socket,
99105
n = n,
100-
timeout = timeout,
101-
keep.raw = keep.raw)
102-
nano[["recv_vec"]] <- function(mode = c("character", "complex", "double", "integer",
103-
"logical", "numeric", "raw"),
104-
block = FALSE,
105-
keep.raw = TRUE) recv_vec(socket,
106106
mode = mode,
107-
block = block,
107+
timeout = timeout,
108108
keep.raw = keep.raw)
109-
nano[["recv_vec_aio"]] <- function(mode = c("character", "complex", "double", "integer",
110-
"logical", "numeric", "raw"),
111-
n = 1L,
112-
timeout,
113-
keep.raw = TRUE) recv_vec_aio(socket,
114-
mode = mode,
115-
n = n,
116-
timeout = timeout,
117-
keep.raw = keep.raw)
118109
nano[["send"]] <- function(data,
110+
mode = c("serial", "raw"),
119111
block = FALSE,
120112
echo = TRUE) send(socket,
121113
data = data,
114+
mode = mode,
122115
block = block,
123116
echo = echo)
124117
nano[["send_aio"]] <- function(...,
118+
mode = c("serial", "raw"),
125119
timeout) send_aio(socket,
126120
...,
121+
mode = mode,
127122
timeout = timeout)
128-
nano[["send_vec"]] <- function(data,
129-
block = FALSE,
130-
echo = TRUE) send_vec(socket,
131-
data = data,
132-
block = block,
133-
echo = echo)
134-
nano[["send_vec_aio"]] <- function(...,
135-
timeout) send_vec_aio(socket,
136-
...,
137-
timeout = timeout)
138123
nano[["socket_close"]] <- function() close(socket)
139124
nano[["socket_setopt"]] <- function(type = c("bool", "int", "ms", "size",
140125
"string", "uint64"),

0 commit comments

Comments
 (0)