Skip to content

Commit a1288f1

Browse files
committed
serious performance tuning
1 parent 50511fc commit a1288f1

File tree

11 files changed

+516
-114
lines changed

11 files changed

+516
-114
lines changed

R/aio.R

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ send_aio <- function(con, data, mode = c("serial", "raw"), timeout = -2L) UseMet
4949
#'
5050
send_aio.nanoSocket <- function(con, data, mode = c("serial", "raw"), timeout = -2L) {
5151

52-
if (missing(mode) || match.arg1(mode) == 1L)
52+
if (.Call(rnng_serial, mode))
5353
data <- serialize(object = data, connection = NULL)
5454
aio <- .Call(rnng_send_aio, con, data, timeout)
5555
is.integer(aio) && return(aio)
@@ -77,7 +77,7 @@ send_aio.nanoSocket <- function(con, data, mode = c("serial", "raw"), timeout =
7777
#'
7878
send_aio.nanoContext <- function(con, data, mode = c("serial", "raw"), timeout = -2L) {
7979

80-
if (missing(mode) || match.arg1(mode) == 1L)
80+
if (.Call(rnng_serial, mode))
8181
data <- serialize(object = data, connection = NULL)
8282
aio <- .Call(rnng_ctx_send_aio, con, data, timeout)
8383
is.integer(aio) && return(aio)
@@ -200,8 +200,7 @@ recv_aio.nanoSocket <- function(con,
200200
keep.raw = TRUE,
201201
...) {
202202

203-
mode <- match.arg2(mode, c("serial", "character", "complex", "double",
204-
"integer", "logical", "numeric", "raw"))
203+
mode <- .Call(rnng_matcharg, mode)
205204
aio <- .Call(rnng_recv_aio, con, timeout)
206205
is.integer(aio) && return(aio)
207206

@@ -258,8 +257,7 @@ recv_aio.nanoContext <- function(con,
258257
keep.raw = TRUE,
259258
...) {
260259

261-
mode <- match.arg2(mode, c("serial", "character", "complex", "double",
262-
"integer", "logical", "numeric", "raw"))
260+
mode <- .Call(rnng_matcharg, mode)
263261
aio <- .Call(rnng_ctx_recv_aio, con, timeout)
264262
is.integer(aio) && return(aio)
265263

@@ -317,8 +315,7 @@ recv_aio.nanoStream <- function(con,
317315
n = 65536L,
318316
...) {
319317

320-
mode <- match.arg2(mode, c("character", "complex", "double", "integer",
321-
"logical", "numeric", "raw")) + 1L
318+
mode <- .Call(rnng_matchargs, mode)
322319
aio <- .Call(rnng_stream_recv_aio, con, n, timeout)
323320
is.integer(aio) && return(aio)
324321

R/context.R

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -113,18 +113,15 @@ reply <- function(context,
113113
timeout = -2L,
114114
...) {
115115

116-
recv_mode <- match.arg2(recv_mode, c("serial", "character", "complex", "double",
117-
"integer", "logical", "numeric", "raw"))
118116
res <- .Call(rnng_ctx_recv, context, recv_mode, timeout, FALSE)
119117
is_error_value(res) && return(invisible(res))
120118
on.exit(expr = send_aio(context, as.raw(0L), mode = send_mode))
121119
data <- execute(res, ...)
122-
if (missing(send_mode) || match.arg1(send_mode) == 1L)
120+
if (.Call(rnng_serial, send_mode))
123121
data <- serialize(object = data, connection = NULL)
124-
res <- .Call(rnng_ctx_send, context, data, timeout)
122+
res <- .Call(rnng_ctx_send, context, data, timeout, FALSE)
125123
on.exit()
126-
is.integer(res) && return(invisible(res))
127-
invisible(0L)
124+
if (missing(res)) invisible(0L) else invisible(res)
128125

129126
}
130127

@@ -183,9 +180,8 @@ request <- function(context,
183180
timeout = -2L,
184181
keep.raw = TRUE) {
185182

186-
recv_mode <- match.arg2(recv_mode, c("serial", "character", "complex", "double",
187-
"integer", "logical", "numeric", "raw"))
188-
if (missing(send_mode) || match.arg1(send_mode) == 1L)
183+
recv_mode <- .Call(rnng_matcharg, recv_mode)
184+
if (.Call(rnng_serial, send_mode))
189185
data <- serialize(object = data, connection = NULL)
190186
res <- .Call(rnng_ctx_send_aio, context, data, -2L)
191187
is.integer(res) && return(res)

R/messenger.R

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ messenger <- function(url) {
2929
sock <- .Call(rnng_messenger, url)
3030
is.integer(sock) && return(invisible(sock))
3131
on.exit(expr = {
32-
s <- suppressWarnings(.Call(rnng_send, sock, writeBin(":d ", raw()), 0L))
32+
suppressWarnings(.Call(rnng_send, sock, writeBin(":d ", raw()), 0L, FALSE))
3333
.Call(rnng_close, sock)
3434
invisible()
3535
})
@@ -43,7 +43,7 @@ messenger <- function(url) {
4343
cat(sprintf("\n| url: %s\n", url), file = stdout())
4444
cat("| connecting... ", file = stderr())
4545

46-
s <- suppressWarnings(.Call(rnng_send, sock, writeBin(":c ", raw()), 1000L))
46+
s <- suppressWarnings(.Call(rnng_send, sock, writeBin(":c ", raw()), 1000L, TRUE))
4747
if (is.integer(s)) {
4848
cat(sprintf("\r| peer offline: %s\n", format.POSIXct(Sys.time())), file = stderr())
4949
} else {
@@ -55,7 +55,7 @@ messenger <- function(url) {
5555
data <- readline()
5656
if (identical(data, ":q")) break
5757
if (identical(data, "")) next
58-
s <- suppressWarnings(.Call(rnng_send, sock, data, 0L))
58+
s <- suppressWarnings(.Call(rnng_send, sock, data, 0L, TRUE))
5959
if (is.integer(s)) {
6060
cat(sprintf("%*s > not sent: peer offline: %s\n", nchar(data), "", format.POSIXct(Sys.time())),
6161
file = stderr())

R/sendrecv.R

Lines changed: 8 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -74,11 +74,10 @@ send.nanoSocket <- function(con,
7474
block = FALSE,
7575
echo = TRUE) {
7676

77-
if (missing(mode) || match.arg1(mode) == 1L)
77+
if (.Call(rnng_serial, mode))
7878
data <- serialize(object = data, connection = NULL)
79-
res <- .Call(rnng_send, con, data, block)
80-
is.integer(res) && return(res)
81-
if (missing(echo) || isTRUE(echo)) res else invisible(0L)
79+
res <- .Call(rnng_send, con, data, block, echo)
80+
if (missing(res)) invisible(0L) else res
8281

8382
}
8483

@@ -92,12 +91,10 @@ send.nanoContext <- function(con,
9291
block = TRUE,
9392
echo = TRUE) {
9493

95-
if (missing(mode) || match.arg1(mode) == 1L)
94+
if (.Call(rnng_serial, mode))
9695
data <- serialize(object = data, connection = NULL)
97-
if (missing(block) || isTRUE(block)) block <- -2L
98-
res <- .Call(rnng_ctx_send, con, data, block)
99-
is.integer(res) && return(res)
100-
if (missing(echo) || isTRUE(echo)) res else invisible(0L)
96+
res <- .Call(rnng_ctx_send, con, data, block, echo)
97+
if (missing(res)) invisible(0L) else res
10198

10299
}
103100

@@ -111,14 +108,11 @@ send.nanoStream <- function(con,
111108
block = TRUE,
112109
echo = TRUE) {
113110

114-
if (missing(block) || isTRUE(block)) block <- -2L
115-
res <- .Call(rnng_stream_send, con, data, block)
116-
is.integer(res) && return(res)
117-
if (missing(echo) || isTRUE(echo)) res else invisible(0L)
111+
res <- .Call(rnng_stream_send, con, data, block, echo)
112+
if (missing(res)) invisible(0L) else res
118113

119114
}
120115

121-
122116
#' Receive
123117
#'
124118
#' Receive data over a connection (Socket, Context or Stream).
@@ -220,8 +214,6 @@ recv.nanoSocket <- function(con,
220214
keep.raw = TRUE,
221215
...) {
222216

223-
mode <- match.arg2(mode, c("serial", "character", "complex", "double",
224-
"integer", "logical", "numeric", "raw"))
225217
.Call(rnng_recv, con, mode, block, keep.raw)
226218

227219
}
@@ -237,9 +229,6 @@ recv.nanoContext <- function(con,
237229
keep.raw = TRUE,
238230
...) {
239231

240-
mode <- match.arg2(mode, c("serial", "character", "complex", "double",
241-
"integer", "logical", "numeric", "raw"))
242-
if (missing(block) || isTRUE(block)) block <- -2L
243232
.Call(rnng_ctx_recv, con, mode, block, keep.raw)
244233

245234
}
@@ -256,9 +245,6 @@ recv.nanoStream <- function(con,
256245
n = 65536L,
257246
...) {
258247

259-
mode <- match.arg2(mode, c("character", "complex", "double", "integer",
260-
"logical", "numeric", "raw")) + 1L
261-
if (missing(block) || isTRUE(block)) block <- -2L
262248
.Call(rnng_stream_recv, con, mode, block, keep.raw, n)
263249

264250
}

R/utils.R

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,3 @@ match.arg2 <- function(choice, choices) {
264264
index
265265
}
266266

267-
match.arg1 <- function(choice) {
268-
index <- pmatch(choice[1L], c("serial", "raw"), nomatch = 0L, duplicates.ok = TRUE)
269-
index || stop(sprintf("'%s' should be one of serial, raw", deparse(substitute(choice))))
270-
index
271-
}
272-

README.Rmd

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ nano2$recv()
107107

108108
#### Functional Interface
109109

110-
The primary object in the functional interface is the Socket. Use `socket()` to create a socket, and optionally dial or listen at an address. The socket is then passed as the first argument of subsequent actions such as `send()` or `recv()`.
110+
The primary object in the functional interface is the Socket. Use `socket()` to create a socket and dial or listen at an address. The socket is then passed as the first argument of subsequent actions such as `send()` or `recv()`.
111111

112112
*Example using Pipeline (Push/Pull) protocol with TCP/IP transport:*
113113

@@ -268,7 +268,7 @@ close(s2)
268268

269269
Can be used to perform computationally-expensive calculations or I/O-bound operations such as writing large amounts of data to disk in a separate 'server' process running concurrently.
270270

271-
Server process: `reply()` will wait for a message and apply a function, in this case `rnorm()`, before sending back the result.
271+
[S] Server process: `reply()` will wait for a message and apply a function, in this case `rnorm()`, before sending back the result.
272272

273273
```{r rpcserver, eval=FALSE}
274274
@@ -279,7 +279,7 @@ reply(ctxp, execute = rnorm, send_mode = "raw")
279279
280280
```
281281

282-
Client process: `request()` performs an async send and receive request and returns immediately with a `recvAio` object.
282+
[C] Client process: `request()` performs an async send and receive request and returns immediately with a `recvAio` object.
283283

284284
```{r rpcclient}
285285

README.md

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ nano2$recv()
126126
#### Functional Interface
127127

128128
The primary object in the functional interface is the Socket. Use
129-
`socket()` to create a socket, and optionally dial or listen at an
130-
address. The socket is then passed as the first argument of subsequent
131-
actions such as `send()` or `recv()`.
129+
`socket()` to create a socket and dial or listen at an address. The
130+
socket is then passed as the first argument of subsequent actions such
131+
as `send()` or `recv()`.
132132

133133
*Example using Pipeline (Push/Pull) protocol with TCP/IP transport:*
134134

@@ -339,8 +339,8 @@ Can be used to perform computationally-expensive calculations or
339339
I/O-bound operations such as writing large amounts of data to disk in a
340340
separate ‘server’ process running concurrently.
341341

342-
Server process: `reply()` will wait for a message and apply a function,
343-
in this case `rnorm()`, before sending back the result.
342+
\[S\] Server process: `reply()` will wait for a message and apply a
343+
function, in this case `rnorm()`, before sending back the result.
344344

345345
``` r
346346
library(nanonext)
@@ -349,8 +349,8 @@ ctxp <- context(rep)
349349
reply(ctxp, execute = rnorm, send_mode = "raw")
350350
```
351351

352-
Client process: `request()` performs an async send and receive request
353-
and returns immediately with a `recvAio` object.
352+
\[C\] Client process: `request()` performs an async send and receive
353+
request and returns immediately with a `recvAio` object.
354354

355355
``` r
356356
library(nanonext)
@@ -379,7 +379,7 @@ aio
379379
#> < recvAio >
380380
#> - $data for message data
381381
aio$data |> str()
382-
#> num [1:100000000] -1.072 1.447 0.962 -0.483 -0.843 ...
382+
#> num [1:100000000] 2.698 0.0645 -1.0922 2.1422 0.8848 ...
383383
```
384384

385385
As `call_aio()` is blocking and will wait for completion, an alternative
@@ -516,11 +516,11 @@ ncurl("http://httpbin.org/headers")
516516
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
517517
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
518518
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
519-
#> [76] 2d 36 32 36 35 62 32 62 31 2d 33 33 30 61 30 30 30 37 32 33 61 32 61 39 63
520-
#> [101] 36 36 61 30 36 66 38 36 61 22 0a 20 20 7d 0a 7d 0a
519+
#> [76] 2d 36 32 36 37 31 36 30 32 2d 32 35 64 63 33 32 39 63 30 39 35 34 64 36 33
520+
#> [101] 39 37 37 62 34 30 35 32 39 22 0a 20 20 7d 0a 7d 0a
521521
#>
522522
#> $data
523-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6265b2b1-330a000723a2a9c66a06f86a\"\n }\n}\n"
523+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62671602-25dc329c0954d63977b40529\"\n }\n}\n"
524524
```
525525

526526
For advanced use, supports additional HTTP methods such as POST or PUT.
@@ -535,7 +535,7 @@ res
535535
#> - $raw for raw message
536536

537537
call_aio(res)$data
538-
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6265b2b1-09b62748201d88f5057952ab\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"78.145.225.121\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
538+
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62671603-121afeba10cc694a091360a9\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"78.145.225.121\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
539539
```
540540

541541
In this respect, it may be used as a performant and lightweight method

0 commit comments

Comments
 (0)