Commit d49d9e4

[SPARK-21693][R][FOLLOWUP] Reduce shuffle partitions running R worker in few tests to speed up
## What changes were proposed in this pull request?

This is a followup to reduce AppVeyor test time. This PR proposes to reduce the number of shuffle partitions, which reduces the number of tasks running R workers in a few particular tests. The symptom is similar to the one described in https://github.com/apache/spark/pull/19722: many R processes are newly launched on Windows, where they cannot be forked, which accounts for the difference in elapsed time between Linux and Windows.

Here is a simple before/after comparison for this change. I tested it manually by disabling `spark.sparkr.use.daemon`; disabling it resembles the tests on Windows:

**Before**

![Before](https://user-images.githubusercontent.com/6477701/33217949-b5528dfa-d17d-11e7-8050-75675c39eb20.png)

**After**

![After](https://user-images.githubusercontent.com/6477701/33217958-c6518052-d17d-11e7-9f8e-1be21a784559.png)

So this should save roughly more than 10 minutes.

## How was this patch tested?

AppVeyor tests

Author: hyukjinkwon <[email protected]>

Closes #19816 from HyukjinKwon/SPARK-21693-followup.
1 parent fba63c1 commit d49d9e4
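For reference, the before/after comparison above can be reproduced locally along these lines (a minimal sketch, not part of the patch: it assumes a local SparkR build, and the toy DataFrame and timing call are only illustrative; `spark.sparkr.use.daemon` is the internal flag mentioned above):

```r
library(SparkR)

# Disabling the R daemon makes every task launch a fresh R worker process
# instead of forking one, which resembles how R workers start on Windows.
sparkR.session(master = "local[2]",
               sparkConfig = list(spark.sparkr.use.daemon = "false"))

df <- createDataFrame(data.frame(a = rep(1:3, 10), b = rnorm(30)))

# gapply() shuffles, so it runs roughly one R worker per shuffle task;
# lowering spark.sql.shuffle.partitions directly cuts the number of launches.
system.time(collect(gapply(df, "a", function(key, x) { x }, schema(df))))
```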

1 file changed: +148 -119 lines changed


R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 148 additions & 119 deletions
```diff
@@ -3021,41 +3021,54 @@ test_that("dapplyCollect() on DataFrame with a binary column", {
 })
 
 test_that("repartition by columns on DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-
-  # no column and number of partitions specified
-  retError <- tryCatch(repartition(df), error = function(e) e)
-  expect_equal(grepl
-    ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
-
-  # repartition by column and number of partitions
-  actual <- repartition(df, 3, col = df$"a")
-
-  # Checking that at least the dimensions are identical
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 3L)
-
-  # repartition by number of partitions
-  actual <- repartition(df, 13L)
-  expect_identical(dim(df), dim(actual))
-  expect_equal(getNumPartitions(actual), 13L)
-
-  expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
-
-  # a test case with a column and dapply
-  schema <- structType(structField("a", "integer"), structField("avg", "double"))
-  df <- repartition(df, col = df$"a")
-  df1 <- dapply(
-    df,
-    function(x) {
-      y <- (data.frame(x$a[1], mean(x$b)))
-    },
-    schema)
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "5")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+
+    # no column and number of partitions specified
+    retError <- tryCatch(repartition(df), error = function(e) e)
+    expect_equal(grepl
+      ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
+
+    # repartition by column and number of partitions
+    actual <- repartition(df, 3, col = df$"a")
+
+    # Checking that at least the dimensions are identical
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 3L)
+
+    # repartition by number of partitions
+    actual <- repartition(df, 13L)
+    expect_identical(dim(df), dim(actual))
+    expect_equal(getNumPartitions(actual), 13L)
+
+    expect_equal(getNumPartitions(coalesce(actual, 1L)), 1L)
+
+    # a test case with a column and dapply
+    schema <- structType(structField("a", "integer"), structField("avg", "double"))
+    df <- repartition(df, col = df$"a")
+
+    df1 <- dapply(
+      df,
+      function(x) {
+        y <- (data.frame(x$a[1], mean(x$b)))
+      },
+      schema)
 
-  # Number of partitions is equal to 2
-  expect_equal(nrow(df1), 2)
+    # Number of partitions is equal to 2
+    expect_equal(nrow(df1), 2)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })
 
 test_that("coalesce, repartition, numPartitions", {
```
```diff
@@ -3078,101 +3091,117 @@ test_that("coalesce, repartition, numPartitions", {
 })
 
 test_that("gapply() and gapplyCollect() on a DataFrame", {
-  df <- createDataFrame(
-    list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
-    c("a", "b", "c", "d"))
-  expected <- collect(df)
-  df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
-  expect_identical(df1Collect, expected)
-
-  # gapply on empty grouping columns.
-  df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
-  actual <- collect(df1)
-  expect_identical(actual, expected)
-
-  # Computes the sum of second column by grouping on the first and third columns
-  # and checks if the sum is larger than 2
-  schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
-                  "a INT, e BOOLEAN")
-  for (schema in schemas) {
-    df2 <- gapply(
+  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
+  # partitions to reduce the number of the tasks to speed up the test. This is particularly
+  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
+  conf <- callJMethod(sparkSession, "conf")
+  shufflepartitionsvalue <- callJMethod(conf, "get", "spark.sql.shuffle.partitions")
+  # TODO: Lower number of 'spark.sql.shuffle.partitions' causes test failures
+  # for an unknown reason. Probably we should fix it.
+  callJMethod(conf, "set", "spark.sql.shuffle.partitions", "16")
+  tryCatch({
+    df <- createDataFrame(
+      list(list(1L, 1, "1", 0.1), list(1L, 2, "1", 0.2), list(3L, 3, "3", 0.3)),
+      c("a", "b", "c", "d"))
+    expected <- collect(df)
+    df1 <- gapply(df, "a", function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    df1Collect <- gapplyCollect(df, list("a"), function(key, x) { x })
+    expect_identical(df1Collect, expected)
+
+    # gapply on empty grouping columns.
+    df1 <- gapply(df, c(), function(key, x) { x }, schema(df))
+    actual <- collect(df1)
+    expect_identical(actual, expected)
+
+    # Computes the sum of second column by grouping on the first and third columns
+    # and checks if the sum is larger than 2
+    schemas <- list(structType(structField("a", "integer"), structField("e", "boolean")),
+                    "a INT, e BOOLEAN")
+    for (schema in schemas) {
+      df2 <- gapply(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+        },
+        schema)
+      actual <- collect(df2)$e
+      expected <- c(TRUE, TRUE)
+      expect_identical(actual, expected)
+
+      df2Collect <- gapplyCollect(
+        df,
+        c(df$"a", df$"c"),
+        function(key, x) {
+          y <- data.frame(key[1], sum(x$b) > 2)
+          colnames(y) <- c("a", "e")
+          y
+        })
+      actual <- df2Collect$e
+      expect_identical(actual, expected)
+    }
+
+    # Computes the arithmetic mean of the second column by grouping
+    # on the first and third columns. Output the groupping value and the average.
+    schema <- structType(structField("a", "integer"), structField("c", "string"),
+                         structField("avg", "double"))
+    df3 <- gapply(
       df,
-      c(df$"a", df$"c"),
+      c("a", "c"),
       function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
       },
       schema)
-    actual <- collect(df2)$e
-    expected <- c(TRUE, TRUE)
+    actual <- collect(df3)
+    actual <- actual[order(actual$a), ]
+    rownames(actual) <- NULL
+    expected <- collect(select(df, "a", "b", "c"))
+    expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
+    colnames(expected) <- c("a", "c", "avg")
+    expected <- expected[order(expected$a), ]
+    rownames(expected) <- NULL
     expect_identical(actual, expected)
 
-    df2Collect <- gapplyCollect(
+    df3Collect <- gapplyCollect(
       df,
-      c(df$"a", df$"c"),
+      c("a", "c"),
       function(key, x) {
-        y <- data.frame(key[1], sum(x$b) > 2)
-        colnames(y) <- c("a", "e")
+        y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
+        colnames(y) <- c("a", "c", "avg")
         y
       })
-    actual <- df2Collect$e
-    expect_identical(actual, expected)
-  }
-
-  # Computes the arithmetic mean of the second column by grouping
-  # on the first and third columns. Output the groupping value and the average.
-  schema <- structType(structField("a", "integer"), structField("c", "string"),
-                       structField("avg", "double"))
-  df3 <- gapply(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df3)
-  actual <- actual[order(actual$a), ]
-  rownames(actual) <- NULL
-  expected <- collect(select(df, "a", "b", "c"))
-  expected <- data.frame(aggregate(expected$b, by = list(expected$a, expected$c), FUN = mean))
-  colnames(expected) <- c("a", "c", "avg")
-  expected <- expected[order(expected$a), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
-
-  df3Collect <- gapplyCollect(
-    df,
-    c("a", "c"),
-    function(key, x) {
-      y <- data.frame(key, mean(x$b), stringsAsFactors = FALSE)
-      colnames(y) <- c("a", "c", "avg")
-      y
-    })
-  actual <- df3Collect[order(df3Collect$a), ]
-  expect_identical(actual$avg, expected$avg)
-
-  irisDF <- suppressWarnings(createDataFrame(iris))
-  schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
-  # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
-  df4 <- gapply(
-    cols = "Sepal_Length",
-    irisDF,
-    function(key, x) {
-      y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
-    },
-    schema)
-  actual <- collect(df4)
-  actual <- actual[order(actual$Sepal_Length), ]
-  rownames(actual) <- NULL
-  agg_local_df <- data.frame(aggregate(iris$Sepal.Width, by = list(iris$Sepal.Length), FUN = mean),
-                             stringsAsFactors = FALSE)
-  colnames(agg_local_df) <- c("Sepal_Length", "Avg")
-  expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
-  rownames(expected) <- NULL
-  expect_identical(actual, expected)
+    actual <- df3Collect[order(df3Collect$a), ]
+    expect_identical(actual$avg, expected$avg)
+
+    irisDF <- suppressWarnings(createDataFrame(iris))
+    schema <- structType(structField("Sepal_Length", "double"), structField("Avg", "double"))
+    # Groups by `Sepal_Length` and computes the average for `Sepal_Width`
+    df4 <- gapply(
+      cols = "Sepal_Length",
+      irisDF,
+      function(key, x) {
+        y <- data.frame(key, mean(x$Sepal_Width), stringsAsFactors = FALSE)
+      },
+      schema)
+    actual <- collect(df4)
+    actual <- actual[order(actual$Sepal_Length), ]
+    rownames(actual) <- NULL
+    agg_local_df <- data.frame(aggregate(iris$Sepal.Width,
+                                         by = list(iris$Sepal.Length),
+                                         FUN = mean),
+                               stringsAsFactors = FALSE)
+    colnames(agg_local_df) <- c("Sepal_Length", "Avg")
+    expected <- agg_local_df[order(agg_local_df$Sepal_Length), ]
+    rownames(expected) <- NULL
+    expect_identical(actual, expected)
+  },
+  finally = {
+    # Resetting the conf back to default value
+    callJMethod(conf, "set", "spark.sql.shuffle.partitions", shufflepartitionsvalue)
+  })
 })
 
 test_that("Window functions on a DataFrame", {
```
