Skip to content

Commit 7a27728

Browse files
committed
Add threaded timer, decrease verbosity, reimplement Aios as environments
1 parent 5892b34 commit 7a27728

File tree

16 files changed

+132
-47
lines changed

16 files changed

+132
-47
lines changed

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ export(listen)
3333
export(nano)
3434
export(ncurl)
3535
export(nng_error)
36+
export(nng_timer)
3637
export(nng_version)
3738
export(recv)
3839
export(recv_aio)

NEWS.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
* New `ctx_rep()` and `ctx_req()` functions implement the full logic of an RPC server/client. Designed to be run in separate processes, the server will await data and apply an arbitrary function before returning a result, whilst the client will send data to the server and await a response.
77
* New `ncurl()` minimalistic http(s) client.
88
* Allows setting the environment variable 'NANONEXT_TLS' prior to package installation to enable TLS where the system NNG library has been built with TLS support (using Mbed TLS).
9+
* New `nng_timer()` utility as a demonstration of NNG's multithreading capabilities.
910

1011
#### Updates
1112

13+
* Successful starts of dialers/listeners and successful close operations no longer print a message to stdout for less verbosity by default. The state of respective objects can always be queried by their state attribute using `$state`
1214
* All send and receive functions, e.g. `send()`/`recv()`, gain a revised 'mode' argument. This now permits the choice of whether to use R serialization, consolidating the functionality of the '_vec' series of functions.
1315
* Functions 'send_vec' and 'recv_vec' are deprecated and will be removed in a future release.
1416

