Skip to content

Commit 9a68e63

Browse files
committed
compact codebase
1 parent 4a70216 commit 9a68e63

19 files changed

+220
-243
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ Description: R binding for NNG (Nanomsg Next Gen), a successor to ZeroMQ. NNG is
66
a socket library providing high-performance scalability protocols,
77
implementing a cross-platform standard for messaging and communications.
88
Serves as a concurrency framework for building distributed applications,
9-
utilising 'Aio' objects which return an unresolved value whilst its
9+
utilising 'Aio' objects which return an unresolved value whilst an
1010
asynchronous operation is ongoing, automatically resolving to a final value
1111
once complete.
1212
Authors@R:

NEWS.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
#### New Features
44

5-
* Aio values `$result`, `$data` and `$raw` now resolve without requiring `call_aio()`. Access the values directly and an 'unresolved' logical NA will be returned if the Aio operation is yet to complete.
5+
* Aio values `$result`, `$data` and `$raw` now resolve automatically without requiring `call_aio()`. Access the values directly and an 'unresolved' logical NA will be returned if the Aio operation is yet to complete.
66
* `unresolved()` added as an auxiliary function to query whether an Aio is unresolved, for use in control flow statements.
77
* Integer error values generated by receive functions are now classed 'errorValue'. `is_error_value()` helper function included.
88
* `is_nul_byte()` added as a helper function for request/reply setups.

