Skip to content

Commit c7f3cf4

Browse files
committed
implement RPC server/client (req/rep over context)
1 parent adde66f commit c7f3cf4

File tree

12 files changed

+335
-29
lines changed

12 files changed

+335
-29
lines changed

NAMESPACE

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ S3method(start,nanoListener)
2222
export(context)
2323
export(ctx_recv)
2424
export(ctx_recv_vec)
25+
export(ctx_rep)
26+
export(ctx_req)
2527
export(ctx_send)
2628
export(ctx_send_vec)
2729
export(dial)

NEWS.md

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
11
# nanonext 0.1.0.9000 (under development)
22

3-
* Add `ncurl()` minimalistic http(s) client.
3+
#### New Features
4+
5+
* New `ctx_rep()` and `ctx_req()` functions implement the full logic of an RPC server/client. Designed to be run in separate processes, the server will await data and apply an arbitrary function to it before returning a result, whilst the client will send data to the server and await a response.
6+
* New `ncurl()` minimalistic http(s) client.
47
* Allow setting the environment variable 'NANONEXT_TLS' prior to package installation to enable TLS where the system NNG library has been built with TLS support (using Mbed TLS).
5-
* All send and receive functions, e.g. `send()`/`recv()`, gain the argument 'mode' for choosing whether or not to serialize R objects, consolidating the functionality of the '_vec' series of functions, e.g. send_vec()/recv_vec(). The '_vec' series of functions is consequently deprecated.
8+
9+
#### Updates
10+
11+
* All send and receive functions, e.g. `send()`/`recv()`, gain a revamped 'mode' argument. This now permits the choice of whether to use R serialization, consolidating the functionality of the '_vec' series of functions.
12+
* The '_vec' series of functions ('send_vec', 'recv_vec' etc.) is deprecated and will be removed in a future release.
613

714
# nanonext 0.1.0
815

R/context.R

