Skip to content

Commit 33b2534

Browse files
committed
consistent messenger interface
1 parent 9879465 commit 33b2534

File tree

9 files changed

+56
-62
lines changed

9 files changed

+56
-62
lines changed

NEWS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* Unified `send()` and `recv()` functions, and their asynchronous counterparts `send_aio()` and `recv_aio()`, are now S3 generics and can be used across Sockets, Contexts and Streams.
1212
* Revised 'block' argument for `send()` and `recv()` now allows an integer value for setting a timeout.
1313
* `send_ctx()` and `recv_ctx()` are deprecated and will be removed in a future package version - the methods for `send()` and `recv()` should be used instead.
14-
* Logging is deprecated and is being phased out. Logging can still be set via 'NANONEXT_LOG' prior to package load but the ability to set via the `logging()` function has been removed. Logging will be removed entirely in a future package version.
14+
* Logging is deprecated and in the process of being phased out. Logging can still be set via 'NANONEXT_LOG' prior to package load but `logging()` can no longer be used. Logging will be removed entirely in a future package version.
1515

1616
# nanonext 0.3.0
1717

R/messenger.R

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,8 @@
44
#'
55
#' Console-based 2-way messaging system based on NNG scalability protocols.
66
#'
7-
#' @param dial (optional) a URL to dial, specifying the transport and address as
7+
#' @param url a URL to connect to, specifying the transport and address as
88
#' a character string e.g. 'tcp://127.0.0.1:5555' (see \link{transports}).
9-
#' @param listen (optional) a URL to listen at, specifying the transport and
10-
#' address as a character string e.g. 'tcp://127.0.0.1:5555' (see \link{transports}).
119
#'
1210
#' @return Invisible NULL.
1311
#'
@@ -23,21 +21,16 @@
2321
#'
2422
#' @export
2523
#'
26-
messenger <- function(dial = NULL, listen = NULL) {
24+
messenger <- function(url) {
2725

28-
if (!missing(dial) && is.character(dial)) {
29-
sock <- .Call(rnng_messenger, dial, 1L)
30-
} else if (!missing(listen) && is.character(listen)) {
31-
sock <- .Call(rnng_messenger, listen, 0L)
32-
} else {
33-
stop("missing or invalid input")
34-
}
26+
is.character(url) || stop("the url must be supplied as a character string")
27+
sock <- .Call(rnng_messenger, url)
3528
is.integer(sock) && {
3629
logerror(sock)
3730
return(invisible(sock))
3831
}
3932
on.exit(expr = {
40-
s <- .Call(rnng_send, sock, as.raw(0L), 0L)
33+
s <- .Call(rnng_send, sock, writeBin(":d ", raw()), 0L)
4134
close(sock)
4235
invisible()
4336
})
@@ -47,23 +40,28 @@ messenger <- function(dial = NULL, listen = NULL) {
4740
cat("\r", `length<-`(intro, i), sep = " ", file = stdout())
4841
Sys.sleep(0.02)
4942
}
50-
cat("\n", file = stdout())
51-
s <- .Call(rnng_send, sock, as.raw(0L), 0L)
43+
cat(sprintf("\n| url: %s\n", url), file = stdout())
44+
s <- .Call(rnng_send, sock, writeBin(":c ", raw()), 0L)
5245
if (is.integer(s)) {
53-
cat(sprintf("| peer offline: waiting for connection: %s\n", format.POSIXct(Sys.time())),
46+
cat(sprintf("| peer offline: %s\n", format.POSIXct(Sys.time())),
5447
file = stderr())
5548
} else {
56-
cat(sprintf("| peer online: connected: %s\n", format.POSIXct(Sys.time())),
49+
cat(sprintf("| peer online: %s\n", format.POSIXct(Sys.time())),
5750
file = stderr())
5851
}
5952
cat("type your message:\n", file = stdout())
6053
repeat {
6154
data <- readline()
6255
if (identical(data, ":q")) break
6356
if (identical(data, "")) next
64-
data <- writeBin(object = data, con = raw())
65-
s <- .Call(rnng_send, sock, data, 0L)
66-
if (is.integer(s)) message(sprintf("| peer offline: message not sent > %s", format.POSIXct(Sys.time())))
57+
rdata <- writeBin(object = data, con = raw())
58+
s <- .Call(rnng_send, sock, rdata, 0L)
59+
if (is.integer(s)) {
60+
cat(sprintf("| peer offline: message not sent > %s\n", format.POSIXct(Sys.time())),
61+
file = stderr())
62+
} else {
63+
cat(sprintf(" > %s\n", format.POSIXct(Sys.time())), file = stdout())
64+
}
6765
}
6866

6967
}

R/utils.R

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ is_error_value <- function(x) inherits(x, "errorValue")
8888
#' package load. If the variable is set incorrectly, the default level
8989
#' of 'error' is used instead.
9090
#'
91+
#' @keywords internal
9192
#' @export
9293
#'
9394
logging <- function(level = c("keep", "check", "error", "info")) {

man/logging.Rd

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

man/messenger.Rd

Lines changed: 2 additions & 5 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/init.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ static const R_CallMethodDef CallEntries[] = {
7272
{"rnng_listener_set_string", (DL_FUNC) &rnng_listener_set_string, 3},
7373
{"rnng_listener_set_uint64", (DL_FUNC) &rnng_listener_set_uint64, 3},
7474
{"rnng_listener_start", (DL_FUNC) &rnng_listener_start, 1},
75-
{"rnng_messenger", (DL_FUNC) &rnng_messenger, 2},
75+
{"rnng_messenger", (DL_FUNC) &rnng_messenger, 1},
7676
{"rnng_ncurl", (DL_FUNC) &rnng_ncurl, 5},
7777
{"rnng_ncurl_aio", (DL_FUNC) &rnng_ncurl_aio, 5},
7878
{"rnng_protocol_open", (DL_FUNC) &rnng_protocol_open, 1},

src/nanonext.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ extern SEXP rnng_listener_set_size(SEXP, SEXP, SEXP);
6363
extern SEXP rnng_listener_set_string(SEXP, SEXP, SEXP);
6464
extern SEXP rnng_listener_set_uint64(SEXP, SEXP, SEXP);
6565
extern SEXP rnng_listener_start(SEXP);
66-
extern SEXP rnng_messenger(SEXP, SEXP);
66+
extern SEXP rnng_messenger(SEXP);
6767
extern SEXP rnng_ncurl(SEXP, SEXP, SEXP, SEXP, SEXP);
6868
extern SEXP rnng_ncurl_aio(SEXP, SEXP, SEXP, SEXP, SEXP);
6969
extern SEXP rnng_protocol_open(SEXP);

src/sockets.c

Lines changed: 32 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ static void thread_finalizer(SEXP xptr) {
9898

9999
static void rnng_thread(void *arg) {
100100

101-
unsigned char *buf = NULL;
101+
char *buf = NULL;
102102
size_t sz;
103103
int xc;
104104
time_t now;
@@ -110,19 +110,24 @@ static void rnng_thread(void *arg) {
110110
if (xc) {
111111
time(&now);
112112
tms = localtime(&now);
113-
REprintf("| messenger session: ended: %d-%02d-%02d %02d:%02d:%02d\n",
113+
REprintf("| messenger session ended: %d-%02d-%02d %02d:%02d:%02d\n",
114114
tms->tm_year + 1900, tms->tm_mon + 1, tms->tm_mday,
115115
tms->tm_hour, tms->tm_min, tms->tm_sec);
116116
break;
117117
}
118-
if (!strcmp((const char *) buf, ":q")) {
118+
if (!strcmp(buf, ":c ")) {
119+
time(&now);
120+
tms = localtime(&now);
121+
REprintf("| peer connected: %d-%02d-%02d %02d:%02d:%02d\n",
122+
tms->tm_year + 1900, tms->tm_mon + 1, tms->tm_mday,
123+
tms->tm_hour, tms->tm_min, tms->tm_sec);
119124
nng_free(buf, sz);
120-
break;
125+
continue;
121126
}
122-
if (!strcmp((const char *) buf, "")) {
127+
if (!strcmp(buf, ":d ")) {
123128
time(&now);
124129
tms = localtime(&now);
125-
REprintf("| peer status: changed: %d-%02d-%02d %02d:%02d:%02d\n",
130+
REprintf("| peer disconnected: %d-%02d-%02d %02d:%02d:%02d\n",
126131
tms->tm_year + 1900, tms->tm_mon + 1, tms->tm_mday,
127132
tms->tm_hour, tms->tm_min, tms->tm_sec);
128133
nng_free(buf, sz);
@@ -131,7 +136,7 @@ static void rnng_thread(void *arg) {
131136

132137
time(&now);
133138
tms = localtime(&now);
134-
Rprintf("%s < %d-%02d-%02d %02d:%02d:%02d\n",
139+
Rprintf("%s\n < %d-%02d-%02d %02d:%02d:%02d\n",
135140
buf,
136141
tms->tm_year + 1900, tms->tm_mon + 1, tms->tm_mday,
137142
tms->tm_hour, tms->tm_min, tms->tm_sec);
@@ -140,51 +145,44 @@ static void rnng_thread(void *arg) {
140145

141146
}
142147

143-
SEXP rnng_messenger(SEXP url, SEXP mode) {
148+
SEXP rnng_messenger(SEXP url) {
144149

145150
const char *up = CHAR(STRING_ELT(url, 0));
146-
int dial = INTEGER(mode)[0];
147-
148151
SEXP con;
149152
SEXP socket;
150153
int xc;
154+
nng_listener *dlp;
151155

152156
nng_socket *sock = R_Calloc(1, nng_socket);
153157
xc = nng_pair0_open(sock);
154158
if (xc) {
155159
R_Free(sock);
156160
return Rf_ScalarInteger(xc);
157161
}
158-
159-
if (dial) {
160-
nng_dialer *dp = R_Calloc(1, nng_dialer);
161-
int xc = nng_dial(*sock, up, dp, 2u);
162+
dlp = R_Calloc(1, nng_listener);
163+
xc = nng_listen(*sock, up, dlp, 0);
164+
if (xc == 10) {
165+
R_Free(dlp);
166+
nng_dialer *dlp = R_Calloc(1, nng_dialer);
167+
xc = nng_dial(*sock, up, dlp, 2u);
162168
if (xc) {
163-
R_Free(dp);
169+
R_Free(dlp);
164170
R_Free(sock);
165171
return Rf_ScalarInteger(xc);
166172
}
167-
socket = PROTECT(R_MakeExternalPtr(sock, nano_SocketSymbol, R_NilValue));
168-
R_RegisterCFinalizerEx(socket, socket_finalizer, TRUE);
169-
con = PROTECT(R_MakeExternalPtr(dp, nano_DialerSymbol, R_NilValue));
170-
R_MakeWeakRef(socket, con, R_NilValue, TRUE);
171-
UNPROTECT(1);
172-
173-
} else {
174-
nng_listener *lp = R_Calloc(1, nng_listener);
175-
int xc = nng_listen(*sock, up, lp, 0);
176-
if (xc) {
177-
R_Free(lp);
178-
R_Free(sock);
179-
return Rf_ScalarInteger(xc);
180-
}
181-
socket = PROTECT(R_MakeExternalPtr(sock, nano_SocketSymbol, R_NilValue));
182-
R_RegisterCFinalizerEx(socket, socket_finalizer, TRUE);
183-
con = PROTECT(R_MakeExternalPtr(lp, nano_ListenerSymbol, R_NilValue));
184-
R_MakeWeakRef(socket, con, R_NilValue, TRUE);
185-
UNPROTECT(1);
173+
174+
} else if (xc) {
175+
R_Free(dlp);
176+
R_Free(sock);
177+
return Rf_ScalarInteger(xc);
186178
}
187179

180+
socket = PROTECT(R_MakeExternalPtr(sock, nano_SocketSymbol, R_NilValue));
181+
R_RegisterCFinalizerEx(socket, socket_finalizer, TRUE);
182+
con = PROTECT(R_MakeExternalPtr(dlp, nano_ListenerSymbol, R_NilValue));
183+
R_MakeWeakRef(socket, con, R_NilValue, TRUE);
184+
UNPROTECT(1);
185+
188186
nng_thread *thr;
189187
nng_thread_create(&thr, rnng_thread, sock);
190188
SEXP xptr = PROTECT(R_MakeExternalPtr(thr, R_NilValue, R_NilValue));

tests/tests.R

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ invisible(close(sock2) == 0L || stop())
9999

100100
invisible(is.integer(suppressMessages(ncurl("http://127.0.0.1:5555"))) || stop())
101101
invisible(is.integer(suppressMessages(stream(dial = "tcp://127.0.0.1:5555"))) || stop())
102-
logging(level = "keep")
103102

104103
invisible(is.character(ver <- nng_version()) && length(ver) == 2L || stop())
105104
invisible(is.character(nng_error(0L)) || stop())

0 commit comments

Comments
 (0)