Skip to content

Commit 505480c

Browse files
committed
[SPARK-23770][R] Exposes repartitionByRange in SparkR
## What changes were proposed in this pull request?

This PR proposes to expose `repartitionByRange`.

```R
> df <- createDataFrame(iris)
...
> getNumPartitions(repartitionByRange(df, 3, col = df$Species))
[1] 3
```

## How was this patch tested?

Manually tested and the unit tests were added.

The diff with `repartition` can be checked as below:

```R
> df <- createDataFrame(mtcars)
> take(repartition(df, 10, df$wt), 3)
   mpg cyl  disp  hp drat    wt  qsec vs am gear carb
1 14.3   8 360.0 245 3.21 3.570 15.84  0  0    3    4
2 10.4   8 460.0 215 3.00 5.424 17.82  0  0    3    4
3 32.4   4  78.7  66 4.08 2.200 19.47  1  1    4    1

> take(repartitionByRange(df, 10, df$wt), 3)
   mpg cyl disp hp drat    wt  qsec vs am gear carb
1 30.4   4 75.7 52 4.93 1.615 18.52  1  1    4    2
2 33.9   4 71.1 65 4.22 1.835 19.90  1  1    4    1
3 27.3   4 79.0 66 4.08 1.935 18.90  1  1    4    1
```

Author: hyukjinkwon <[email protected]>

Closes apache#20902 from HyukjinKwon/r-repartitionByRange.
1 parent 641aec6 commit 505480c

File tree

4 files changed

+112
-2
lines changed

4 files changed

+112
-2
lines changed

