Skip to content

Commit c13c870

Browse files
Add tests to increase test coverage.
The following files now have 100% test coverage: * R/as.cluster.R * R/availableConnections.R * R/isConnectionValid.R * R/isForkedNode.R * R/serializedSize.R * R/stealth_sample.R Increased coverage for: * R/makeClusterSequential.R [95.74%]
1 parent 82c4226 commit c13c870

15 files changed

+366
-75
lines changed

DESCRIPTION

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
Package: parallelly
2-
Version: 1.45.1-9012
2+
Version: 1.45.1-9013
33
Title: Enhancing the 'parallel' Package
44
Imports:
55
parallel,

R/makeClusterSequential.R

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ sendData.sequential_node <- function(node, data) {
7373
data <- data[["data"]] ## sic!
7474
fun <- data[["fun"]]
7575
args <- data[["args"]]
76-
ret <- data[["return"]]
7776

7877
## Don't evaluate in the global environment, which is the default
7978
if (identical(args[["envir"]], globalenv())) {

inst/testme/_prologue/090.context.R

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,3 @@ on_solaris <- grepl("^solaris", R.version[["os"]])
88

99
covr_testing <- ("covr" %in% loadedNamespaces())
1010
on_githubactions <- isTRUE(as.logical(Sys.getenv("GITHUB_ACTIONS")))
11-
12-
useXDR <- isTRUE(parallelly:::getOption2("parallelly.makeNodePSOCK.useXDR"))

inst/testme/test-as.cluster.R

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
library(parallelly)
22

33
stopCluster <- parallel::stopCluster
4+
useXDR <- isTRUE(parallelly:::getOption2("parallelly.makeNodePSOCK.useXDR"))
45

56
message("*** cluster operations ...")
67

@@ -69,3 +70,25 @@ if (length(remotes) > 0) {
6970
message("*** cluster operations - makeClusterPSOCK(remotes) ... DONE")
7071

7172
message("*** cluster operations ... DONE")
73+
74+
message("*** cluster operations - as.cluster(<non-cluster>) ...")
75+
res <- tryCatch({
76+
as.cluster(1)
77+
}, error = function(ex) ex)
78+
stopifnot(inherits(res, "error"))
79+
message("*** cluster operations - as.cluster(<non-cluster>) ... DONE")
80+
81+
message("*** cluster operations - c(...) with duplicated nodes ...")
82+
cl_dup1 <- makeClusterPSOCK(1L)
83+
on.exit(stopCluster(cl_dup1), add = TRUE)
84+
cl_combined <- c(cl_dup1, cl_dup1)
85+
stopifnot(inherits(cl_combined, "cluster"), length(cl_combined) == 2L)
86+
message("*** cluster operations - c(...) with duplicated nodes ... DONE")
87+
88+
message("*** cluster operations - as.cluster(SOCKnode) ...")
89+
cl_base <- parallel::makeCluster(1L, type = "PSOCK")
90+
node_base <- cl_base[[1]]
91+
res_base <- as.cluster(node_base)
92+
stopifnot(inherits(res_base, "cluster"), inherits(res_base[[1]], "SOCKnode"))
93+
parallel::stopCluster(cl_base)
94+
message("*** cluster operations - as.cluster(SOCKnode) ... DONE")
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
library(parallelly)
2+
3+
message("availableConnections() ...")
4+
5+
## Test with overriding option
6+
options(parallelly.availableConnections = 200L)
7+
n <- availableConnections()
8+
stopifnot(n == 200L)
9+
options(parallelly.availableConnections = NULL)
10+
11+
## Reset memoized value
12+
assign("max", NULL, envir = environment(availableConnections))
13+
14+
## Test with invalid option
15+
res <- tryCatch({
16+
options(parallelly.availableConnections = "abc")
17+
availableConnections()
18+
}, error = function(e) e)
19+
stopifnot(inherits(res, "simpleError"))
20+
options(parallelly.availableConnections = NULL)
21+
22+
## Reset memoized value
23+
assign("max", NULL, envir = environment(availableConnections))
24+
25+
## Test with a small number of tries to trigger +Inf
26+
options(parallelly.availableConnections.tries = 10L)
27+
n <- availableConnections()
28+
stopifnot(is.infinite(n))
29+
options(parallelly.availableConnections.tries = NULL)
30+
31+
## Reset memoized value
32+
assign("max", NULL, envir = environment(availableConnections))
33+
34+
## Test with invalid tries option
35+
res <- tryCatch({
36+
options(parallelly.availableConnections.tries = -1L)
37+
availableConnections()
38+
}, error = function(e) e)
39+
stopifnot(inherits(res, "simpleError"))
40+
options(parallelly.availableConnections.tries = NULL)
41+
42+
## Reset memoized value
43+
assign("max", NULL, envir = environment(availableConnections))
44+
45+
## Basic functionality
46+
n <- availableConnections()
47+
stopifnot(is.integer(n) || is.infinite(n), n >= 3L)
48+
49+
f <- freeConnections()
50+
stopifnot(is.integer(f) || is.infinite(f), f <= n)
51+
52+
message("availableConnections() ... done")
Lines changed: 66 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,52 +1,68 @@
11
library(parallelly)
22

3-
stopCluster <- parallel::stopCluster
4-
5-
message("*** Connections ...")
6-
7-
con <- stdin()
8-
idx <- as.integer(con)
9-
id <- connectionId(con)
10-
valid <- isConnectionValid(con)
11-
stopifnot(inherits(con, "connection"), idx == 0L, id == 0L, isTRUE(valid))
12-
13-
con <- stdout()
14-
idx <- as.integer(con)
15-
id <- connectionId(con)
16-
valid <- isConnectionValid(con)
17-
stopifnot(inherits(con, "connection"), idx == 1L, id == 1L, isTRUE(valid))
18-
19-
con <- stderr()
20-
idx <- as.integer(con)
21-
id <- connectionId(con)
22-
valid <- isConnectionValid(con)
23-
stopifnot(inherits(con, "connection"), idx == 2L, id == 2L, isTRUE(valid))
24-
25-
message("- Connections cannot be serialized")
26-
27-
con <- file(tempfile(), open = "w")
28-
x <- list(value = 42, stderr = stderr(), con = con)
29-
y <- unserialize(serialize(x, connection = NULL))
30-
print(connectionId(x$stderr))
31-
print(connectionId(x$con))
32-
print(isConnectionValid(x$stderr))
33-
print(isConnectionValid(x$con))
34-
print(connectionId(y$stderr))
35-
print(connectionId(y$con))
36-
print(isConnectionValid(y$stderr))
37-
print(isConnectionValid(y$con))
38-
39-
stopifnot(
40-
identical(y$value, x$value),
41-
connectionId(x$stderr) == 2L,
42-
isConnectionValid(x$stderr),
43-
isConnectionValid(y$stderr),
44-
identical(connectionId(y$stderr), connectionId(x$stderr)),
45-
connectionId(x$con) >= 3L,
46-
isConnectionValid(x$con),
47-
connectionId(y$con) == -1L,
48-
!isConnectionValid(y$con)
49-
)
50-
close(con)
51-
52-
message("*** Connections ... DONE")
3+
message("isConnectionValid() ...")
4+
5+
## Test connectionId()
6+
7+
## Test with stdin, stdout, stderr
8+
stopifnot(connectionId(stdin()) == 0L)
9+
stopifnot(connectionId(stdout()) == 1L)
10+
stopifnot(connectionId(stderr()) == 2L)
11+
12+
## Test with a connection that has no conn_id attribute
13+
con_no_id <- file(tempfile(), open = "w+")
14+
class(con_no_id) <- c("test_connection", class(con_no_id)) ## Inherit from connection
15+
attr(con_no_id, "conn_id") <- NULL ## Ensure no conn_id
16+
res <- connectionId(con_no_id)
17+
stopifnot(is.na(res))
18+
close(con_no_id)
19+
20+
## Test with a serialized connection (should return -1L)
21+
con_ser <- file(tempfile(), open = "w")
22+
x_ser <- list(con = con_ser)
23+
y_ser <- unserialize(serialize(x_ser, connection = NULL))
24+
stopifnot(connectionId(y_ser$con) == -1L)
25+
close(con_ser)
26+
27+
## Test with a valid file connection
28+
con_valid <- file(tempfile(), open = "w+")
29+
id_valid <- connectionId(con_valid)
30+
stopifnot(is.integer(id_valid), id_valid >= 3L)
31+
close(con_valid)
32+
33+
34+
## Test isConnectionValid()
35+
36+
## Test with stdin, stdout, stderr
37+
stopifnot(isConnectionValid(stdin()))
38+
stopifnot(isConnectionValid(stdout()))
39+
stopifnot(isConnectionValid(stderr()))
40+
41+
## Test with a serialized connection (should be FALSE)
42+
con_ser_valid <- file(tempfile(), open = "w")
43+
x_ser_valid <- list(con = con_ser_valid)
44+
y_ser_valid <- unserialize(serialize(x_ser_valid, connection = NULL))
45+
res_ser_valid <- isConnectionValid(y_ser_valid$con)
46+
stopifnot(!res_ser_valid, inherits(attr(res_ser_valid, "reason"), "character"))
47+
close(con_ser_valid)
48+
49+
## Test with a valid connection
50+
con_real_valid <- file(tempfile(), open = "w+")
51+
stopifnot(isConnectionValid(con_real_valid))
52+
close(con_real_valid)
53+
54+
55+
## Test with a connection where index is not found in getAllConnections()
56+
# 1. Create a connection
57+
con_temp_file <- file(tempfile(), open = "w+")
58+
59+
# 2. Close it, so its index is no longer in getAllConnections()
60+
close(con_temp_file)
61+
62+
# 3. Now `con_temp_file` is a "zombie" object - its index is preserved
63+
# but `is.element(as.integer(con_temp_file), getAllConnections())` is FALSE.
64+
res_non_existent <- isConnectionValid(con_temp_file)
65+
stopifnot(!res_non_existent, inherits(attr(res_non_existent, "reason"), "character"))
66+
67+
68+
message("isConnectionValid() ... done")

inst/testme/test-isForkedNode.R

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
isForkedNode <- parallelly:::isForkedNode
2+
3+
message("isForkedNode() ...")
4+
5+
# 1. Test with a default object
6+
x <- 123
7+
res <- isForkedNode(x)
8+
stopifnot(is.na(res))
9+
10+
x <- list(a = 1, b = 2)
11+
res <- isForkedNode(x)
12+
stopifnot(is.na(res))
13+
14+
15+
# 2. Test with a forknode object
16+
## Create a dummy forknode object
17+
## It's enough to set its class attribute
18+
node <- structure(list(), class = "forknode")
19+
res <- isForkedNode(node)
20+
stopifnot(res)
21+
22+
23+
# 3. Test with a cluster object
24+
## Create dummy cluster objects
25+
node1 <- structure(list(), class = "forknode")
26+
node2 <- structure(list(), class = "notforknode") ## A dummy node that is not a forknode
27+
node3 <- structure(list(), class = "forknode")
28+
29+
cluster_obj <- list(node1, node2, node3)
30+
class(cluster_obj) <- "cluster"
31+
32+
res <- isForkedNode(cluster_obj)
33+
stopifnot(is.logical(res), length(res) == 3)
34+
stopifnot(res[1])
35+
stopifnot(is.na(res[2]))
36+
stopifnot(res[3])
37+
38+
message("isForkedNode() ... done")

inst/testme/test-makeClusterPSOCK.R

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ is_ip_number <- parallelly:::is_ip_number
55
is_localhost <- parallelly:::is_localhost
66
find_rshcmd <- parallelly:::find_rshcmd
77

8+
useXDR <- isTRUE(parallelly:::getOption2("parallelly.makeNodePSOCK.useXDR"))
9+
810
message("*** makeClusterPSOCK() ...")
911

1012
message("- makeClusterPSOCK() - internal utility functions")
Lines changed: 88 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,98 @@
1-
if (getRversion() >= "4.4.0") {
1+
message("makeClusterSequential() ...")
2+
3+
## Requires R (>= 4.4.0)
4+
if (getRversion() < "4.4.0") {
5+
message("Skipping because R version is less than 4.4.0")
6+
message("makeClusterSequential() ... skipped")
7+
} else {
28
library(parallelly)
9+
10+
## Temporarily override parallel::makeCluster and parallel::stopCluster
11+
## to use the sequential versions
12+
old_makeCluster <- parallel::makeCluster
13+
old_stopCluster <- parallel::stopCluster
14+
on.exit({
15+
unlockBinding("makeCluster", getNamespace("parallel"))
16+
assign("makeCluster", old_makeCluster, envir = getNamespace("parallel"))
17+
unlockBinding("stopCluster", getNamespace("parallel"))
18+
assign("stopCluster", old_stopCluster, envir = getNamespace("parallel"))
19+
}, add = TRUE)
20+
21+
unlockBinding("makeCluster", getNamespace("parallel"))
22+
assign("makeCluster", makeClusterSequential, envir = getNamespace("parallel"))
23+
unlockBinding("stopCluster", getNamespace("parallel"))
24+
assign("stopCluster", function(cl) {
25+
## Do nothing for sequential_cluster, as it's not a real parallel cluster
26+
## Except for invalidating the node state
27+
for (node in cl) {
28+
if (inherits(node, "sequential_node")) {
29+
node$envir[["...parallelly.valid..."]] <- FALSE
30+
}
31+
}
32+
}, envir = getNamespace("parallel"))
33+
334
library(parallel)
4-
35+
536
cl <- makeClusterSequential()
6-
print(cl)
37+
stopifnot(inherits(cl, "sequential_cluster"), length(cl) == 1L)
738

8-
y_truth <- lapply(X = 1:3, FUN = sqrt)
39+
## Test print.sequential_cluster
40+
capture.output(print(cl))
41+
42+
## Test print.sequential_node
43+
capture.output(print(cl[[1]]))
44+
45+
## Test basic parLapply functionality
946
y <- parLapply(cl, X = 1:3, fun = sqrt)
10-
str(y)
11-
stopifnot(identical(y, y_truth))
12-
13-
pid <- Sys.getpid()
14-
print(pid)
15-
y <- clusterEvalQ(cl, Sys.getpid())
16-
str(y)
17-
stopifnot(identical(y[[1]], pid))
18-
47+
stopifnot(identical(y, lapply(1:3, sqrt)))
48+
49+
## Test clusterEvalQ with side effects
50+
abc <- 3.14
1951
y <- clusterEvalQ(cl, { abc <- 42; abc })
20-
str(y)
21-
stopifnot(!exists("abc", inherits = FALSE))
52+
stopifnot(identical(y, list(42)), abc == 3.14)
2253

54+
## Test stopping the cluster
2355
stopCluster(cl)
24-
print(cl)
56+
57+
## Test accessing an invalid node (after stopCluster)
58+
res <- tryCatch({
59+
parallel::sendData(cl[[1]], list(type = "EXEC", data = list(fun = identity, args = list(1))))
60+
}, error = function(e) e)
61+
stopifnot(inherits(res, "simpleError"), grepl("node is no longer valid", res$message))
62+
63+
64+
## Re-create cluster for other tests
65+
cl <- makeClusterSequential()
66+
node <- cl[[1]]
67+
68+
## Test sendData with "DONE" type
69+
parallel::sendData(node, list(type = "DONE"))
70+
stopifnot(!isTRUE(node$envir[["...parallelly.valid..."]]))
71+
72+
## Test sendData with unknown type
73+
## Test sendData with unknown type
74+
## Re-create cluster for this test, as the previous one was invalidated
75+
cl_unknown <- makeClusterSequential()
76+
node_unknown <- cl_unknown[[1]]
2577

2678
res <- tryCatch({
27-
y <- clusterEvalQ(cl, { 42 })
28-
}, error = identity)
29-
print(res)
30-
stopifnot(inherits(res, "error"))
31-
} ## if (getRversion() >= "4.4.0")
79+
parallel::sendData(node_unknown, list(type = "UNKNOWN"))
80+
}, error = function(e) e)
81+
message("Error message for UNKNOWN type: ", res$message)
82+
stopifnot(inherits(res, "simpleError"), grepl("type = ['‘]UNKNOWN['’] not yet implemented", res$message))
83+
84+
## Test recvData internal error
85+
## Re-create cluster for this test
86+
cl_internal_error <- makeClusterSequential()
87+
node_internal_error <- cl_internal_error[[1]]
88+
89+
## Manipulate internal state to trigger error
90+
node_internal_error$envir[["value"]] <- list(type = "BAD_TYPE")
91+
92+
res <- tryCatch({
93+
parallel::recvData(node_internal_error)
94+
}, error = function(e) e)
95+
stopifnot(inherits(res, "simpleError"), grepl("INTERNAL ERROR", res$message))
96+
97+
message("makeClusterSequential() ... done")
98+
}

0 commit comments

Comments
 (0)