Skip to content

Commit 6327330

Browse files
committed
implement multi-threaded messenger
1 parent 0266c16 commit 6327330

File tree

18 files changed

+568
-366
lines changed

18 files changed

+568
-366
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
Package: nanonext
22
Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
4-
Version: 0.3.0.9002
4+
Version: 0.3.0.9003
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library providing high-performance scalability protocols,
77
implementing a cross-platform standard for messaging and communications.

NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ export(is_error_value)
4949
export(is_nul_byte)
5050
export(listen)
5151
export(logging)
52+
export(messenger)
5253
export(nano)
5354
export(ncurl)
5455
export(nng_error)

NEWS.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
# nanonext 0.3.0.9002 (development)
1+
# nanonext 0.3.0.9003 (development)
22

33
#### New Features
44

55
* New `stream()` interface exposes low-level byte stream functionality in the NNG library, intended for communicating with non-NNG endpoints, including but not limited to websocket servers.
66
* `ncurl()` adds an 'async' option to perform HTTP requests asynchronously, returning immediately with a 'recvAio'. Also adds explicit arguments for HTTP method, content type and authorization headers and request data.
7+
* New `messenger()` function implements a console-based 2-way messaging system using NNG's scalability protocols [currently experimental and undergoing early-stage testing].
78

89
#### Updates
910

R/aio.R

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
# nanonext - Core Functions - Aio Functions ------------------------------------
1+
# nanonext - Core - Aio Functions ----------------------------------------------
2+
3+
# send_aio/recv_aio ------------------------------------------------------------
24

35
#' Send Async
46
#'
@@ -420,6 +422,8 @@ recv_aio.nanoStream <- function(con,
420422

421423
}
422424

425+
# Core aio functions -----------------------------------------------------------
426+
423427
#' Call the Value of an Asynchronous AIO Operation
424428
#'
425429
#' Retrieve the value of an asynchronous AIO operation, waiting for the AIO

R/context.R

Lines changed: 85 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# nanonext - Contexts ----------------------------------------------------------
1+
# nanonext - Contexts and RPC --------------------------------------------------
22

33
#' Open Context
44
#'
@@ -53,88 +53,6 @@ context <- function(socket) {
5353

5454
}
5555