R/aio.R

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,32 +45,31 @@ aio_call <- function(aio) {
4545

4646
if (inherits(aio, "recvAio")) {
4747

48-
mode <- attr(aio, "callparams")[[1L]]
49-
keep.raw <- attr(aio, "callparams")[[2L]]
50-
res <- .Call(rnng_aio_get_msg, aio)
51-
if (keep.raw) attr(aio, "raw") <- res
48+
mode <- aio[["callparams"]][[1L]]
49+
keep.raw <- aio[["callparams"]][[2L]]
50+
res <- .Call(rnng_aio_get_msg, aio[["aio"]])
51+
if (keep.raw) aio[["raw"]] <- res
5252
is.integer(res) && {
5353
message(res, " : ", nng_error(res))
5454
return(invisible(aio))
5555
}
5656
on.exit(expr = {
57-
attr(aio, "raw") <- res
57+
aio[["raw"]] <- res
5858
return(invisible(aio))
5959
})
6060
data <- switch(mode,
6161
serial = unserialize(connection = res),
6262
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
6363
raw = res,
6464
readBin(con = res, what = mode, n = length(res)))
65-
if (is.null(data)) data <- list(NULL)
66-
attr(aio, "data") <- data
65+
aio[["data"]] <- data
6766
on.exit(expr = NULL)
6867
invisible(aio)
6968

7069
} else if (inherits(aio, "sendAio")) {
7170

72-
res <- .Call(rnng_aio_result, aio)
73-
attr(aio, "result") <- res
71+
res <- .Call(rnng_aio_result, aio[["aio"]])
72+
aio[["result"]] <- res
7473
if (res) {
7574
message(res, " : ", nng_error(res))
7675
}
@@ -99,7 +98,7 @@ aio_call <- function(aio) {
9998
#'
10099
aio_stop <- function(aio) {
101100

102-
invisible(.Call(rnng_aio_stop, aio))
101+
invisible(.Call(rnng_aio_stop, aio[["aio"]]))
103102

104103
}
105104

R/listdial.R

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,6 @@ dial <- function(socket,
8181
message(res, " : ", nng_error(res))
8282
return(res)
8383
}
84-
message("dialer started...")
8584
} else {
8685
res <- .Call(rnng_dialer_create, socket[["socket"]], url)
8786
if (is.integer(res)) {
@@ -107,7 +106,6 @@ dial <- function(socket,
107106
message(res, " : ", nng_error(res))
108107
return(res)
109108
}
110-
message("dialer started...")
111109
} else {
112110
res <- .Call(rnng_dialer_create, socket, url)
113111
if (is.integer(res)) {
@@ -204,7 +202,6 @@ listen <- function(socket,
204202
message(res, " : ", nng_error(res))
205203
return(res)
206204
}
207-
message("listener started...")
208205
} else {
209206
res <- .Call(rnng_listener_create, socket[["socket"]], url)
210207
if (is.integer(res)) {
@@ -230,7 +227,6 @@ listen <- function(socket,
230227
message(res, " : ", nng_error(res))
231228
return(res)
232229
}
233-
message("listener started...")
234230
} else {
235231
res <- .Call(rnng_listener_create, socket, url)
236232
if (is.integer(res)) {

R/methods.R

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ NULL
2727
start.nanoListener <- function(x, ...) {
2828

2929
xc <- .Call(rnng_listener_start, x)
30-
if (xc) message(xc, " : ", nng_error(xc)) else message("listener started...")
30+
if (xc) message(xc, " : ", nng_error(xc))
3131
invisible(xc)
3232

3333
}
@@ -39,7 +39,7 @@ start.nanoListener <- function(x, ...) {
3939
start.nanoDialer <- function(x, async = TRUE, ...) {
4040

4141
xc <- .Call(rnng_dialer_start, x, async)
42-
if (xc) message(xc, " : ", nng_error(xc)) else message("dialer started...")
42+
if (xc) message(xc, " : ", nng_error(xc))
4343
invisible(xc)
4444

4545
}
@@ -76,7 +76,7 @@ NULL
7676
close.nanoSocket <- function(con, ...) {
7777

7878
xc <- .Call(rnng_close, con)
79-
if (xc) message(xc, " : ", nng_error(xc)) else message("socket closed")
79+
if (xc) message(xc, " : ", nng_error(xc))
8080
invisible(xc)
8181

8282
}
@@ -88,7 +88,7 @@ close.nanoSocket <- function(con, ...) {
8888
close.nanoContext <- function(con, ...) {
8989

9090
xc <- .Call(rnng_ctx_close, con)
91-
if (xc) message(xc, " : ", nng_error(xc)) else message("context closed")
91+
if (xc) message(xc, " : ", nng_error(xc))
9292
invisible(xc)
9393

9494
}
@@ -100,7 +100,7 @@ close.nanoContext <- function(con, ...) {
100100
close.nanoDialer <- function(con, ...) {
101101

102102
xc <- .Call(rnng_dialer_close, con)
103-
if (xc) message(xc, " : ", nng_error(xc)) else message("dialer closed")
103+
if (xc) message(xc, " : ", nng_error(xc))
104104
invisible(xc)
105105

106106
}
@@ -112,7 +112,7 @@ close.nanoDialer <- function(con, ...) {
112112
close.nanoListener <- function(con, ...) {
113113

114114
xc <- .Call(rnng_listener_close, con)
115-
if (xc) message(xc, " : ", nng_error(xc)) else message("listener closed")
115+
if (xc) message(xc, " : ", nng_error(xc))
116116
invisible(xc)
117117

118118
}

R/nano.R

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,13 @@ print.nanoListener <- function(x, ...) {
209209
print.recvAio <- function(x, ...) {
210210

211211
cat("< recvAio >\n")
212-
is.null(attr(x, "raw")) && is.null(attr(x, "data")) && {
212+
is.null(x[["raw"]]) && is.null(x[["data"]]) && {
213213
cat(": use aio_call() to retrieve message\n")
214214
return(invisible(x))
215215
}
216-
if (!is.null(attr(x, "raw")))
216+
if (!is.null(x[["raw"]]))
217217
cat(" - $raw for raw message\n")
218-
if (!is.null(attr(x, "data")))
218+
if (!is.null(x[["data"]]))
219219
cat(" - $data for message data\n")
220220
invisible(x)
221221

@@ -226,7 +226,7 @@ print.recvAio <- function(x, ...) {
226226
print.sendAio <- function(x, ...) {
227227

228228
cat("< sendAio >\n")
229-
if (is.null(attr(x, "result"))) {
229+
if (is.null(x[["result"]])) {
230230
cat(": use aio_call() to retrieve result\n")
231231
} else {
232232
cat(" - $result for send result\n")

R/sendrecv.R

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ send <- function(socket,
5858
#' @param timeout in ms. If unspecified, a socket-specific default timeout will
5959
#' be used.
6060
#'
61-
#' @return A send Aio (object of class 'sendAio' and 'nano').
61+
#' @return A send Aio (object of class 'sendAio').
6262
#'
6363
#' @details Async send is always non-blocking. To wait for and check the result
6464
#' of the send operation, use \code{\link{aio_call}} on the returned 'sendAio'
@@ -71,7 +71,7 @@ send <- function(socket,
7171
#' aio
7272
#' aio_call(aio)$result
7373
#'
74-
#' aio <- send_aio(pub, "message 1", mode = "raw", timeout = 100)
74+
#' aio <- send_aio(pub, "example message", mode = "raw", timeout = 100)
7575
#' aio_call(aio)$result
7676
#'
7777
#' close(pub)
@@ -86,9 +86,11 @@ send_aio <- function(socket, data, mode = c("serial", "raw"), timeout) {
8686
data <- switch(mode,
8787
serial = serialize(object = data, connection = NULL),
8888
raw = if (is.raw(data)) data else writeBin(object = data, con = raw()))
89+
env <- `class<-`(new.env(), "sendAio")
8990
aio <- .Call(rnng_send_aio, socket, data, timeout)
91+
env[["aio"]] <- aio
9092
if (is.integer(aio)) message(aio, " : ", nng_error(aio))
91-
invisible(aio)
93+
invisible(env)
9294

9395
}
9496

@@ -128,7 +130,7 @@ send_aio <- function(socket, data, mode = c("serial", "raw"), timeout) {
128130
#' send(s1, c(1.1, 2.2, 3.3), mode = "raw")
129131
#' res <- recv(s2, mode = "double")
130132
#' res
131-
#' send(s1, "example test", mode = "raw", echo = FALSE)
133+
#' send(s1, "example message", mode = "raw", echo = FALSE)
132134
#' recv(s2, mode = "character", keep.raw = FALSE)
133135
#'
134136
#' close(s1)
@@ -167,7 +169,7 @@ recv <- function(socket,
167169
#' @inheritParams recv
168170
#' @inheritParams send_aio
169171
#'
170-
#' @return A recv Aio (object of class 'recvAio' and 'nano').
172+
#' @return A recv Aio (object of class 'recvAio').
171173
#'
172174
#' @details Async receive is always non-blocking. To wait for the AIO to complete
173175
#' and retrieve the received message, use \code{\link{aio_call}} on the
@@ -188,7 +190,7 @@ recv <- function(socket,
188190
#' aio_call(res)
189191
#' res
190192
#'
191-
#' send_aio(s1, "message 1", mode = "raw", timeout = 100)
193+
#' send_aio(s1, "example message", mode = "raw", timeout = 100)
192194
#' res <- recv_aio(s2, mode = "character", timeout = 100)
193195
#' aio_call(res)
194196
#' res$raw
@@ -207,13 +209,15 @@ recv_aio <- function(socket,
207209

208210
mode <- match.arg(mode)
209211
if (missing(timeout)) timeout <- -2L
212+
env <- `class<-`(new.env(), "recvAio")
210213
aio <- .Call(rnng_recv_aio, socket, timeout)
214+
env[["aio"]] <- aio
211215
if (is.integer(aio)) {
212216
message(aio, " : ", nng_error(aio))
213217
} else {
214-
attr(aio, "callparams") <- list(mode, missing(keep.raw) || isTRUE(keep.raw))
218+
env[["callparams"]] <- list(mode, missing(keep.raw) || isTRUE(keep.raw))
215219
}
216-
aio
220+
invisible(env)
217221

218222
}
219223

R/utils.R

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,34 @@ nng_error <- function(error) {
4545

4646
}
4747

48+
#' Timer Utility
49+
#'
50+
#' Set a timer (stopwatch). Will print a message to the console (stdout) upon
51+
#' completion.
52+
#'
53+
#' @param time time in ms. Non-integer values will be translated to integer using
54+
#' \code{as.integer()}.
55+
#'
56+
#' @return An external pointer to the thread created by this function.
57+
#'
58+
#' @details The return value of this function should not normally be assigned as
59+
#' this preserves the thread instead of it being automatically reaped during
60+
#' garbage collection.
61+
#'
62+
#' As reaping the thread waits until the timer has completed, a
63+
#' possible side effect is blocking garbage collection until this has happened
64+
#' (not guaranteed, as garbage collection may happen on other objects first).
65+
#' If this is undesirable, assign the external pointer to an object and then
66+
#' remove it after completion.
67+
#'
68+
#' @export
69+
#'
70+
nng_timer <- function(time) {
71+
72+
invisible(.Call(rnng_threaded_timer, as.integer(time)))
73+
74+
}
75+
4876
#' ncurl
4977
#'
5078
#' nano cURL - a minimalistic http(s) client.

man/nng_timer.Rd

Lines changed: 30 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/recv.Rd

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)