Skip to content

Commit f0502ae

Browse files
committed
more consistency to function names, expand readme, push experimental timer
1 parent d0ce795 commit f0502ae

33 files changed

+788
-276
lines changed

DESCRIPTION

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ Type: Package
33
Title: NNG (Nanomsg Next Gen) Lightweight Messaging Library
44
Version: 0.1.0.9000
55
Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
6-
a socket library providing high-performance scalability protocols, or common
7-
communication patterns, the basic building blocks for distributed systems.
6+
a socket library providing high-performance scalability protocols,
7+
implementing a cross-platform standard for messaging and communications.
8+
Serves as a concurrency framework that can be used for building distributed
9+
systems.
810
Authors@R:
911
c(person(given = "Charlie",
1012
family = "Gao",

NAMESPACE

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,8 @@ S3method(setopt,nanoListener)
2121
S3method(setopt,nanoSocket)
2222
S3method(start,nanoDialer)
2323
S3method(start,nanoListener)
24-
export(aio_call)
25-
export(aio_stop)
24+
export(call_aio)
2625
export(context)
27-
export(ctx_recv)
28-
export(ctx_rep)
29-
export(ctx_req)
30-
export(ctx_send)
3126
export(dial)
3227
export(listen)
3328
export(nano)
@@ -37,12 +32,17 @@ export(nng_timer)
3732
export(nng_version)
3833
export(recv)
3934
export(recv_aio)
35+
export(recv_ctx)
4036
export(recv_vec)
37+
export(reply)
38+
export(request)
4139
export(send)
4240
export(send_aio)
41+
export(send_ctx)
4342
export(send_vec)
4443
export(setopt)
4544
export(socket)
45+
export(stop_aio)
4646
export(subscribe)
4747
export(unsubscribe)
4848
importFrom(stats,start)

NEWS.md

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,26 @@
22

33
#### New Features
44

5-
* Implements full async I/O capabilities - `send_aio()` and `recv_aio()` now return Aio objects, for which the results may be called using `aio_call()`.
6-
* New `ctx_rep()` and `ctx_req()` functions implement the full logic of an RPC server/client. Designed to be run in separate processes, the server will await data and apply an arbitrary function before returning a result, whilst the client will send data to the server and await a response.
5+
* Implements full async I/O capabilities
6+
+ `send_aio()` and `recv_aio()` now return Aio objects, for which the results may be called using `call_aio()`.
7+
* New `request()` and `reply()` functions implement the full logic of an RPC client/server.
8+
+ Designed to be run in separate processes, the reply server will await data and apply a function before returning a result.
9+
+ The request client performs an async request to the server and returns immediately with an Aio.
10+
+ This allows processes to run concurrently on the client and server.
711
* New `ncurl()` minimalistic http(s) client.
8-
* Allows setting the environment variable 'NANONEXT_TLS' prior to package installation to enable TLS where the system NNG library has been built with TLS support (using Mbed TLS).
12+
* Allows setting the environment variable 'NANONEXT_TLS' prior to package installation
13+
+ Enables TLS where the system NNG library has been built with TLS support (using Mbed TLS).
914
* New `nng_timer()` utility as a demonstration of NNG's multithreading capabilities.
1015

1116
#### Updates
1217

13-
* Successful starts of dialers/listeners and successful close operations no longer print a message to stdout for less verbosity by default. The state of respective objects can always be queried by their state attribute using `$state`
14-
* All send and receive functions, e.g. `send()`/`recv()`, gain a revised 'mode' argument. This now permits the choice of whether to use R serialization, consolidating the functionality of the '_vec' series of functions.
18+
* Dialer/listener starts and close operations no longer print a message to stderr when successful for less verbosity by default.
19+
+ The state of respective objects can always be queried using `$state`
20+
* All send and receive functions, e.g. `send()`/`recv()`, gain a revised 'mode' argument.
21+
+ This now permits R serialization as an option, consolidating the functionality of the '_vec' series of functions.
1522
* Functions 'send_vec' and 'recv_vec' are deprecated and will be removed in a future release.
23+
* Functions 'ctx_send' and 'ctx_recv' have been renamed `send_ctx()` and `recv_ctx()` for consistency.
24+
* The `$socket_close()` method of nano objects has been renamed `$close()` to better align with the functional API.
1625

1726
# nanonext 0.1.0
1827

R/aio.R

Lines changed: 48 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@
33
#' Call the Result of an Asynchronous AIO Operation
44
#'
55
#' Retrieve the result of an asynchronous AIO operation. Will wait for the AIO
6-
#' operation (blocking) if not yet complete. Once the result is retrieved,
7-
#' the Aio is deallocated and further actions cannot be performed on it.
6+
#' operation to complete (blocking) if this is still in progress.
87
#'
98
#' @param aio An Aio (object of class 'sendAio' or 'recvAio').
109
#'
11-
#' @return The original Aio object (invisibly).
10+
#' @return The passed Aio object (invisibly).
1211
#'
13-
#' @details For a 'sendAio', the send result will be attached to the Aio in
14-
#' \code{$result}. This will be zero on success.
12+
#' @details To access the values directly, use for example on a sendAio 'x':
13+
#' \code{call_aio(x)$result}.
14+
#'
15+
#' For a 'sendAio', the send result will be attached to the Aio in \code{$result}.
16+
#' This will be zero on success.
1517
#'
1618
#' For a 'recvAio', the received raw vector will be attached in \code{$raw}
1719
#' (unless 'keep.raw' was set to FALSE when receiving), and the converted R
@@ -21,62 +23,71 @@
2123
#' the received raw vector will always be saved in \code{$raw} to allow the
2224
#' data to be recovered.
2325
#'
26+
#' Once the result is retrieved, the Aio is deallocated and only the result
27+
#' is stored in the Aio object.
28+
#'
2429
#' @examples
2530
#' s1 <- socket("pair", listen = "inproc://nanonext")
2631
#' s2 <- socket("pair", dial = "inproc://nanonext")
2732
#'
2833
#' res <- send_aio(s1, data.frame(a = 1, b = 2), timeout = 100)
2934
#' res
30-
#' aio_call(res)
35+
#' call_aio(res)
3136
#' res
3237
#' res$result
3338
#'
3439
#' res <- recv_aio(s2, timeout = 100)
3540
#' res
36-
#' aio_call(res)$data
41+
#' call_aio(res)$data
3742
#' res
3843
#'
3944
#' close(s1)
4045
#' close(s2)
4146
#'
4247
#' @export
4348
#'
44-
aio_call <- function(aio) {
49+
call_aio <- function(aio) {
4550

46-
if (inherits(aio, "recvAio")) {
51+
if (length(.subset2(aio, "aio"))) {
4752

48-
mode <- aio[["callparams"]][[1L]]
49-
keep.raw <- aio[["callparams"]][[2L]]
50-
res <- .Call(rnng_aio_get_msg, aio[["aio"]])
51-
if (keep.raw) aio[["raw"]] <- res
52-
is.integer(res) && {
53-
message(res, " : ", nng_error(res))
54-
return(invisible(aio))
55-
}
56-
on.exit(expr = {
57-
aio[["raw"]] <- res
58-
return(invisible(aio))
59-
})
60-
data <- switch(mode,
61-
serial = unserialize(connection = res),
62-
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
63-
raw = res,
64-
readBin(con = res, what = mode, n = length(res)))
65-
aio[["data"]] <- data
66-
on.exit(expr = NULL)
67-
invisible(aio)
53+
if (inherits(aio, "recvAio")) {
6854

69-
} else if (inherits(aio, "sendAio")) {
55+
mode <- .subset2(aio, "callparams")[[1L]]
56+
keep.raw <- .subset2(aio, "callparams")[[2L]]
57+
res <- .Call(rnng_aio_get_msg, .subset2(aio, "aio"))
58+
if (keep.raw) aio[["raw"]] <- res
59+
is.integer(res) && {
60+
message(res, " : ", nng_error(res))
61+
return(invisible(aio))
62+
}
63+
on.exit(expr = {
64+
aio[["raw"]] <- res
65+
return(invisible(aio))
66+
})
67+
data <- switch(mode,
68+
serial = unserialize(connection = res),
69+
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
70+
raw = res,
71+
readBin(con = res, what = mode, n = length(res)))
72+
aio[["data"]] <- data
73+
on.exit(expr = NULL)
74+
rm("aio", envir = aio)
75+
rm("callparams", envir = aio)
76+
invisible(aio)
7077

71-
res <- .Call(rnng_aio_result, aio[["aio"]])
72-
aio[["result"]] <- res
73-
if (res) {
74-
message(res, " : ", nng_error(res))
78+
} else if (inherits(aio, "sendAio")) {
79+
res <- .Call(rnng_aio_result, .subset2(aio, "aio"))
80+
aio[["result"]] <- res
81+
rm("aio", envir = aio)
82+
if (res) {
83+
message(res, " : ", nng_error(res))
84+
}
7585
}
86+
7687
invisible(aio)
7788

7889
} else {
79-
stop("this function may only be used on an Aio")
90+
invisible(aio)
8091
}
8192

8293
}
@@ -96,9 +107,9 @@ aio_call <- function(aio) {
96107
#'
97108
#' @export
98109
#'
99-
aio_stop <- function(aio) {
110+
stop_aio <- function(aio) {
100111

101-
invisible(.Call(rnng_aio_stop, aio[["aio"]]))
112+
invisible(.Call(rnng_aio_stop, .subset2(aio, "aio")))
102113

103114
}
104115

0 commit comments

Comments
 (0)