R/aio.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ stop_aio <- function(aio) {
108108
#' aio <- send_aio(s1, "test", timeout = 100)
109109
#'
110110
#' while (unresolved(aio)) {
111-
#' # do stuff here before checking resolution again
111+
#' # do stuff before checking resolution again
112112
#' cat("unresolved")
113113
#' s2 <- socket("pair", dial = "inproc://nanonext")
114114
#' Sys.sleep(0.01)

R/context.R

Lines changed: 16 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,7 @@ context <- function(socket) {
4545

4646
if (is.environment(socket)) socket <- .subset2(socket, "socket")
4747
res <- .Call(rnng_ctx_open, socket)
48-
if (is.integer(res)) {
49-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
50-
}
48+
if (is.integer(res)) logerror(res)
5149
res
5250

5351
}
@@ -87,12 +85,10 @@ send_ctx <- function(context, data, mode = c("serial", "raw"), timeout, echo = T
8785
mode <- match.arg(mode)
8886
if (missing(timeout)) timeout <- -2L
8987
force(data)
90-
data <- switch(mode,
91-
serial = serialize(object = data, connection = NULL),
92-
raw = if (is.raw(data)) data else writeBin(object = data, con = raw()))
88+
data <- encode(data = data, mode = mode)
9389
res <- .Call(rnng_ctx_send, context, data, timeout)
9490
is.integer(res) && {
95-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
91+
logerror(res)
9692
return(invisible(res))
9793
}
9894
if (missing(echo) || isTRUE(echo)) res else invisible(0L)
@@ -144,15 +140,11 @@ recv_ctx <- function(context,
144140
if (missing(timeout)) timeout <- -2L
145141
res <- .Call(rnng_ctx_recv, context, timeout)
146142
is.integer(res) && {
147-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
143+
logerror(res)
148144
return(invisible(`class<-`(res, "errorValue")))
149145
}
150146
on.exit(expr = return(res))
151-
data <- switch(mode,
152-
serial = unserialize(connection = res),
153-
character = (r <- readBin(con = res, what = mode, n = length(res)))[r != ""],
154-
raw = res,
155-
readBin(con = res, what = mode, n = length(res)))
147+
data <- decode(con = res, mode = mode)
156148
on.exit()
157149
missing(data) && return(.Call(rnng_scm))
158150
if (missing(keep.raw) || isTRUE(keep.raw)) list(raw = res, data = data) else data
@@ -230,23 +222,17 @@ reply <- function(context,
230222
if (missing(timeout)) timeout <- -2L
231223
res <- .Call(rnng_ctx_recv, context, timeout)
232224
is.integer(res) && {
233-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
225+
logerror(res)
234226
return(invisible(`class<-`(res, "errorValue")))
235227
}
236228
on.exit(expr = send_aio(context, as.raw(0L), mode = send_mode))
237-
data <- switch(recv_mode,
238-
serial = unserialize(connection = res),
239-
character = (r <- readBin(con = res, what = recv_mode, n = length(res)))[r != ""],
240-
raw = res,
241-
readBin(con = res, what = recv_mode, n = length(res)))
229+
data <- decode(con = res, mode = recv_mode)
242230
data <- execute(data, ...)
243-
data <- switch(send_mode,
244-
serial = serialize(object = data, connection = NULL),
245-
raw = if (is.raw(data)) data else writeBin(object = data, con = raw()))
231+
data <- encode(data = data, mode = send_mode)
246232
on.exit()
247233
res <- .Call(rnng_ctx_send, context, data, timeout)
248234
is.integer(res) && {
249-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
235+
logerror(res)
250236
return(invisible(res))
251237
}
252238
invisible(0L)
@@ -312,18 +298,16 @@ request <- function(context,
312298
keep.raw <- missing(keep.raw) || isTRUE(keep.raw)
313299
if (missing(timeout)) timeout <- -2L
314300
force(data)
315-
data <- switch(send_mode,
316-
serial = serialize(object = data, connection = NULL),
317-
raw = if (is.raw(data)) data else writeBin(object = data, con = raw()))
301+
data <- encode(data = data, mode = send_mode)
318302
res <- .Call(rnng_send_aio, context, data, -2L)
319303
is.integer(res) && {
320-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
304+
logerror(res)
321305
return(invisible(res))
322306
}
323307

324308
aio <- .Call(rnng_recv_aio, context, timeout)
325309
is.integer(aio) && {
326-
message(Sys.time(), " [ ", aio, " ] ", nng_error(aio))
310+
logerror(aio)
327311
return(invisible(`class<-`(aio, "errorValue")))
328312
}
329313
env <- `class<-`(new.env(), "recvAio")
@@ -335,19 +319,15 @@ request <- function(context,
335319
missing(res) && return(.Call(rnng_aio_unresolv))
336320
is.integer(res) && {
337321
data <<- raw <<- resolv <<- `class<-`(res, "errorValue")
338-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
322+
logerror(res)
339323
return(invisible(resolv))
340324
}
341325
on.exit(expr = {
342326
raw <<- res
343327
resolv <<- 0L
344328
return(res)
345329
})
346-
data <- switch(recv_mode,
347-
serial = unserialize(connection = res),
348-
character = (r <- readBin(con = res, what = recv_mode, n = length(res)))[r != ""],
349-
raw = res,
350-
readBin(con = res, what = recv_mode, n = length(res)))
330+
data <- decode(con = res, mode = recv_mode)
351331
on.exit()
352332
raw <<- res
353333
data <<- data
@@ -362,19 +342,15 @@ request <- function(context,
362342
missing(res) && return(.Call(rnng_aio_unresolv))
363343
is.integer(res) && {
364344
data <<- raw <<- resolv <<- `class<-`(res, "errorValue")
365-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
345+
logerror(res)
366346
return(invisible(resolv))
367347
}
368348
on.exit(expr = {
369349
data <<- res
370350
resolv <<- 0L
371351
return(res)
372352
})
373-
data <- switch(recv_mode,
374-
serial = unserialize(connection = res),
375-
character = (r <- readBin(con = res, what = recv_mode, n = length(res)))[r != ""],
376-
raw = res,
377-
readBin(con = res, what = recv_mode, n = length(res)))
353+
data <- decode(con = res, mode = recv_mode)
378354
on.exit()
379355
if (keep.raw) raw <<- res
380356
data <<- data

R/listdial.R

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,15 @@ dial <- function(socket,
7878
if (missing(autostart) || isTRUE(autostart)) {
7979
res <- .Call(rnng_dial, .subset2(socket, "socket"), url)
8080
if (is.integer(res)) {
81-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
81+
logerror(res)
8282
return(invisible(res))
8383
}
84-
if (logging()) {
85-
cat(format.POSIXct(Sys.time()), "[ dial start ] sock:",
86-
attr(res, "socket"), "| url:", url, "\n", file = stdout())
87-
}
84+
if (logging()) loginfo(evt = "dial start", pkey = "sock", pval = attr(res, "socket"),
85+
skey = "url", sval = url)
8886
} else {
8987
res <- .Call(rnng_dialer_create, .subset2(socket, "socket"), url)
9088
if (is.integer(res)) {
91-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
89+
logerror(res)
9290
return(invisible(res))
9391
}
9492
}
@@ -107,17 +105,15 @@ dial <- function(socket,
107105
if (missing(autostart) || isTRUE(autostart)) {
108106
res <- .Call(rnng_dial, socket, url)
109107
if (is.integer(res)) {
110-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
108+
logerror(res)
111109
return(invisible(res))
112110
}
113-
if (logging()) {
114-
cat(format.POSIXct(Sys.time()), "[ dial start ] sock:",
115-
attr(res, "socket"), "| url:", url, "\n", file = stdout())
116-
}
111+
if (logging()) loginfo(evt = "dial start", pkey = "sock", pval = attr(res, "socket"),
112+
skey = "url", sval = url)
117113
} else {
118114
res <- .Call(rnng_dialer_create, socket, url)
119115
if (is.integer(res)) {
120-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
116+
logerror(res)
121117
return(invisible(res))
122118
}
123119
}
@@ -207,17 +203,15 @@ listen <- function(socket,
207203
if (missing(autostart) || isTRUE(autostart)) {
208204
res <- .Call(rnng_listen, .subset2(socket, "socket"), url)
209205
if (is.integer(res)) {
210-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
206+
logerror(res)
211207
return(invisible(res))
212208
}
213-
if (logging()) {
214-
cat(format.POSIXct(Sys.time()), "[ list start ] sock:",
215-
attr(res, "socket"), "| url:", url, "\n", file = stdout())
216-
}
209+
if (logging()) loginfo(evt = "list start", pkey = "sock", pval = attr(res, "socket"),
210+
skey = "url", sval = url)
217211
} else {
218212
res <- .Call(rnng_listener_create, .subset2(socket, "socket"), url)
219213
if (is.integer(res)) {
220-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
214+
logerror(res)
221215
return(invisible(res))
222216
}
223217
}
@@ -236,17 +230,15 @@ listen <- function(socket,
236230
if (missing(autostart) || isTRUE(autostart)) {
237231
res <- .Call(rnng_listen, socket, url)
238232
if (is.integer(res)) {
239-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
233+
logerror(res)
240234
return(invisible(res))
241235
}
242-
if (logging()) {
243-
cat(format.POSIXct(Sys.time()), "[ list start ] sock:",
244-
attr(res, "socket"), "| url:", url, "\n", file = stdout())
245-
}
236+
if (logging()) loginfo(evt = "list start", pkey = "sock", pval = attr(res, "socket"),
237+
skey = "url", sval = url)
246238
} else {
247239
res <- .Call(rnng_listener_create, socket, url)
248240
if (is.integer(res)) {
249-
message(Sys.time(), " [ ", res, " ] ", nng_error(res))
241+
logerror(res)
250242
return(invisible(res))
251243
}
252244
}

R/methods.R

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,10 @@ start.nanoListener <- function(x, ...) {
2828

2929
xc <- .Call(rnng_listener_start, x)
3030
if (xc) {
31-
message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
31+
logerror(xc)
3232
} else if (logging()) {
33-
cat(format.POSIXct(Sys.time()), "[ list start ] sock:",
34-
attr(x, "socket"), "| url:", attr(x, "url"), "\n", file = stdout())
33+
loginfo(evt = "list start", pkey = "sock", pval = attr(x, "socket"),
34+
skey = "url", sval = attr(x, "url"))
3535
}
3636
invisible(xc)
3737

@@ -45,10 +45,10 @@ start.nanoDialer <- function(x, async = TRUE, ...) {
4545

4646
xc <- .Call(rnng_dialer_start, x, async)
4747
if (xc) {
48-
message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
48+
logerror(xc)
4949
} else if (logging()) {
50-
cat(format.POSIXct(Sys.time()), "[ dial start ] sock:",
51-
attr(x, "socket"), "| url:", attr(x, "url"), "\n", file = stdout())
50+
loginfo(evt = "dial start", pkey = "sock", pval = attr(x, "socket"),
51+
skey = "url", sval = attr(x, "url"))
5252
}
5353
invisible(xc)
5454

@@ -91,10 +91,10 @@ close.nanoSocket <- function(con, ...) {
9191

9292
xc <- .Call(rnng_close, con)
9393
if (xc) {
94-
message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
94+
logerror(xc)
9595
} else if (logging()) {
96-
cat(format.POSIXct(Sys.time()), "[ sock close ] id:",
97-
attr(con, "id"), "| protocol:", attr(con, "protocol"), "\n", file = stdout())
96+
loginfo(evt = "sock close", pkey = "id", pval = attr(con, "id"),
97+
skey = "protocol", sval = attr(con, "protocol"))
9898
}
9999
invisible(xc)
100100

@@ -107,7 +107,7 @@ close.nanoSocket <- function(con, ...) {
107107
close.nanoContext <- function(con, ...) {
108108

109109
xc <- .Call(rnng_ctx_close, con)
110-
if (xc) message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
110+
if (xc) logerror(xc)
111111
invisible(xc)
112112

113113
}
@@ -120,10 +120,10 @@ close.nanoDialer <- function(con, ...) {
120120

121121
xc <- .Call(rnng_dialer_close, con)
122122
if (xc) {
123-
message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
123+
logerror(xc)
124124
} else if (logging()) {
125-
cat(format.POSIXct(Sys.time()), "[ dial start ] sock:",
126-
attr(con, "socket"), "| url:", attr(con, "url"), "\n", file = stdout())
125+
loginfo(evt = "dial close", pkey = "sock", pval = attr(con, "socket"),
126+
skey = "url", sval = attr(con, "url"))
127127
}
128128

129129
invisible(xc)
@@ -138,10 +138,10 @@ close.nanoListener <- function(con, ...) {
138138

139139
xc <- .Call(rnng_listener_close, con)
140140
if (xc) {
141-
message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
141+
logerror(xc)
142142
} else if (logging()) {
143-
cat(format.POSIXct(Sys.time()), "[ list close ] sock:",
144-
attr(con, "socket"), "| url:", attr(con, "url"), "\n", file = stdout())
143+
loginfo(evt = "list close", pkey = "sock", pval = attr(con, "socket"),
144+
skey = "url", sval = attr(con, "url"))
145145
}
146146

147147
invisible(xc)

R/nanonext-package.R

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
#' library providing high-performance scalability protocols, implementing a
77
#' cross-platform standard for messaging and communications. Serves as a
88
#' concurrency framework for building distributed applications, utilising
9-
#' 'Aio' objects which return an unresolved value whilst its asynchronous
9+
#' 'Aio' objects which return an unresolved value whilst an asynchronous
1010
#' operation is ongoing, automatically resolving to a final value once
1111
#' complete.
1212
#'

R/opts.R

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ setopt.nanoSocket <- function(object,
5353
string = .Call(rnng_socket_set_string, object, opt, value),
5454
uint64 = .Call(rnng_socket_set_uint64, object, opt, value))
5555

56-
if (xc) message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
56+
if (xc) logerror(xc)
5757
invisible(xc)
5858

5959
}
@@ -83,7 +83,7 @@ setopt.nanoDialer <- function(object,
8383
string = .Call(rnng_dialer_set_string, object, opt, value),
8484
uint64 = .Call(rnng_dialer_set_uint64, object, opt, value))
8585

86-
if (xc) message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
86+
if (xc) logerror(xc)
8787
invisible(xc)
8888

8989
}
@@ -113,7 +113,7 @@ setopt.nanoListener <- function(object,
113113
string = .Call(rnng_listener_set_string, object, opt, value),
114114
uint64 = .Call(rnng_listener_set_uint64, object, opt, value))
115115

116-
if (xc) message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
116+
if (xc) logerror(xc)
117117
invisible(xc)
118118

119119
}
@@ -144,7 +144,7 @@ setopt.nanoContext <- function(object,
144144
string = .Call(rnng_ctx_set_string, object, opt, value),
145145
uint64 = .Call(rnng_ctx_set_uint64, object, opt, value))
146146

147-
if (xc) message(Sys.time(), " [ ", xc, " ] ", nng_error(xc))
147+
if (xc) logerror(xc)
148148
invisible(xc)
149149

150150
}

0 commit comments

Comments
 (0)