Skip to content

Commit eb51392

Browse files
committed
move decode to C level for async recv
1 parent eca2f17 commit eb51392

File tree

13 files changed

+108
-199
lines changed

13 files changed

+108
-199
lines changed

R/aio.R

Lines changed: 36 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -216,24 +216,14 @@ recv_aio.nanoSocket <- function(con,
216216
if (keep.raw) {
217217
makeActiveBinding(sym = "raw", fun = function(x) {
218218
if (unresolv) {
219-
res <- .Call(rnng_aio_get_msg, aio)
219+
res <- .Call(rnng_aio_get_msg, aio, mode, keep.raw)
220220
missing(res) && return(.Call(rnng_aio_unresolv))
221-
is.integer(res) && {
221+
if (is_error_value(res)) {
222222
data <<- raw <<- res
223-
aio <<- env[["aio"]] <<- NULL
224-
unresolv <<- FALSE
225-
return(res)
223+
} else {
224+
raw <<- .subset2(res, "raw")
225+
data <<- .subset2(res, "data")
226226
}
227-
on.exit(expr = {
228-
raw <<- res
229-
aio <<- env[["aio"]] <<- NULL
230-
unresolv <<- FALSE
231-
return(res)
232-
})
233-
data <- decode(con = res, mode = mode)
234-
on.exit()
235-
raw <<- res
236-
data <<- data
237227
aio <<- env[["aio"]] <<- NULL
238228
unresolv <<- FALSE
239229
}
@@ -242,24 +232,16 @@ recv_aio.nanoSocket <- function(con,
242232
}
243233
makeActiveBinding(sym = "data", fun = function(x) {
244234
if (unresolv) {
245-
res <- .Call(rnng_aio_get_msg, aio)
235+
res <- .Call(rnng_aio_get_msg, aio, mode, keep.raw)
246236
missing(res) && return(.Call(rnng_aio_unresolv))
247-
is.integer(res) && {
237+
if (is_error_value(res)) {
248238
data <<- raw <<- res
249-
aio <<- env[["aio"]] <<- NULL
250-
unresolv <<- FALSE
251-
return(res)
252-
}
253-
on.exit(expr = {
239+
} else if (keep.raw) {
240+
raw <<- .subset2(res, "raw")
241+
data <<- .subset2(res, "data")
242+
} else {
254243
data <<- res
255-
aio <<- env[["aio"]] <<- NULL
256-
unresolv <<- FALSE
257-
return(res)
258-
})
259-
data <- decode(con = res, mode = mode)
260-
on.exit()
261-
if (keep.raw) raw <<- res
262-
data <<- data
244+
}
263245
aio <<- env[["aio"]] <<- NULL
264246
unresolv <<- FALSE
265247
}
@@ -292,24 +274,14 @@ recv_aio.nanoContext <- function(con,
292274
if (keep.raw) {
293275
makeActiveBinding(sym = "raw", fun = function(x) {
294276
if (unresolv) {
295-
res <- .Call(rnng_aio_get_msg, aio)
277+
res <- .Call(rnng_aio_get_msg, aio, mode, keep.raw)
296278
missing(res) && return(.Call(rnng_aio_unresolv))
297-
is.integer(res) && {
279+
if (is_error_value(res)) {
298280
data <<- raw <<- res
299-
aio <<- env[["aio"]] <<- NULL
300-
unresolv <<- FALSE
301-
return(res)
281+
} else {
282+
raw <<- .subset2(res, "raw")
283+
data <<- .subset2(res, "data")
302284
}
303-
on.exit(expr = {
304-
raw <<- res
305-
aio <<- env[["aio"]] <<- NULL
306-
unresolv <<- FALSE
307-
return(res)
308-
})
309-
data <- decode(con = res, mode = mode)
310-
on.exit()
311-
raw <<- res
312-
data <<- data
313285
aio <<- env[["aio"]] <<- NULL
314286
unresolv <<- FALSE
315287
}
@@ -318,24 +290,16 @@ recv_aio.nanoContext <- function(con,
318290
}
319291
makeActiveBinding(sym = "data", fun = function(x) {
320292
if (unresolv) {
321-
res <- .Call(rnng_aio_get_msg, aio)
293+
res <- .Call(rnng_aio_get_msg, aio, mode, keep.raw)
322294
missing(res) && return(.Call(rnng_aio_unresolv))
323-
is.integer(res) && {
295+
if (is_error_value(res)) {
324296
data <<- raw <<- res
325-
aio <<- env[["aio"]] <<- NULL
326-
unresolv <<- FALSE
327-
return(res)
328-
}
329-
on.exit(expr = {
297+
} else if (keep.raw) {
298+
raw <<- .subset2(res, "raw")
299+
data <<- .subset2(res, "data")
300+
} else {
330301
data <<- res
331-
aio <<- env[["aio"]] <<- NULL
332-
unresolv <<- FALSE
333-
return(res)
334-
})
335-
data <- decode(con = res, mode = mode)
336-
on.exit()
337-
if (keep.raw) raw <<- res
338-
data <<- data
302+
}
339303
aio <<- env[["aio"]] <<- NULL
340304
unresolv <<- FALSE
341305
}
@@ -369,24 +333,14 @@ recv_aio.nanoStream <- function(con,
369333
if (keep.raw) {
370334
makeActiveBinding(sym = "raw", fun = function(x) {
371335
if (unresolv) {
372-
res <- .Call(rnng_aio_stream_in, aio)
336+
res <- .Call(rnng_aio_stream_in, aio, mode, keep.raw)
373337
missing(res) && return(.Call(rnng_aio_unresolv))
374-
is.integer(res) && {
338+
if (is_error_value(res)) {
375339
data <<- raw <<- res
376-
aio <<- env[["aio"]] <<- NULL
377-
unresolv <<- FALSE
378-
return(res)
340+
} else {
341+
raw <<- .subset2(res, "raw")
342+
data <<- .subset2(res, "data")
379343
}
380-
on.exit(expr = {
381-
raw <<- res
382-
aio <<- env[["aio"]] <<- NULL
383-
unresolv <<- FALSE
384-
return(res)
385-
})
386-
data <- decode(con = res, mode = mode)
387-
on.exit()
388-
raw <<- res
389-
data <<- data
390344
aio <<- env[["aio"]] <<- NULL
391345
unresolv <<- FALSE
392346
}
@@ -395,24 +349,16 @@ recv_aio.nanoStream <- function(con,
395349
}
396350
makeActiveBinding(sym = "data", fun = function(x) {
397351
if (unresolv) {
398-
res <- .Call(rnng_aio_stream_in, aio)
352+
res <- .Call(rnng_aio_stream_in, aio, mode, keep.raw)
399353
missing(res) && return(.Call(rnng_aio_unresolv))
400-
is.integer(res) && {
354+
if (is_error_value(res)) {
401355
data <<- raw <<- res
402-
aio <<- env[["aio"]] <<- NULL
403-
unresolv <<- FALSE
404-
return(res)
405-
}
406-
on.exit(expr = {
356+
} else if (keep.raw) {
357+
raw <<- .subset2(res, "raw")
358+
data <<- .subset2(res, "data")
359+
} else {
407360
data <<- res
408-
unresolv <<- FALSE
409-
aio <<- env[["aio"]] <<- NULL
410-
return(res)
411-
})
412-
data <- decode(con = res, mode = mode)
413-
on.exit()
414-
if (keep.raw) raw <<- res
415-
data <<- data
361+
}
416362
aio <<- env[["aio"]] <<- NULL
417363
unresolv <<- FALSE
418364
}

R/context.R

Lines changed: 13 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ request <- function(context,
190190
is.integer(res) && return(res)
191191

192192
aio <- .Call(rnng_ctx_recv_aio, context, timeout)
193-
is.integer(aio) && return(aio)
193+
is_error_value(aio) && return(aio)
194194

195195
keep.raw <- missing(keep.raw) || isTRUE(keep.raw)
196196
data <- raw <- NULL
@@ -199,24 +199,14 @@ request <- function(context,
199199
if (keep.raw) {
200200
makeActiveBinding(sym = "raw", fun = function(x) {
201201
if (unresolv) {
202-
res <- .Call(rnng_aio_get_msg, aio)
202+
res <- .Call(rnng_aio_get_msg, aio, recv_mode, keep.raw)
203203
missing(res) && return(.Call(rnng_aio_unresolv))
204-
is.integer(res) && {
204+
if (is_error_value(res)) {
205205
data <<- raw <<- res
206-
aio <<- env[["aio"]] <<- NULL
207-
unresolv <<- FALSE
208-
return(res)
206+
} else {
207+
raw <<- .subset2(res, "raw")
208+
data <<- .subset2(res, "data")
209209
}
210-
on.exit(expr = {
211-
raw <<- res
212-
aio <<- env[["aio"]] <<- NULL
213-
unresolv <<- FALSE
214-
return(res)
215-
})
216-
data <- decode(con = res, mode = recv_mode)
217-
on.exit()
218-
raw <<- res
219-
data <<- data
220210
aio <<- env[["aio"]] <<- NULL
221211
unresolv <<- FALSE
222212
}
@@ -225,24 +215,16 @@ request <- function(context,
225215
}
226216
makeActiveBinding(sym = "data", fun = function(x) {
227217
if (unresolv) {
228-
res <- .Call(rnng_aio_get_msg, aio)
218+
res <- .Call(rnng_aio_get_msg, aio, recv_mode, keep.raw)
229219
missing(res) && return(.Call(rnng_aio_unresolv))
230-
is.integer(res) && {
220+
if (is_error_value(res)) {
231221
data <<- raw <<- res
232-
aio <<- env[["aio"]] <<- NULL
233-
unresolv <<- FALSE
234-
return(res)
235-
}
236-
on.exit(expr = {
222+
} else if (keep.raw) {
223+
raw <<- .subset2(res, "raw")
224+
data <<- .subset2(res, "data")
225+
} else {
237226
data <<- res
238-
aio <<- env[["aio"]] <<- NULL
239-
unresolv <<- FALSE
240-
return(res)
241-
})
242-
data <- decode(con = res, mode = recv_mode)
243-
on.exit()
244-
if (keep.raw) raw <<- res
245-
data <<- data
227+
}
246228
aio <<- env[["aio"]] <<- NULL
247229
unresolv <<- FALSE
248230
}

R/ncurl.R

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,12 @@ ncurl <- function(url,
8484
if (unresolv) {
8585
res <- .Call(rnng_aio_http, aio)
8686
missing(res) && return(.Call(rnng_aio_unresolv))
87-
is.integer(res) && {
87+
if (is.integer(res)) {
8888
data <<- raw <<- res
89-
aio <<- env[["aio"]] <<- NULL
90-
unresolv <<- FALSE
91-
return(res)
89+
} else {
90+
raw <<- res
91+
data <<- if (convert) tryCatch(rawToChar(res), error = function(e) NULL)
9292
}
93-
raw <<- res
94-
data <<- if (convert) tryCatch(rawToChar(res), error = function(e) NULL)
9593
aio <<- env[["aio"]] <<- NULL
9694
unresolv <<- FALSE
9795
}
@@ -101,14 +99,12 @@ ncurl <- function(url,
10199
if (unresolv) {
102100
res <- .Call(rnng_aio_http, aio)
103101
missing(res) && return(.Call(rnng_aio_unresolv))
104-
is.integer(res) && {
102+
if (is.integer(res)) {
105103
data <<- raw <<- res
106-
aio <<- env[["aio"]] <<- NULL
107-
unresolv <<- FALSE
108-
return(res)
104+
} else {
105+
raw <<- res
106+
data <<- if (convert) tryCatch(rawToChar(res), error = function(e) NULL)
109107
}
110-
raw <<- res
111-
data <<- if (convert) tryCatch(rawToChar(res), error = function(e) NULL)
112108
aio <<- env[["aio"]] <<- NULL
113109
unresolv <<- FALSE
114110
}

R/sendrecv.R

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
#'
77
#' @param con a Socket, Context or Stream.
88
#' @param data an object (a vector, if mode = 'raw').
9-
#' @param mode either 'serial' for sending serialised R objects, or 'raw' for
9+
#' @param mode [default 'serial'] for sending serialised R objects, or 'raw' for
1010
#' sending vectors of any type (converted to a raw byte vector for sending).
11-
#' For Streams, 'raw' is the only choice and any other value is ignored. Use
11+
#' For Streams, 'raw' is the only option and any other value is ignored. Use
1212
#' 'serial' for perfect reproducibility within R, although 'raw' must be used
1313
#' when interfacing with external applications that do not understand R
1414
#' serialisation.
@@ -128,22 +128,21 @@ send.nanoStream <- function(con,
128128
#' Receive data over a connection (Socket, Context or Stream).
129129
#'
130130
#' @param con a Socket, Context or Stream.
131-
#' @param mode <Sockets and Contexts> [default 'serial'] mode of vector to be
132-
#' received - one of 'serial', 'character', 'complex', 'double', 'integer',
133-
#' 'logical', 'numeric', or 'raw'. The default 'serial' means a serialised
134-
#' R object, for the other modes, the raw vector received will be converted
135-
#' into the respective mode.
136-
#' <Streams> [default 'character'] note that 'serial' is not an option for
137-
#' Streams.
131+
#' @param mode [default 'serial'] mode of vector to be received - one of 'serial',
132+
#' 'character', 'complex', 'double', 'integer', 'logical', 'numeric', or 'raw'.
133+
#' The default 'serial' means a serialised R object, for the other modes,
134+
#' the raw vector received will be converted into the respective mode.
135+
#' For Streams, 'serial' is not an option and the default is 'character'.
138136
#' @param block logical TRUE to block until successful or FALSE to return
139137
#' immediately even if unsuccessful (e.g. if no messages are available),
140138
#' or else an integer value specifying the maximum time to block in
141139
#' milliseconds, after which the operation will time out.
142140
#' @param keep.raw [default TRUE] logical flag whether to keep the received raw
143141
#' vector (useful for verification e.g. via hashing). If FALSE, will return
144142
#' the converted data only.
145-
#' @param n <Streams> [default 65536L] the maximum number of bytes to receive.
146-
#' Can be an over-estimate, but note that a buffer of this size is reserved.
143+
#' @param n [default 65536L] applicable to Streams only, the maximum number of
144+
#' bytes to receive. Can be an over-estimate, but note that a buffer of this
145+
#' size is reserved.
147146
#' @param ... currently unused.
148147
#'
149148
#' @return Named list of 2 elements: 'raw' containing the received raw vector

R/utils.R

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -262,18 +262,6 @@ encode <- function(data, mode) {
262262
if (is.raw(data)) data else writeBin(object = data, con = raw()))
263263
}
264264

265-
decode <- function(con, mode) {
266-
switch(mode,
267-
unserialize(connection = con),
268-
(r <- readBin(con = con, what = "character", n = length(con) / 2L))[nzchar(r)],
269-
readBin(con = con, what = "complex", n = length(con) / 16L),
270-
readBin(con = con, what = double(), n = length(con) / 8L),
271-
readBin(con = con, what = integer(), n = length(con) / 4L),
272-
readBin(con = con, what = logical(), n = length(con) / 4L),
273-
readBin(con = con, what = numeric(), n = length(con) / 8L),
274-
con)
275-
}
276-
277265
match.arg2 <- function(choice, choices) {
278266
identical(choice, choices) && return(1L)
279267
index <- pmatch(choice[1L], choices, nomatch = 0L, duplicates.ok = TRUE)

README.md

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ aio
377377
#> < recvAio >
378378
#> - $data for message data
379379
aio$data |> str()
380-
#> num [1:100000000] -0.42 1.154 0.243 -0.609 1.335 ...
380+
#> num [1:100000000] -0.0308 0.1989 -0.9806 -0.1879 -0.3781 ...
381381
```
382382

383383
As `call_aio()` is blocking and will wait for completion, an alternative
@@ -514,11 +514,11 @@ ncurl("http://httpbin.org/headers")
514514
#> [1] 7b 0a 20 20 22 68 65 61 64 65 72 73 22 3a 20 7b 0a 20 20 20 20 22 48 6f 73
515515
#> [26] 74 22 3a 20 22 68 74 74 70 62 69 6e 2e 6f 72 67 22 2c 20 0a 20 20 20 20 22
516516
#> [51] 58 2d 41 6d 7a 6e 2d 54 72 61 63 65 2d 49 64 22 3a 20 22 52 6f 6f 74 3d 31
517-
#> [76] 2d 36 32 36 31 63 32 31 62 2d 34 61 33 30 62 32 36 31 31 32 62 64 30 33 63
518-
#> [101] 65 33 38 64 30 31 66 31 64 22 0a 20 20 7d 0a 7d 0a
517+
#> [76] 2d 36 32 36 32 36 61 65 62 2d 33 66 33 39 61 37 31 31 35 37 65 65 35 62 63
518+
#> [101] 65 35 38 36 65 62 38 66 34 22 0a 20 20 7d 0a 7d 0a
519519
#>
520520
#> $data
521-
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6261c21b-4a30b26112bd03ce38d01f1d\"\n }\n}\n"
521+
#> [1] "{\n \"headers\": {\n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62626aeb-3f39a71157ee5bce586eb8f4\"\n }\n}\n"
522522
```
523523

524524
For advanced use, supports additional HTTP methods such as POST or PUT.
@@ -533,7 +533,7 @@ res
533533
#> - $raw for raw message
534534

535535
call_aio(res)$data
536-
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-6261c21b-46a08c5728a80a39448bdeca\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"78.145.225.121\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
536+
#> [1] "{\n \"args\": {}, \n \"data\": \"{\\\"key\\\": \\\"value\\\"}\", \n \"files\": {}, \n \"form\": {}, \n \"headers\": {\n \"Authorization\": \"Bearer APIKEY\", \n \"Content-Length\": \"16\", \n \"Content-Type\": \"application/json\", \n \"Host\": \"httpbin.org\", \n \"X-Amzn-Trace-Id\": \"Root=1-62626aeb-71a64d74239ceae125f0a20f\"\n }, \n \"json\": {\n \"key\": \"value\"\n }, \n \"origin\": \"78.145.225.121\", \n \"url\": \"http://httpbin.org/post\"\n}\n"
537537
```
538538

539539
In this respect, it may be used as a performant and lightweight method

0 commit comments

Comments
 (0)