56-
#' Send over Context
57-
#'
58-
#' Send data over a Context [Deprecated].
59-
#'
60-
#' @param context a Context.
61-
#' @inheritParams send
62-
#' @inheritParams send_aio
63-
#'
64-
#' @return Raw vector of sent data, or (invisibly) an integer exit code (zero on
65-
#' success) if 'echo' is set to FALSE.
66-
#'
67-
#' @details Will block if the send is in progress and has not yet completed -
68-
#' certain protocol / transport combinations may limit the number of messages
69-
#' that can be queued if they have yet to be received. Set a timeout to
70-
#' ensure the function returns under all scenarios.
71-
#'
72-
#' @keywords internal
73-
#' @export
74-
#'
75-
send_ctx <- function(context, data, mode = c("serial", "raw"), timeout, echo = TRUE) {
76-
77-
mode <- match.arg(mode)
78-
if (missing(timeout)) timeout <- -2L
79-
force(data)
80-
data <- encode(data = data, mode = mode)
81-
res <- .Call(rnng_ctx_send, context, data, timeout)
82-
is.integer(res) && {
83-
logerror(res)
84-
return(invisible(res))
85-
}
86-
if (missing(echo) || isTRUE(echo)) res else invisible(0L)
87-
88-
}
89-
90-
#' Receive over Context
91-
#'
92-
#' Receive data over a Context [Deprecated].
93-
#'
94-
#' @param context a Context.
95-
#' @inheritParams recv
96-
#' @inheritParams send_aio
97-
#'
98-
#' @return Named list of 2 elements: 'raw' containing the received raw vector
99-
#' and 'data' containing the converted object, or else the converted object
100-
#' if 'keep.raw' is set to FALSE.
101-
#'
102-
#' @details Will block while awaiting the receive operation to complete.
103-
#' Set a timeout to ensure that the function returns under all scenarios.
104-
#'
105-
#' In case of an error, an integer 'errorValue' is returned (to be
106-
#' distiguishable from an integer message value). This can be verified using
107-
#' \code{\link{is_error_value}}.
108-
#'
109-
#' If the raw data was successfully received but an error occurred in
110-
#' unserialisation or data conversion (for example if the incorrect mode was
111-
#' specified), the received raw vector will always be returned to allow for
112-
#' the data to be recovered.
113-
#'
114-
#' @keywords internal
115-
#' @export
116-
#'
117-
recv_ctx <- function(context,
118-
mode = c("serial", "character", "complex", "double",
119-
"integer", "logical", "numeric", "raw"),
120-
timeout,
121-
keep.raw = TRUE) {
122-
123-
mode <- match.arg(mode)
124-
if (missing(timeout)) timeout <- -2L
125-
res <- .Call(rnng_ctx_recv, context, timeout)
126-
is.integer(res) && {
127-
logerror(res)
128-
return(invisible(`class<-`(res, "errorValue")))
129-
}
130-
on.exit(expr = return(res))
131-
data <- decode(con = res, mode = mode)
132-
on.exit()
133-
missing(data) && return(.Call(rnng_scm))
134-
if (missing(keep.raw) || isTRUE(keep.raw)) list(raw = res, data = data) else data
135-
136-
}
137-
13856
#' Reply over Context (RPC Server for Req/Rep Protocol)
13957
#'
14058
#' Implements an executor/server for the rep node of the req/rep protocol. Awaits
@@ -349,3 +267,87 @@ request <- function(context,
349267

350268
}
351269

270+
# Deprecated - do not use ------------------------------------------------------
271+
272+
#' Send over Context
273+
#'
274+
#' Send data over a Context [Deprecated].
275+
#'
276+
#' @param context a Context.
277+
#' @inheritParams send
278+
#' @inheritParams send_aio
279+
#'
280+
#' @return Raw vector of sent data, or (invisibly) an integer exit code (zero on
281+
#' success) if 'echo' is set to FALSE.
282+
#'
283+
#' @details Will block if the send is in progress and has not yet completed -
284+
#' certain protocol / transport combinations may limit the number of messages
285+
#' that can be queued if they have yet to be received. Set a timeout to
286+
#' ensure the function returns under all scenarios.
287+
#'
288+
#' @keywords internal
289+
#' @export
290+
#'
291+
send_ctx <- function(context, data, mode = c("serial", "raw"), timeout, echo = TRUE) {
292+
293+
mode <- match.arg(mode)
294+
if (missing(timeout)) timeout <- -2L
295+
force(data)
296+
data <- encode(data = data, mode = mode)
297+
res <- .Call(rnng_ctx_send, context, data, timeout)
298+
is.integer(res) && {
299+
logerror(res)
300+
return(invisible(res))
301+
}
302+
if (missing(echo) || isTRUE(echo)) res else invisible(0L)
303+
304+
}
305+
306+
#' Receive over Context
307+
#'
308+
#' Receive data over a Context [Deprecated].
309+
#'
310+
#' @param context a Context.
311+
#' @inheritParams recv
312+
#' @inheritParams send_aio
313+
#'
314+
#' @return Named list of 2 elements: 'raw' containing the received raw vector
315+
#' and 'data' containing the converted object, or else the converted object
316+
#' if 'keep.raw' is set to FALSE.
317+
#'
318+
#' @details Will block while awaiting the receive operation to complete.
319+
#' Set a timeout to ensure that the function returns under all scenarios.
320+
#'
321+
#' In case of an error, an integer 'errorValue' is returned (to be
322+
#' distiguishable from an integer message value). This can be verified using
323+
#' \code{\link{is_error_value}}.
324+
#'
325+
#' If the raw data was successfully received but an error occurred in
326+
#' unserialisation or data conversion (for example if the incorrect mode was
327+
#' specified), the received raw vector will always be returned to allow for
328+
#' the data to be recovered.
329+
#'
330+
#' @keywords internal
331+
#' @export
332+
#'
333+
recv_ctx <- function(context,
334+
mode = c("serial", "character", "complex", "double",
335+
"integer", "logical", "numeric", "raw"),
336+
timeout,
337+
keep.raw = TRUE) {
338+
339+
mode <- match.arg(mode)
340+
if (missing(timeout)) timeout <- -2L
341+
res <- .Call(rnng_ctx_recv, context, timeout)
342+
is.integer(res) && {
343+
logerror(res)
344+
return(invisible(`class<-`(res, "errorValue")))
345+
}
346+
on.exit(expr = return(res))
347+
data <- decode(con = res, mode = mode)
348+
on.exit()
349+
missing(data) && return(.Call(rnng_scm))
350+
if (missing(keep.raw) || isTRUE(keep.raw)) list(raw = res, data = data) else data
351+
352+
}
353+

