Skip to content

Commit ad32fdd

Browse files
committed
slightly better state for nano objects, more safety for nng_timer
1 parent f0502ae commit ad32fdd

File tree

9 files changed

+97
-67
lines changed

9 files changed

+97
-67
lines changed

R/aio.R

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ call_aio <- function(aio) {
6262
}
6363
on.exit(expr = {
6464
aio[["raw"]] <- res
65+
rm("aio", envir = aio)
66+
rm("callparams", envir = aio)
6567
return(invisible(aio))
6668
})
6769
data <- switch(mode,

R/context.R

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ recv_ctx <- function(context,
156156

157157
}
158158

159-
#' Reply over Context (Server for Req/Rep Protocol)
159+
#' Reply over Context (RPC Server for Req/Rep Protocol)
160160
#'
161161
#' Implements an executor/server for the rep node of the req/rep protocol. Awaits
162162
#' data, applies an arbitrary specified function, and returns the result
@@ -176,9 +176,10 @@ recv_ctx <- function(context,
176176
#' The default 'serial' means a serialised R object, for the other modes,
177177
#' the raw vector received will be converted into the respective mode.
178178
#' @param timeout in ms. If unspecified, a socket-specific default timeout will
179-
#' be used. Note this applies to each of the receive and send legs, hence the
180-
#' total elapsed time could be up to twice this parameter plus the time to
181-
#' perform 'execute' on the received data.
179+
#' be used. Note that this applies to receiving the request. The total elapsed
180+
#' time would also include the time for performing 'execute' on the received
181+
#' data. The timeout then also applies to sending the result (in the event
182+
#' that the requestor has become unavailable since sending the request).
182183
#' @param ... additional arguments passed to the function specified by 'execute'.
183184
#'
184185
#' @return Invisible NULL.
@@ -187,11 +188,10 @@ recv_ctx <- function(context,
187188
#' the desired behaviour. Set a timeout to allow the function to return
188189
#' if no data is forthcoming.
189190
#'
190-
#' In the event of an error in unserialisation or conversion of the
191-
#' received message, in the evaluation of the function with respect to the
192-
#' data, or in the serialization or conversion of the message to be sent,
193-
#' a NULL byte (or serialized NULL byte) will be sent in reply to signal an
194-
#' error to the client.
191+
#' In the event of an error in either processing the messages or in evaluation
192+
#' of the function with respect to the data, a nul byte \code{00} (or serialized
193+
#' nul byte) will be sent in reply to the client to signal an error. This makes
194+
#' it easy to distigush an error from a NULL return value.
195195
#'
196196
#' @examples
197197
#' req <- socket("req", listen = "tcp://127.0.0.1:6546")
@@ -229,7 +229,7 @@ reply <- function(context,
229229
message(res, " : ", nng_error(res))
230230
return(invisible(res))
231231
}
232-
on.exit(expr = send_aio(context, writeBin(object = "", con = raw()), mode = send_mode))
232+
on.exit(expr = send_aio(context, as.raw(0L), mode = send_mode))
233233
data <- switch(recv_mode,
234234
serial = unserialize(connection = res),
235235
character = (r <- readBin(con = res, what = recv_mode, n = length(res)))[r != ""],
@@ -249,7 +249,7 @@ reply <- function(context,
249249

250250
}
251251

252-
#' Request over Context (Client for Req/Rep Protocol)
252+
#' Request over Context (RPC Client for Req/Rep Protocol)
253253
#'
254254
#' Implements a caller/client for the req node of the req/rep protocol. Sends
255255
#' data to the rep node (executor/server) and returns an Aio, which can be
@@ -259,8 +259,7 @@ reply <- function(context,
259259
#' @inheritParams recv
260260
#' @param data an R object (if send_mode = 'raw', an R vector).
261261
#' @param timeout in ms. If unspecified, a socket-specific default timeout will
262-
#' be used. Note this applies to each of the send and receive legs, hence the
263-
#' total elapsed time could be up to twice this parameter.
262+
#' be used. Note that this applies to receiving the result.
264263
#'
265264
#' @return A recv Aio (object of class 'recvAio').
266265
#'
@@ -271,8 +270,10 @@ reply <- function(context,
271270
#' without blocking the client. Use \code{\link{call_aio}} on the 'recvAio'
272271
#' to call the result when required.
273272
#'
274-
#' If an error occured in the server process, a NULL byte will be received
275-
#' (as \code{$data} if 'recv_mode' = 'serial', as \code{$raw} otherwise).
273+
#' If an error occured in the server process, a nul byte \code{00} will be
274+
#' received (as \code{$data} if 'recv_mode' = 'serial', as \code{$raw}
275+
#' otherwise). This allows an error to be easily distinguished from a NULL
276+
#' return value.
276277
#'
277278
#' @examples
278279
#' req <- socket("req", listen = "tcp://127.0.0.1:6546")

R/nano.R

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,8 +68,10 @@ nano <- function(protocol = c("pair", "bus", "push", "pull", "req", "rep",
6868
dial(nano, url = dial, autostart = TRUE)
6969
} else {
7070
dial(nano, url = dial, autostart = FALSE)
71-
nano[["dialer_start"]] <- function(async = TRUE) start(.subset2(nano, "dialer")[[1L]],
72-
async = async)
71+
nano[["dialer_start"]] <- function(async = TRUE) {
72+
rm("dialer_start", envir = nano)
73+
start(.subset2(nano, "dialer")[[1L]], async = async)
74+
}
7375
}
7476
}
7577

@@ -78,7 +80,10 @@ nano <- function(protocol = c("pair", "bus", "push", "pull", "req", "rep",
7880
listen(nano, url = listen, autostart = TRUE)
7981
} else {
8082
listen(nano, url = listen, autostart = FALSE)
81-
nano[["listener_start"]] <- function() start(.subset2(nano, "listener")[[1L]])
83+
nano[["listener_start"]] <- function() {
84+
rm("listener_start", envir = nano)
85+
start(.subset2(nano, "listener")[[1L]])
86+
}
8287
}
8388
}
8489

R/utils.R

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,12 @@ nng_error <- function(error) {
6969
#'
7070
nng_timer <- function(time) {
7171

72-
invisible(.Call(rnng_threaded_timer, as.integer(time)))
72+
if (is.numeric(time) && time >= 0) {
73+
time <- as.integer(time)
74+
} else {
75+
stop("a numeric value >= 0 is required")
76+
}
77+
invisible(.Call(rnng_threaded_timer, time))
7378

7479
}
7580

README.Rmd

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ knitr::opts_chunk$set(
2323

2424
R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is a socket library providing high-performance scalability protocols, implementing a cross-platform standard for messaging and communications. Serves as a concurrency framework that can be used for building distributed systems.
2525

26-
Designed for performance and reliability, the NNG library is written in C and {nanonext} is a lightweight wrapper with no external package dependencies. Supported transports include inproc (intra-process), IPC (inter-process), TCP/IP (IPv4 or IPv6), and WebSocket.
26+
Designed for performance and reliability, the NNG library is written in C and {nanonext} is a lightweight wrapper depending on no other packages. Supported transports include inproc (intra-process), IPC (inter-process), TCP/IP (IPv4 or IPv6), and WebSocket. The inproc transport uses zero-copy where possible for a much faster solution than alternatives.
2727

2828
Can be used for sending data across networks, but equally as an interface for code and processes to communicate with each other. Receive data generated in Python, perform analysis in R, and send results to a C++ program – all on the same computer or on networks spanning the globe.
2929

@@ -117,7 +117,7 @@ recv(socket2)
117117

118118
The following example demonstrates the exchange of numerical data between R and Python (NumPy), two of the most commonly-used languages for data science and machine learning.
119119

120-
Using a messaging interface provides a clean and robust approach that is light on resources and provides fewer points of failure. This is especially relevant when processing real-time data, as an example.
120+
Using a messaging interface provides a clean and robust approach which is light on resources and offers limited and identifiable points of failure. This is especially relevant when processing real-time data, as an example.
121121

122122
This approach can also serve as an interface / pipe between different processes written in the same or different languages, running on the same computer or distributed across networks, and is an enabler of modular software design as espoused by the Unix philosophy.
123123

@@ -169,7 +169,7 @@ s2 <- socket("pair", dial = "inproc://nano")
169169
170170
```
171171

172-
For a 'sendAio' object, calling the result causes it to be stored in the AIO as `$result`. 0 denotes a successful send.
172+
For a 'sendAio' object, calling the result causes it to be stored in the AIO as `$result`. An exit code of 0 denotes a successful send.
173173

174174
```{r async2}
175175
@@ -226,18 +226,20 @@ reply(ctxp, execute = rnorm, send_mode = "raw")
226226
Client process: `request()` performs an async send and receive request and returns immediately with a `recvAio` object.
227227

228228
```{r rpcclient}
229+
229230
library(nanonext)
230231
req <- socket("req", dial = "tcp://127.0.0.1:6546")
231232
ctxq <- context(req)
232-
aio <- request(ctxq, data = 1e6, recv_mode = "double", keep.raw = FALSE)
233+
aio <- request(ctxq, data = 1e8, recv_mode = "double", keep.raw = FALSE)
234+
233235
```
234236

235237
At this point, the client can run additional code concurrent with the server processing the request.
236238
```{r rpcclient2}
237239
# do more...
238240
```
239241

240-
When the result of the server calculation is required (or, as the case may be, an exit code confirming that the server operation has completed), the `recvAio` may be called using `call_aio()`.
242+
When the result of the server calculation is required, the `recvAio` may be called using `call_aio()`.
241243

242244
The return value from the server request is then retrieved and stored in the Aio as `$data`.
243245

@@ -250,6 +252,10 @@ str(aio$data)
250252
251253
```
252254

255+
In this example the calculation is returned, but other operations may reside entirely on the server side, for example writing data to disk.
256+
257+
In such a case, using `call_aio()` confirms that the operation has completed (or it will wait for completion) and calls the return value of the function, which may typically be NULL or an exit code.
258+
253259
[&laquo; Back to ToC](#table-of-contents)
254260

255261
### Publisher Subscriber Model

README.md

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@ Serves as a concurrency framework that can be used for building
1919
distributed systems.
2020

2121
Designed for performance and reliability, the NNG library is written in
22-
C and {nanonext} is a lightweight wrapper with no external package
23-
dependencies. Supported transports include inproc (intra-process), IPC
24-
(inter-process), TCP/IP (IPv4 or IPv6), and WebSocket.
22+
C and {nanonext} is a lightweight wrapper depending on no other
23+
packages. Supported transports include inproc (intra-process), IPC
24+
(inter-process), TCP/IP (IPv4 or IPv6), and WebSocket. The inproc
25+
transport uses zero-copy where possible for a much faster solution than
26+
alternatives.
2527

2628
Can be used for sending data across networks, but equally as an
2729
interface for code and processes to communicate with each other. Receive
@@ -150,9 +152,10 @@ The following example demonstrates the exchange of numerical data
150152
between R and Python (NumPy), two of the most commonly-used languages
151153
for data science and machine learning.
152154

153-
Using a messaging interface provides a clean and robust approach that is
154-
light on resources and provides fewer points of failure. This is
155-
especially relevant when processing real-time data, as an example.
155+
Using a messaging interface provides a clean and robust approach which
156+
is light on resources and offers limited and identifiable points of
157+
failure. This is especially relevant when processing real-time data, as
158+
an example.
156159

157160
This approach can also serve as an interface / pipe between different
158161
processes written in the same or different languages, running on the
@@ -219,7 +222,7 @@ s2 <- socket("pair", dial = "inproc://nano")
219222
```
220223

221224
For a ‘sendAio’ object, calling the result causes it to be stored in the
222-
AIO as `$result`. 0 denotes a successful send.
225+
AIO as `$result`. An exit code of 0 denotes a successful send.
223226

224227
``` r
225228
res <- send_aio(s1, data.frame(a = 1, b = 2))
@@ -292,7 +295,7 @@ and returns immediately with a `recvAio` object.
292295
library(nanonext)
293296
req <- socket("req", dial = "tcp://127.0.0.1:6546")
294297
ctxq <- context(req)
295-
aio <- request(ctxq, data = 1e6, recv_mode = "double", keep.raw = FALSE)
298+
aio <- request(ctxq, data = 1e8, recv_mode = "double", keep.raw = FALSE)
296299
```
297300

298301
At this point, the client can run additional code concurrent with the
@@ -302,9 +305,8 @@ server processing the request.
302305
# do more...
303306
```
304307

305-
When the result of the server calculation is required (or, as the case
306-
may be, an exit code confirming that the server operation has
307-
completed), the `recvAio` may be called using `call_aio()`.
308+
When the result of the server calculation is required, the `recvAio` may
309+
be called using `call_aio()`.
308310

309311
The return value from the server request is then retrieved and stored in
310312
the Aio as `$data`.
@@ -316,9 +318,16 @@ aio
316318
#> < recvAio >
317319
#> - $data for message data
318320
str(aio$data)
319-
#> num [1:1000000] -1.014 -0.8599 -1.7137 1.9008 0.0125 ...
321+
#> num [1:100000000] -1.104 1.34 0.442 -0.738 0.66 ...
320322
```
321323

324+
In this example the calculation is returned, but other operations may
325+
reside entirely on the server side, for example writing data to disk.
326+
327+
In such a case, using `call_aio()` confirms that the operation has
328+
completed (or it will wait for completion) and calls the return value of
329+
the function, which may typically be NULL or an exit code.
330+
322331
[« Back to ToC](#table-of-contents)
323332

324333
### Publisher Subscriber Model
@@ -378,11 +387,11 @@ ncurl("http://httpbin.org/headers")
378387
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
379388
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
380389
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
381-
#> [76] 2d 36 32 30 32 35 35 39 30 2d 34 65 37 31 61 37 65 37 37 30 63 65 31 64 36
382-
#> [101] 64 30 61 36 61 62 37 39 39 22 0a 20 20 7d 0a 7d 0a
390+
#> [76] 2d 36 32 30 34 30 34 66 65 2d 30 62 39 31 61 34 61 63 32 61 36 62 31 36 34
391+
#> [101] 31 36 61 30 32 38 30 30 63 22 0a 20 20 7d 0a 7d 0a
383392
#>
384393
#> $data
385-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62025590-4e71a7e770ce1d6d0a6ab799\"\n }\n}\n"
394+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-620404fe-0b91a4ac2a6b16416a02800c\"\n }\n}\n"
386395
```
387396

388397
[« Back to ToC](#table-of-contents)

man/reply.Rd

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/request.Rd

Lines changed: 6 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)