Skip to content

Commit fba707c

Browse files
committed
re-restrict request/reply to contexts
1 parent 1e1f352 commit fba707c

File tree

7 files changed

+29
-128
lines changed

7 files changed

+29
-128
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 0.13.3.9003
4+
Version: 0.13.3.9004
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library implementing 'Scalability Protocols', a reliable,
77
high-performance standard for common communications patterns including

NEWS.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
1-
# nanonext 0.13.3.9003 (development)
1+
# nanonext 0.13.3.9004 (development)
22

3-
* `request()` and `reply()` now accept sockets in place of contexts, with the argument 'context' amended to the more general 'con'.
43
* An integer file descriptor is appended to 'nanoSockets' as the attribute 'fd' - see updated documentation for `socket()`.
54

65
# nanonext 0.13.3

R/context.R

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,13 @@ context <- function(socket) .Call(rnng_ctx_open, socket)
9191
#'
9292
close.nanoContext <- function(con, ...) invisible(.Call(rnng_ctx_close, con))
9393

94-
#' Reply (RPC Server for Req/Rep Protocol)
94+
#' Reply over Context (RPC Server for Req/Rep Protocol)
9595
#'
9696
#' Implements an executor/server for the rep node of the req/rep protocol.
9797
#' Awaits data, applies an arbitrary specified function, and returns the
9898
#' result to the caller/client.
9999
#'
100-
#' @param con a Socket or Context.
100+
#' @param context a Context.
101101
#' @param execute a function which takes the received (converted) data as its
102102
#' first argument. Can be an anonymous function of the form
103103
#' \code{function(x) do(x)}. Additional arguments can also be passed in
@@ -154,7 +154,7 @@ close.nanoContext <- function(con, ...) invisible(.Call(rnng_ctx_close, con))
154154
#'
155155
#' @export
156156
#'
157-
reply <- function(con,
157+
reply <- function(context,
158158
execute,
159159
recv_mode = c("serial", "character", "complex", "double",
160160
"integer", "logical", "numeric", "raw"),
@@ -163,16 +163,16 @@ reply <- function(con,
163163
...) {
164164

165165
block <- if (is.null(timeout)) TRUE else timeout
166-
res <- recv(con, mode = recv_mode, block = block)
166+
res <- recv(context, mode = recv_mode, block = block)
167167
is_error_value(res) && return(res)
168-
on.exit(expr = send(con, data = as.raw(0L), mode = send_mode, block = TRUE))
168+
on.exit(expr = send(context, data = as.raw(0L), mode = send_mode, block = TRUE))
169169
data <- execute(res, ...)
170170
on.exit()
171-
send(con, data = data, mode = send_mode, block = block)
171+
send(context, data = data, mode = send_mode, block = block)
172172

173173
}
174174

175-
#' Request (RPC Client for Req/Rep Protocol)
175+
#' Request over Context (RPC Client for Req/Rep Protocol)
176176
#'
177177
#' Implements a caller/client for the req node of the req/rep protocol. Sends
178178
#' data to the rep node (executor/server) and returns an Aio, which can be
@@ -222,13 +222,13 @@ reply <- function(con,
222222
#'
223223
#' @export
224224
#'
225-
request <- function(con,
225+
request <- function(context,
226226
data,
227227
send_mode = c("serial", "raw", "next"),
228228
recv_mode = c("serial", "character", "complex", "double",
229229
"integer", "logical", "numeric", "raw", "string"),
230230
timeout = NULL)
231-
data <- .Call(rnng_request, con, data, send_mode, recv_mode, timeout, environment())
231+
data <- .Call(rnng_request, context, data, send_mode, recv_mode, timeout, environment())
232232

233233
#' Request and Signal a Condition Variable (RPC Client for Req/Rep Protocol)
234234
#'
@@ -261,11 +261,11 @@ request <- function(con,
261261
#' @rdname request
262262
#' @export
263263
#'
264-
request_signal <- function(con,
264+
request_signal <- function(context,
265265
data,
266266
cv,
267267
send_mode = c("serial", "raw", "next"),
268268
recv_mode = c("serial", "character", "complex", "double",
269269
"integer", "logical", "numeric", "raw", "string"),
270270
timeout = NULL)
271-
data <- .Call(rnng_request_signal, con, data, cv, send_mode, recv_mode, timeout, environment())
271+
data <- .Call(rnng_request_signal, context, data, cv, send_mode, recv_mode, timeout, environment())

man/reply.Rd

Lines changed: 3 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/request.Rd

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/aio.c

Lines changed: 8 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -322,20 +322,6 @@ static void request_finalizer(SEXP xptr) {
322322

323323
}
324324

325-
static void request_sock_finalizer(SEXP xptr) {
326-
327-
if (R_ExternalPtrAddr(xptr) == NULL) return;
328-
nano_aio *xp = (nano_aio *) R_ExternalPtrAddr(xptr);
329-
nano_aio *saio = (nano_aio *) xp->next;
330-
nng_aio_free(saio->aio);
331-
nng_aio_free(xp->aio);
332-
if (xp->data != NULL)
333-
nng_msg_free((nng_msg *) xp->data);
334-
R_Free(saio);
335-
R_Free(xp);
336-
337-
}
338-
339325
static void cv_finalizer(SEXP xptr) {
340326

341327
if (R_ExternalPtrAddr(xptr) == NULL) return;
@@ -1225,84 +1211,9 @@ SEXP rnng_ncurl_session_close(SEXP session) {
12251211

12261212
// request ---------------------------------------------------------------------
12271213

1228-
SEXP rnng_request_sock(const SEXP con, const SEXP data, const SEXP sendmode,
1214+
SEXP rnng_request_impl(const SEXP con, const SEXP data, const SEXP sendmode,
12291215
const SEXP recvmode, const SEXP timeout, const SEXP clo, nano_cv *ncv) {
12301216

1231-
const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout);
1232-
const int mod = nano_matcharg(recvmode);
1233-
const int signal = ncv != NULL;
1234-
nng_socket *sock = (nng_socket *) R_ExternalPtrAddr(con);
1235-
SEXP aio, env, fun;
1236-
nano_buf buf;
1237-
nano_aio *saio, *raio;
1238-
nng_msg *msg;
1239-
int xc;
1240-
1241-
switch (nano_encodes(sendmode)) {
1242-
case 1:
1243-
nano_serialize(&buf, data); break;
1244-
case 2:
1245-
nano_encode(&buf, data); break;
1246-
default:
1247-
nano_serialize_next(&buf, data); break;
1248-
}
1249-
1250-
saio = R_Calloc(1, nano_aio);
1251-
saio->next = ncv;
1252-
1253-
if ((xc = nng_msg_alloc(&msg, 0)))
1254-
goto exitlevel1;
1255-
1256-
if ((xc = nng_msg_append(msg, buf.buf, buf.cur)) ||
1257-
(xc = nng_aio_alloc(&saio->aio, sendaio_complete, &saio->aio))) {
1258-
nng_msg_free(msg);
1259-
goto exitlevel1;
1260-
}
1261-
1262-
nng_aio_set_msg(saio->aio, msg);
1263-
nng_send_aio(*sock, saio->aio);
1264-
1265-
raio = R_Calloc(1, nano_aio);
1266-
raio->type = RECVAIO;
1267-
raio->mode = mod;
1268-
raio->next = saio;
1269-
1270-
if ((xc = nng_aio_alloc(&raio->aio, signal ? request_complete_signal : raio_complete, raio)))
1271-
goto exitlevel2;
1272-
1273-
nng_aio_set_timeout(raio->aio, dur);
1274-
nng_recv_aio(*sock, raio->aio);
1275-
NANO_FREE(buf);
1276-
1277-
PROTECT(aio = R_MakeExternalPtr(raio, nano_AioSymbol, R_NilValue));
1278-
R_RegisterCFinalizerEx(aio, request_sock_finalizer, TRUE);
1279-
1280-
PROTECT(env = Rf_allocSExp(ENVSXP));
1281-
NANO_CLASS(env, "recvAio");
1282-
Rf_defineVar(nano_AioSymbol, aio, env);
1283-
1284-
PROTECT(fun = Rf_allocSExp(CLOSXP));
1285-
SET_FORMALS(fun, nano_aioFormals);
1286-
SET_BODY(fun, signal ? CADDDR(nano_aioFuncs) : CADR(nano_aioFuncs));
1287-
SET_CLOENV(fun, clo);
1288-
R_MakeActiveBinding(nano_DataSymbol, fun, env);
1289-
1290-
UNPROTECT(3);
1291-
return env;
1292-
1293-
exitlevel2:
1294-
R_Free(raio);
1295-
nng_aio_free(saio->aio);
1296-
exitlevel1:
1297-
R_Free(saio);
1298-
NANO_FREE(buf);
1299-
return mk_error_data(xc);
1300-
1301-
}
1302-
1303-
SEXP rnng_request_ctx(const SEXP con, const SEXP data, const SEXP sendmode,
1304-
const SEXP recvmode, const SEXP timeout, const SEXP clo, nano_cv *ncv) {
1305-
13061217
const nng_duration dur = timeout == R_NilValue ? NNG_DURATION_DEFAULT : (nng_duration) Rf_asInteger(timeout);
13071218
const int mod = nano_matcharg(recvmode);
13081219
const int signal = ncv != NULL;
@@ -1380,31 +1291,22 @@ SEXP rnng_request_ctx(const SEXP con, const SEXP data, const SEXP sendmode,
13801291

13811292
SEXP rnng_request(SEXP con, SEXP data, SEXP sendmode, SEXP recvmode, SEXP timeout, SEXP clo) {
13821293

1383-
const SEXP ptrtag = R_ExternalPtrTag(con);
1384-
if (ptrtag == nano_ContextSymbol) {
1385-
return rnng_request_ctx(con, data, sendmode, recvmode, timeout, clo, NULL);
1386-
} else if (ptrtag == nano_SocketSymbol) {
1387-
return rnng_request_sock(con, data, sendmode, recvmode, timeout, clo, NULL);
1388-
} else {
1389-
error_return("'con' is not a valid Socket or Context");
1390-
}
1294+
if (R_ExternalPtrTag(con) != nano_ContextSymbol)
1295+
Rf_error("'con' is not a valid Context");
1296+
1297+
return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, NULL);
13911298

13921299
}
13931300

13941301
SEXP rnng_request_signal(SEXP con, SEXP data, SEXP cvar, SEXP sendmode, SEXP recvmode, SEXP timeout, SEXP clo) {
13951302

1303+
if (R_ExternalPtrTag(con) != nano_ContextSymbol)
1304+
Rf_error("'con' is not a valid Context");
13961305
if (R_ExternalPtrTag(cvar) != nano_CvSymbol)
13971306
Rf_error("'cv' is not a valid Condition Variable");
13981307
nano_cv *ncv = (nano_cv *) R_ExternalPtrAddr(cvar);
13991308

1400-
const SEXP ptrtag = R_ExternalPtrTag(con);
1401-
if (ptrtag == nano_ContextSymbol) {
1402-
return rnng_request_ctx(con, data, sendmode, recvmode, timeout, clo, ncv);
1403-
} else if (ptrtag == nano_SocketSymbol) {
1404-
return rnng_request_sock(con, data, sendmode, recvmode, timeout, clo, ncv);
1405-
} else {
1406-
error_return("'con' is not a valid Socket or Context");
1407-
}
1309+
return rnng_request_impl(con, data, sendmode, recvmode, timeout, clo, ncv);
14081310

14091311
}
14101312

tests/tests.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ nanotest(!.unresolved(fakesock))
486486
fakectx <- `class<-`("test", "nanoContext")
487487
nanotest(!unresolved(fakectx))
488488
nanotest(!.unresolved(fakectx))
489-
nanotesterr(request(fakectx, data = "test"), "valid Socket or Context")
489+
nanotesterr(request(fakectx, data = "test"), "valid Context")
490490
nanotesterr(subscribe(fakectx, NULL), "valid")
491491
nanotesterr(close(fakectx), "valid Context")
492492
nanotest(reap(fakectx) == 3L)

0 commit comments

Comments
 (0)