R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,7 @@ exportMethods("arrange",
151151
"registerTempTable",
152152
"rename",
153153
"repartition",
154+
"repartitionByRange",
154155
"rollup",
155156
"sample",
156157
"sample_frac",

R/pkg/R/DataFrame.R

Lines changed: 63 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,7 @@ setMethod("storageLevel",
687687
#' @rdname coalesce
688688
#' @name coalesce
689689
#' @aliases coalesce,SparkDataFrame-method
690-
#' @seealso \link{repartition}
690+
#' @seealso \link{repartition}, \link{repartitionByRange}
691691
#' @examples
692692
#'\dontrun{
693693
#' sparkR.session()
@@ -723,7 +723,7 @@ setMethod("coalesce",
723723
#' @rdname repartition
724724
#' @name repartition
725725
#' @aliases repartition,SparkDataFrame-method
726-
#' @seealso \link{coalesce}
726+
#' @seealso \link{coalesce}, \link{repartitionByRange}
727727
#' @examples
728728
#'\dontrun{
729729
#' sparkR.session()
@@ -759,6 +759,67 @@ setMethod("repartition",
759759
dataFrame(sdf)
760760
})
761761

762+
763+
#' Repartition by range
#'
#' The following options for repartition by range are possible:
#' \itemize{
#'  \item{1.} {Return a new SparkDataFrame range partitioned by
#'                      the given columns into \code{numPartitions}.}
#'  \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
#'                      using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#'
#' @param x a SparkDataFrame.
#' @param numPartitions the number of partitions to use.
#' @param col the column by which the range partitioning will be performed.
#' @param ... additional column(s) to be used in the range partitioning.
#'
#' @family SparkDataFrame functions
#' @rdname repartitionByRange
#' @name repartitionByRange
#' @aliases repartitionByRange,SparkDataFrame-method
#' @seealso \link{repartition}, \link{coalesce}
#' @examples
#'\dontrun{
#' sparkR.session()
#' path <- "path/to/file.json"
#' df <- read.json(path)
#' newDF <- repartitionByRange(df, col = df$col1, df$col2)
#' newDF <- repartitionByRange(df, 3L, col = df$col1, df$col2)
#'}
#' @note repartitionByRange since 2.4.0
setMethod("repartitionByRange",
          signature(x = "SparkDataFrame"),
          function(x, numPartitions = NULL, col = NULL, ...) {
            if (!is.null(numPartitions) && !is.null(col)) {
              # number of partitions and columns both are specified
              # NOTE: use inherits() rather than class(.) == "Column" — robust
              # against S4 subclasses and multi-element class vectors.
              if (is.numeric(numPartitions) && inherits(col, "Column")) {
                cols <- list(col, ...)
                # extract the Java Column reference held in each Column's @jc slot
                jcol <- lapply(cols, function(column) { column@jc })
                sdf <- callJMethod(x@sdf, "repartitionByRange", numToInt(numPartitions), jcol)
              } else {
                stop(paste("numPartitions and col must be numeric and Column; however, got",
                           class(numPartitions), "and", class(col)))
              }
            } else if (!is.null(col)) {
              # only columns are specified; Spark picks the partition count from
              # spark.sql.shuffle.partitions
              if (inherits(col, "Column")) {
                cols <- list(col, ...)
                jcol <- lapply(cols, function(column) { column@jc })
                sdf <- callJMethod(x@sdf, "repartitionByRange", jcol)
              } else {
                stop(paste("col must be Column; however, got", class(col)))
              }
            } else if (!is.null(numPartitions)) {
              # only numPartitions is specified: unlike repartition(), range
              # partitioning is meaningless without at least one ordering column
              stop("At least one partition-by column must be specified.")
            } else {
              stop("Please, specify a column(s) or the number of partitions with a column(s)")
            }
            dataFrame(sdf)
          })
822+
762823
#' toJSON
763824
#'
764825
#' Converts a SparkDataFrame into a SparkDataFrame of JSON string.

R/pkg/R/generics.R

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,6 +531,9 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
531531
#' @rdname repartition
532532
setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
533533

534+
#' @rdname repartitionByRange
setGeneric("repartitionByRange",
           function(x, ...) {
             standardGeneric("repartitionByRange")
           })
536+
534537
#' @rdname sample
535538
setGeneric("sample",
536539
function(x, withReplacement = FALSE, fraction, seed) {

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3104,6 +3104,51 @@ test_that("repartition by columns on DataFrame", {
31043104
})
31053105
})
31063106

3107+
test_that("repartitionByRange on a DataFrame", {
  # The tasks here launch R workers with shuffles. So, we decrease the number of shuffle
  # partitions to reduce the number of the tasks to speed up the test. This is particularly
  # slow on Windows because the R workers are unable to be forked. See also SPARK-21693.
  sql_conf <- callJMethod(sparkSession, "conf")
  saved_shuffle_partitions <- callJMethod(sql_conf, "get", "spark.sql.shuffle.partitions")
  callJMethod(sql_conf, "set", "spark.sql.shuffle.partitions", "5")
  tryCatch({
    # collect a SparkDataFrame and order its rows by mpg for stable comparison
    collect_sorted_by_mpg <- function(sdf) {
      rows <- collect(sdf)
      rows[order(rows$mpg), ]
    }

    df <- createDataFrame(mtcars)

    # invalid argument combinations raise the documented errors
    expect_error(repartitionByRange(df, "haha", df$mpg),
                 "numPartitions and col must be numeric and Column.*")
    expect_error(repartitionByRange(df),
                 ".*specify a column.*or the number of partitions with a column.*")
    expect_error(repartitionByRange(df, col = "haha"),
                 "col must be Column; however, got.*")
    expect_error(repartitionByRange(df, 3),
                 "At least one partition-by column must be specified.")

    # The order of rows should be different with a normal repartition.
    ranged <- repartitionByRange(df, 3, df$mpg)
    expect_equal(getNumPartitions(ranged), 3)
    expect_false(identical(collect(ranged), collect(repartition(df, 3, df$mpg))))

    ranged <- repartitionByRange(df, col = df$mpg)
    expect_false(identical(collect(ranged), collect(repartition(df, col = df$mpg))))

    # They should have same data.
    got <- collect_sorted_by_mpg(repartitionByRange(df, 3, df$mpg))
    want <- collect_sorted_by_mpg(repartition(df, 3, df$mpg))
    expect_true(all(got == want))

    got <- collect_sorted_by_mpg(repartitionByRange(df, col = df$mpg))
    want <- collect_sorted_by_mpg(repartition(df, col = df$mpg))
    expect_true(all(got == want))
  },
  finally = {
    # Resetting the conf back to default value
    callJMethod(sql_conf, "set", "spark.sql.shuffle.partitions", saved_shuffle_partitions)
  })
})
3151+
31073152
test_that("coalesce, repartition, numPartitions", {
31083153
df <- as.DataFrame(cars, numPartitions = 5)
31093154
expect_equal(getNumPartitions(df), 5)

0 commit comments

Comments
 (0)