R/messenger.R

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# nanonext - Messenger ---------------------------------------------------------
2+
3+
#' Messenger
4+
#'
5+
#' Console-based 2-way messaging system based on NNG scalability protocols.
6+
#'
7+
#' @param dial (optional) a URL to dial, specifying the transport and address as
8+
#' a character string e.g. 'tcp://127.0.0.1:5555' (see \link{transports}).
9+
#' @param listen (optional) a URL to listen at, specifying the transport and
10+
#' address as a character string e.g. 'tcp://127.0.0.1:5555' (see \link{transports}).
11+
#'
12+
#' @return Invisible NULL.
13+
#'
14+
#' @section Usage:
15+
#'
16+
#' Type outgoing messages and hit return to send.
17+
#' Incoming messages are prefixed by \code{>}.
18+
#'
19+
#' \code{:q} is the command to quit.
20+
#'
21+
#' NOTE: This is an experimental feature that is currently undergoing
22+
#' early-stage testing.
23+
#'
24+
#' @export
25+
#'
26+
messenger <- function(dial = NULL, listen = NULL) {
27+
28+
if (!missing(dial) && is.character(dial)) {
29+
sock <- .Call(rnng_messenger, dial, 1L)
30+
} else if (!missing(listen) && is.character(listen)) {
31+
sock <- .Call(rnng_messenger, listen, 0L)
32+
} else {
33+
stop("missing or invalid input")
34+
}
35+
is.integer(sock) && {
36+
logerror(sock)
37+
return(invisible(sock))
38+
}
39+
on.exit(expr = {
40+
close(sock)
41+
invisible()
42+
})
43+
. <- unlist(strsplit("nanonext messenger", ""))
44+
.. <- .[length(.):1]
45+
for (i in seq_along(..)) {
46+
cat("\r", `length<-`(.., i), sep = " ", file = stdout())
47+
if (i %in% c(1:5, 15:20)) Sys.sleep(0.03) else Sys.sleep(0.01)
48+
}
49+
for (i in seq_along(.)) {
50+
cat("\r", `length<-`(., i), sep = " ", file = stdout())
51+
if (i %in% c(1:5, 15:20)) Sys.sleep(0.01) else Sys.sleep(0.03)
52+
}
53+
cat(" | type your message:\n", file = stdout())
54+
repeat {
55+
data <- readline()
56+
if (identical(data, ":q")) break
57+
data <- writeBin(object = data, con = raw())
58+
s <- .Call(rnng_send, sock, data, FALSE)
59+
if (is.integer(s)) message(sprintf("%s [ no connection ] message not sent", format.POSIXct(Sys.time())))
60+
}
61+
on.exit()
62+
close(sock)
63+
invisible()
64+
65+
}
66+

0 commit comments

Comments
 (0)