Skip to content

Commit b8a4bc0

Browse files
committed
improved non-blocking call_aio(), remove experimental tag
1 parent 30e0597 commit b8a4bc0

File tree

5 files changed

+81
-30
lines changed

5 files changed

+81
-30
lines changed

R/aio.R

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
#'
88
#' @param aio An Aio (object of class 'sendAio' or 'recvAio').
99
#' @param block [default TRUE] whether to wait for completion of the AIO
10-
#' operation (blocking) or return immediately. [experimental]
10+
#' operation (blocking) or return immediately.
1111
#'
1212
#' @return The passed Aio object (invisibly), or NULL if non-blocking and the
1313
#' Aio has yet to resolve.
@@ -36,13 +36,11 @@
3636
#' (invisibly) instead of NULL. The data may then be extracted from the Aio
3737
#' using \code{$result}, \code{$raw} or \code{$data} as the case may be.
3838
#'
39-
#' It is not advisable to try to extract the data in one step in the
40-
#' non-blocking case as NULL$result is also NULL, hence it would be impossible
41-
#' to distinguish between an unresolved Aio and a NULL return value.
42-
#'
43-
#' This argument has the tag [experimental], which indicates that it remains
44-
#' under development. Please note that the final implementation may differ
45-
#' from the current version.
39+
#' It is not advisable to try to extract the data from a 'recvAio' in one
40+
#' step using something like \code{call_aio(x)$data} in the non-blocking case.
41+
#' This is as \code{call_aio()} will return NULL if the Aio is unresolved and
42+
#' \code{NULL$data} is also \code{NULL}, hence it would be impossible to
43+
#' distinguish between an unresolved Aio and a NULL return value.
4644
#'
4745
#' @examples
4846
#' s1 <- socket("pair", listen = "inproc://nanonext")
@@ -68,15 +66,16 @@ call_aio <- function(aio, block = TRUE) {
6866

6967
if (length(.subset2(aio, "aio"))) {
7068

71-
if (!missing(block) && !isTRUE(block)) {
72-
.Call(rnng_aio_check, .subset2(aio, "aio")) || return()
73-
}
74-
7569
if (inherits(aio, "recvAio")) {
7670

71+
if (!missing(block) && !isTRUE(block)) {
72+
res <- .Call(rnng_aio_get_msg, .subset2(aio, "aio"))
73+
missing(res) && return()
74+
} else {
75+
res <- .Call(rnng_aio_wait_get_msg, .subset2(aio, "aio"))
76+
}
7777
mode <- .subset2(aio, "callparams")[[1L]]
7878
keep.raw <- .subset2(aio, "callparams")[[2L]]
79-
res <- .Call(rnng_aio_get_msg, .subset2(aio, "aio"))
8079
if (keep.raw) aio[["raw"]] <- res
8180
is.integer(res) && {
8281
message(res, " : ", nng_error(res))
@@ -99,7 +98,13 @@ call_aio <- function(aio, block = TRUE) {
9998
rm("callparams", envir = aio)
10099

101100
} else if (inherits(aio, "sendAio")) {
102-
res <- .Call(rnng_aio_result, .subset2(aio, "aio"))
101+
102+
if (!missing(block) && !isTRUE(block)) {
103+
res <- .Call(rnng_aio_result, .subset2(aio, "aio"))
104+
missing(res) && return()
105+
} else {
106+
res <- .Call(rnng_aio_wait_result, .subset2(aio, "aio"))
107+
}
103108
aio[["result"]] <- res
104109
rm("aio", envir = aio)
105110
if (res) {

man/call_aio.Rd

Lines changed: 6 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/aio.c

Lines changed: 52 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,29 +171,49 @@ SEXP rnng_send_aio(SEXP socket, SEXP data, SEXP timeout) {
171171

172172
}
173173

174-
SEXP rnng_aio_check(SEXP aio) {
174+
SEXP rnng_aio_get_msg(SEXP aio) {
175175

176176
if (R_ExternalPtrTag(aio) != nano_AioSymbol)
177177
error_return("'aio' is not a valid Aio");
178178
if (R_ExternalPtrAddr(aio) == NULL)
179179
error_return("'aio' is not an active Aio");
180-
int res;
180+
181+
nng_aio *aiop = (nng_aio *) R_ExternalPtrAddr(aio);
181182
int_mtx *mutex = (int_mtx *) R_ExternalPtrAddr(Rf_getAttrib(aio, nano_StateSymbol));
182183
nng_mtx_lock(mutex->mtx);
183-
res = mutex->state;
184+
int resolv = mutex->state;
184185
nng_mtx_unlock(mutex->mtx);
186+
if (!resolv)
187+
return R_MissingArg;
188+
189+
int xc = nng_aio_result(aiop);
190+
if (xc) {
191+
nng_aio_free(aiop);
192+
R_ClearExternalPtr(aio);
193+
return Rf_ScalarInteger(xc);
194+
}
195+
nng_msg *msgp = nng_aio_get_msg(aiop);
196+
size_t sz = nng_msg_len(msgp);
197+
SEXP res = PROTECT(Rf_allocVector(RAWSXP, sz));
198+
unsigned char *rp = RAW(res);
199+
memcpy(rp, nng_msg_body(msgp), sz);
200+
nng_msg_free(msgp);
201+
nng_aio_free(aiop);
202+
R_ClearExternalPtr(aio);
203+
204+
UNPROTECT(1);
205+
return res;
185206

186-
return Rf_ScalarInteger(res);
187207
}
188208

189-
SEXP rnng_aio_get_msg(SEXP aio) {
209+
SEXP rnng_aio_wait_get_msg(SEXP aio) {
190210

191211
if (R_ExternalPtrTag(aio) != nano_AioSymbol)
192212
error_return("'aio' is not a valid Aio");
193213
if (R_ExternalPtrAddr(aio) == NULL)
194214
error_return("'aio' is not an active Aio");
195-
nng_aio *aiop = (nng_aio *) R_ExternalPtrAddr(aio);
196215

216+
nng_aio *aiop = (nng_aio *) R_ExternalPtrAddr(aio);
197217
nng_aio_wait(aiop);
198218
int xc = nng_aio_result(aiop);
199219
if (xc) {
@@ -221,6 +241,30 @@ SEXP rnng_aio_result(SEXP aio) {
221241
error_return("'aio' is not a valid Aio");
222242
if (R_ExternalPtrAddr(aio) == NULL)
223243
error_return("'aio' is not an active Aio");
244+
245+
nng_aio *aiop = (nng_aio *) R_ExternalPtrAddr(aio);
246+
int_mtx *mutex = (int_mtx *) R_ExternalPtrAddr(Rf_getAttrib(aio, nano_StateSymbol));
247+
nng_mtx_lock(mutex->mtx);
248+
int resolv = mutex->state;
249+
nng_mtx_unlock(mutex->mtx);
250+
if (!resolv)
251+
return R_MissingArg;
252+
253+
int xc = nng_aio_result(aiop);
254+
255+
nng_aio_free(aiop);
256+
R_ClearExternalPtr(aio);
257+
return Rf_ScalarInteger(xc);
258+
259+
}
260+
261+
SEXP rnng_aio_wait_result(SEXP aio) {
262+
263+
if (R_ExternalPtrTag(aio) != nano_AioSymbol)
264+
error_return("'aio' is not a valid Aio");
265+
if (R_ExternalPtrAddr(aio) == NULL)
266+
error_return("'aio' is not an active Aio");
267+
224268
nng_aio *aiop = (nng_aio *) R_ExternalPtrAddr(aio);
225269
nng_aio_wait(aiop);
226270
int xc = nng_aio_result(aiop);
@@ -237,8 +281,10 @@ SEXP rnng_aio_stop(SEXP aio) {
237281
error_return("'aio' is not a valid Aio");
238282
if (R_ExternalPtrAddr(aio) == NULL)
239283
error_return("'aio' is not an active Aio");
284+
240285
nng_aio *aiop = (nng_aio *) R_ExternalPtrAddr(aio);
241286
nng_aio_stop(aiop);
287+
242288
nng_aio_free(aiop);
243289
R_ClearExternalPtr(aio);
244290
return R_NilValue;

src/init.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ static void RegisterSymbols(void) {
2626
}
2727

2828
static const R_CallMethodDef CallEntries[] = {
29-
{"rnng_aio_check", (DL_FUNC) &rnng_aio_check, 1},
3029
{"rnng_aio_get_msg", (DL_FUNC) &rnng_aio_get_msg, 1},
3130
{"rnng_aio_result", (DL_FUNC) &rnng_aio_result, 1},
3231
{"rnng_aio_stop", (DL_FUNC) &rnng_aio_stop, 1},
32+
{"rnng_aio_wait_get_msg", (DL_FUNC) &rnng_aio_wait_get_msg, 1},
33+
{"rnng_aio_wait_result", (DL_FUNC) &rnng_aio_wait_result, 1},
3334
{"rnng_close", (DL_FUNC) &rnng_close, 1},
3435
{"rnng_ctx_close", (DL_FUNC) &rnng_ctx_close, 1},
3536
{"rnng_ctx_open", (DL_FUNC) &rnng_ctx_open, 1},

src/nanonext.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@ extern SEXP nano_UrlSymbol;
1919
extern SEXP nano_AioSymbol;
2020

2121
/* define functions */
22-
extern SEXP rnng_aio_check(SEXP);
2322
extern SEXP rnng_aio_get_msg(SEXP);
2423
extern SEXP rnng_aio_result(SEXP);
2524
extern SEXP rnng_aio_stop(SEXP);
25+
extern SEXP rnng_aio_wait_get_msg(SEXP);
26+
extern SEXP rnng_aio_wait_result(SEXP);
2627
extern SEXP rnng_close(SEXP);
2728
extern SEXP rnng_ctx_close(SEXP);
2829
extern SEXP rnng_ctx_open(SEXP);

0 commit comments

Comments
 (0)