Lines changed: 157 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ ctx_send <- function(context, ..., mode = c("serial", "raw"), timeout) {
102102
#' Receive any number of R objects asynchronously over a Context, with the
103103
#' ability to set receive timeouts.
104104
#'
105-
#' @inheritParams ctx_send
105+
#' @param context a Context.
106106
#' @inheritParams recv_aio
107107
#'
108108
#' @return Named list of 2 elements: 'raw' containing a list of received raw
@@ -162,6 +162,162 @@ ctx_recv <- function(context,
162162

163163
}
164164

165+
#' Reply over Context (Server for Req/Rep Protocol)
166+
#'
167+
#' Implements an executor/server for the rep node of the req/rep protocol. Awaits
168+
#' data, applies an arbitrary specified function, and returns the result
169+
#' to the caller/client.
170+
#'
171+
#' @param context a Context.
172+
#' @param send_mode [default 'serial'] whether data will be sent serialized or
173+
#' as a raw vector. Use 'serial' for sending and receiving within R to ensure
174+
#' perfect reproducibility. Use 'raw' for sending vectors of any type (will be
175+
#' converted to a raw byte vector for sending) - essential when interfacing
176+
#' with external applications.
177+
#' @param recv_mode [default 'serial'] mode of vector to be read - one of 'serial',
178+
#' 'character', 'complex', 'double', 'integer', 'logical', 'numeric', or 'raw'.
179+
#' The default 'serial' means a serialised R object, for the other modes,
180+
#' the raw vector received will be converted into the respective mode.
181+
#' @param execute a function which takes the received (converted) data as its
182+
#' first argument. Can be an anonymous function of the form \code{function(x) do(x)}.
183+
#' Additional arguments can also be passed in through '...'.
184+
#' @param timeout in ms. If unspecified, a socket-specific default timeout will
185+
#' be used. Note this applies to each of the receive and send legs, hence the
186+
#' total elapsed time could be up to twice this parameter plus the time to
187+
#' perform 'execute' on the received data.
188+
#' @param ... additional arguments passed to the function specified in 'reply'.
189+
#'
190+
#' @return Invisible NULL.
191+
#'
192+
#' @details Async recv will block while awaiting a message to arrive and is
193+
#' usually the desired result. Set a timeout to allow the function to return
194+
#' if no data is forthcoming.
195+
#'
196+
#' In case of an error in unserialisation or data conversion, the function
197+
#' will return the received raw vector to allow the data to be recovered.
198+
#'
199+
#' @examples
200+
#' req <- socket("req", listen = "tcp://127.0.0.1:6546")
201+
#' rep <- socket("rep", dial = "tcp://127.0.0.1:6546")
202+
#'
203+
#' ctxq <- context(req)
204+
#' ctxp <- context(rep)
205+
#'
206+
#' ctx_send(ctxq, 2022, timeout = 100)
207+
#' ctx_rep(ctxp, execute = function(x) x + 1, send_mode = "raw", timeout = 100)
208+
#' ctx_recv(ctxq, mode = "double", timeout = 100, keep.raw = FALSE)
209+
#'
210+
#' ctx_send(ctxq, 100, mode = "raw", timeout = 100)
211+
#' ctx_rep(ctxp, recv_mode = "double", execute = log, base = 10, timeout = 100)
212+
#' ctx_recv(ctxq, timeout = 100, keep.raw = FALSE)
213+
#'
214+
#' close(req)
215+
#' close(rep)
216+
#'
217+
#' @export
218+
#'
219+
ctx_rep <- function(context,
220+
...,
221+
recv_mode = c("serial", "character", "complex", "double",
222+
"integer", "logical", "numeric", "raw"),
223+
send_mode = c("serial", "raw"),
224+
execute,
225+
timeout) {
226+
227+
recv_mode <- match.arg(recv_mode)
228+
send_mode <- match.arg(send_mode)
229+
if (missing(timeout)) timeout <- -2L
230+
res <- .Call(rnng_ctx_recv, context, 1L, timeout)[[1L]]
231+
is.integer(res) && {
232+
message(res, " : ", nng_error(res))
233+
return(invisible(res))
234+
}
235+
on.exit(expr = return(res))
236+
data <- switch(recv_mode,
237+
serial = unserialize(connection = res),
238+
character = (r <- readBin(con = res, what = recv_mode, n = length(res)))[r != ""],
239+
raw = res,
240+
readBin(con = res, what = recv_mode, n = length(res)))
241+
on.exit(expr = NULL)
242+
msg <- execute(data, ...)
243+
ctx_send(context, msg, mode = send_mode, timeout = timeout)
244+
245+
}
246+
247+
#' Request over Context (Client for Req/Rep Protocol)
248+
#'
249+
#' Implements a caller/client for the req node of the req/rep protocol. Sends
250+
#' data to the rep node (executor/server) and awaits the result to be returned.
251+
#'
252+
#' @inheritParams ctx_rep
253+
#' @inheritParams ctx_recv
254+
#' @param data an R object (if mode is not 'serial', an R vector).
255+
#' @param timeout in ms. If unspecified, a socket-specific default timeout will
256+
#' be used. Note this applies to each of the send and receive legs, hence the
257+
#' total elapsed time could be up to twice this parameter.
258+
#'
259+
#' @return Named list of 2 elements: 'raw' containing the raw vector received
260+
#' from the server and 'data' containing the converted R object, or else the
261+
#' converted R object if 'keep.raw' is set to FALSE.
262+
#'
263+
#' @details Async recv will block while awaiting a response from the server and
264+
#' is usually the desired behaviour. Set a timeout to allow the function to
265+
#' return in case of no response.
266+
#'
267+
#' In case of an error in unserialisation or data conversion, the function
268+
#' will return the received raw vector to allow the data to be recovered.
269+
#'
270+
#' @examples
271+
#' req <- socket("req", listen = "tcp://127.0.0.1:6546")
272+
#' rep <- socket("rep", dial = "tcp://127.0.0.1:6546")
273+
#'
274+
#' ctxq <- context(req)
275+
#' ctxp <- context(rep)
276+
#'
277+
#' # works if req and rep are running in parallel in different processes
278+
#' ctx_rep(ctxp, execute = function(x) x + 1, timeout = 10)
279+
#' ctx_req(ctxq, data = 2022, timeout = 10)
280+
#'
281+
#' close(req)
282+
#' close(rep)
283+
#'
284+
#' @export
285+
#'
286+
ctx_req <- function(context,
287+
data,
288+
send_mode = c("serial", "raw"),
289+
recv_mode = c("serial", "character", "complex", "double",
290+
"integer", "logical", "numeric", "raw"),
291+
timeout,
292+
keep.raw = TRUE) {
293+
294+
send_mode <- match.arg(send_mode)
295+
recv_mode <- match.arg(recv_mode)
296+
if (missing(timeout)) timeout <- -2L
297+
data <- list(switch(send_mode,
298+
serial = serialize(object = data, connection = NULL),
299+
if (is.raw(data)) data else writeBin(object = data, con = raw())))
300+
res <- .Call(rnng_ctx_send, context, data, timeout)[[1L]]
301+
res == 0L || {
302+
message(res, " : ", nng_error(res))
303+
return(invisible(res))
304+
}
305+
res <- .Call(rnng_ctx_recv, context, 1L, timeout)[[1L]]
306+
is.integer(res) && {
307+
message(res, " : ", nng_error(res))
308+
return(invisible(res))
309+
}
310+
on.exit(expr = return(res))
311+
data <- switch(recv_mode,
312+
serial = unserialize(connection = res),
313+
character = (r <- readBin(con = res, what = recv_mode, n = length(res)))[r != ""],
314+
raw = res,
315+
readBin(con = res, what = recv_mode, n = length(res)))
316+
on.exit(expr = NULL)
317+
if (missing(keep.raw) || isTRUE(keep.raw)) list(raw = res, data = data) else data
318+
319+
}
320+
165321
# Deprecated - may be removed at any time - do not use -------------------------
166322

167323
#' Send Vector over Context (Async)

R/sendrecv.R

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
#' sent data. Set to FALSE for performance-critical applications where
1818
#' invisble NULL will be returned instead.
1919
#'
20-
#' @return Raw vector of sent data, or invisible NULL if 'echo' is set to FALSE.
20+
#' @return Raw vector of sent data, or zero (invisibly) if 'echo' is set to FALSE.
2121
#'
2222
#' @examples
2323
#' pub <- socket("pub", dial = "inproc://nanonext")
@@ -35,9 +35,9 @@ send <- function(socket, data, mode = c("serial", "raw"), block = FALSE, echo =
3535
data <- switch(mode,
3636
serial = serialize(object = data, connection = NULL),
3737
raw = if (is.raw(data)) data else writeBin(object = data, con = raw()))
38-
res <- .Call(rnng_send, socket, data, block, echo)
38+
res <- .Call(rnng_send, socket, data, block)
3939
if (is.integer(res)) message(res, " : ", nng_error(res))
40-
if (is.null(res)) invisible() else res
40+
if (missing(echo) || isTRUE(echo)) res else invisible(0L)
4141

4242
}
4343

@@ -47,7 +47,6 @@ send <- function(socket, data, mode = c("serial", "raw"), block = FALSE, echo =
4747
#' to set send timeouts.
4848
#'
4949
#' @inheritParams send
50-
#' @param socket a Socket.
5150
#' @param ... one or more R objects (if mode = 'raw', R vectors) to send
5251
#' asynchronously.
5352
#' @param timeout in ms. If unspecified, a socket-specific default timeout will
@@ -247,10 +246,10 @@ recv_aio <- function(socket,
247246
#'
248247
send_vec <- function(socket, data, block = FALSE, echo = TRUE) {
249248

250-
251-
res <- .Call(rnng_send, socket, data, block, echo)
249+
data <- if (is.raw(data)) data else writeBin(object = data, con = raw())
250+
res <- .Call(rnng_send, socket, data, block)
252251
if (is.integer(res)) message(res, " : ", nng_error(res))
253-
if (is.null(res)) invisible() else res
252+
if (missing(echo) || isTRUE(echo)) res else invisible()
254253

255254
}
256255

man/ctx_recv.Rd

Lines changed: 4 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/ctx_recv_vec.Rd

Lines changed: 4 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/ctx_rep.Rd

Lines changed: 76 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)