
Commit 109f910

Merge pull request apache-spark-on-k8s#490 from palantir/rk/merge-again

Merge from upstream

2 parents: 39513a4 + b2ee254

1,295 files changed: +33,535 additions, -18,192 deletions


.gitignore

Lines changed: 3 additions & 0 deletions
@@ -98,3 +98,6 @@ spark-warehouse/
 *.Rproj.*
 
 .Rproj.user
+
+# For SBT
+.jvmopts

FORK.md

Lines changed: 6 additions & 1 deletion
@@ -6,7 +6,6 @@
 * [SPARK-17059](https://issues.apache.org/jira/browse/SPARK-17059) - Allow FileFormat to specify partition pruning strategy via splits
 * [SPARK-24345](https://issues.apache.org/jira/browse/SPARK-24345) - Improve ParseError stop location when offending symbol is a token
 * [SPARK-23795](https://issues.apache.org/jira/browse/SPARK-23795) - Make AbstractLauncher#self() protected
-* [SPARK-23153](https://issues.apache.org/jira/browse/SPARK-23153) - Support application dependencies in submission client's local file system
 * [SPARK-18079](https://issues.apache.org/jira/browse/SPARK-18079) - CollectLimitExec.executeToIterator should perform per-partition limits
 
 * [SPARK-15777](https://issues.apache.org/jira/browse/SPARK-15777) (Partial fix) - Catalog federation
@@ -31,3 +30,9 @@
 * [SPARK-25862](https://issues.apache.org/jira/browse/SPARK-25862) - Removal of `unboundedPreceding`, `unboundedFollowing`, `currentRow`
 * [SPARK-26127](https://issues.apache.org/jira/browse/SPARK-26127) - Removal of deprecated setters from tree regression and classification models
 * [SPARK-25867](https://issues.apache.org/jira/browse/SPARK-25867) - Removal of KMeans computeCost
+* [SPARK-26216](https://issues.apache.org/jira/browse/SPARK-26216) - Change to UserDefinedFunction type
+* [SPARK-26323](https://issues.apache.org/jira/browse/SPARK-26323) - Scala UDF null checking
+* [SPARK-26580](https://issues.apache.org/jira/browse/SPARK-26580) - Bring back Scala 2.11 behaviour of primitive types null behaviour
+* [SPARK-26133](https://issues.apache.org/jira/browse/SPARK-26133) - Old OneHotEncoder
+* [SPARK-11215](https://issues.apache.org/jira/browse/SPARK-11215) - StringIndexer multi column support
+* [SPARK-26616](https://issues.apache.org/jira/browse/SPARK-26616) - No document frequency in IDFModel

R/README.md

Lines changed: 1 addition & 9 deletions
@@ -39,15 +39,7 @@ To set other options like driver memory, executor memory etc. you can pass in th
 
 #### Using SparkR from RStudio
 
-If you wish to use SparkR from RStudio or other R frontends you will need to set some environment variables which point SparkR to your Spark installation. For example
-```R
-# Set this to where Spark is installed
-Sys.setenv(SPARK_HOME="/Users/username/spark")
-# This line loads SparkR from the installed directory
-.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
-library(SparkR)
-sparkR.session()
-```
+If you wish to use SparkR from RStudio, please refer to the [SparkR documentation](https://spark.apache.org/docs/latest/sparkr.html#starting-up-from-rstudio).
 
 #### Making changes to SparkR
 

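For readers who want the inline steps that the removed snippet used to show, here is a minimal sketch of starting SparkR from RStudio; the SPARK_HOME path below is a placeholder for your local Spark installation.

```R
# Minimal sketch: point SparkR at a local Spark installation from RStudio.
# "/path/to/spark" is a placeholder; use your own installation directory.
Sys.setenv(SPARK_HOME = "/path/to/spark")

# Load SparkR from the installed Spark distribution rather than from CRAN.
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

library(SparkR)
sparkR.session()
```
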
R/pkg/NAMESPACE

Lines changed: 6 additions & 1 deletion
@@ -67,7 +67,8 @@ exportMethods("glm",
               "spark.fpGrowth",
               "spark.freqItemsets",
               "spark.associationRules",
-              "spark.findFrequentSequentialPatterns")
+              "spark.findFrequentSequentialPatterns",
+              "spark.assignClusters")
 
 # Job group lifecycle management methods
 export("setJobGroup",
@@ -311,8 +312,10 @@ exportMethods("%<=>%",
               "lower",
               "lpad",
               "ltrim",
+              "map_concat",
               "map_entries",
               "map_from_arrays",
+              "map_from_entries",
               "map_keys",
               "map_values",
               "max",
@@ -351,6 +354,8 @@ exportMethods("%<=>%",
               "row_number",
               "rpad",
               "rtrim",
+              "schema_of_csv",
+              "schema_of_json",
               "second",
               "sha1",
               "sha2",

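The newly exported column functions above mirror their Spark SQL counterparts. A small, hedged usage sketch follows; it assumes a running SparkR session on a Spark build that includes these functions, and the example data is illustrative only.

```R
library(SparkR)
sparkR.session()

df <- createDataFrame(data.frame(id = 1))

# Infer a DDL-formatted schema from literal JSON / CSV strings.
head(select(df, schema_of_json('{"name": "Bob"}'), schema_of_csv("1,abc")))

# Merge two map columns into one with map_concat.
mapsDF <- sql("SELECT map('a', 1) AS m1, map('b', 2) AS m2")
head(select(mapsDF, map_concat(mapsDF$m1, mapsDF$m2)))
```
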
R/pkg/R/DataFrame.R

Lines changed: 16 additions & 6 deletions
@@ -766,7 +766,6 @@ setMethod("repartition",
 #' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
 #'            using \code{spark.sql.shuffle.partitions} as number of partitions.}
 #'}
-#'
 #' At least one partition-by expression must be specified.
 #' When no explicit sort order is specified, "ascending nulls first" is assumed.
 #'
@@ -828,7 +827,6 @@ setMethod("repartitionByRange",
 #' toJSON
 #'
 #' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
-#'
 #' Each row is turned into a JSON document with columns as different fields.
 #' The returned SparkDataFrame has a single character column with the name \code{value}
 #'
@@ -2732,13 +2730,25 @@ setMethod("union",
             dataFrame(unioned)
           })
 
-#' Return a new SparkDataFrame containing the union of rows
+#' Return a new SparkDataFrame containing the union of rows.
 #'
-#' This is an alias for `union`.
+#' This is an alias for \code{union}.
 #'
-#' @rdname union
-#' @name unionAll
+#' @param x a SparkDataFrame.
+#' @param y a SparkDataFrame.
+#' @return A SparkDataFrame containing the result of the unionAll operation.
+#' @family SparkDataFrame functions
 #' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
+#' @rdname unionAll
+#' @name unionAll
+#' @seealso \link{union}
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- read.json(path)
+#' df2 <- read.json(path2)
+#' unionAllDF <- unionAll(df1, df2)
+#' }
 #' @note unionAll since 1.4.0
 setMethod("unionAll",
           signature(x = "SparkDataFrame", y = "SparkDataFrame"),

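Since the revised documentation above describes unionAll as an alias for union, a brief sketch of the equivalence (assuming df1 and df2 are SparkDataFrames with the same schema):

```R
# unionAll is an alias for union; neither removes duplicates (use distinct() for that).
combined1 <- union(df1, df2)
combined2 <- unionAll(df1, df2)   # same result as union(df1, df2)
```
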
R/pkg/R/SQLContext.R

Lines changed: 136 additions & 25 deletions
@@ -147,6 +147,70 @@ getDefaultSqlSource <- function() {
   l[["spark.sql.sources.default"]]
 }
 
+writeToFileInArrow <- function(fileName, rdf, numPartitions) {
+  requireNamespace1 <- requireNamespace
+
+  # The Arrow R API is not yet released on CRAN. CRAN requires any package referenced via
+  # requireNamespace to be declared in DESCRIPTION, and later checks whether that package
+  # is available. Therefore, we work around this by avoiding a direct requireNamespace call.
+  # Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204.
+  if (requireNamespace1("arrow", quietly = TRUE)) {
+    record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
+    RecordBatchStreamWriter <- get(
+      "RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE)
+    FileOutputStream <- get(
+      "FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE)
+
+    numPartitions <- if (!is.null(numPartitions)) {
+      numToInt(numPartitions)
+    } else {
+      1
+    }
+
+    rdf_slices <- if (numPartitions > 1) {
+      split(rdf, makeSplits(numPartitions, nrow(rdf)))
+    } else {
+      list(rdf)
+    }
+
+    stream_writer <- NULL
+    tryCatch({
+      for (rdf_slice in rdf_slices) {
+        batch <- record_batch(rdf_slice)
+        if (is.null(stream_writer)) {
+          stream <- FileOutputStream(fileName)
+          schema <- batch$schema
+          stream_writer <- RecordBatchStreamWriter(stream, schema)
+        }
+
+        stream_writer$write_batch(batch)
+      }
+    },
+    finally = {
+      if (!is.null(stream_writer)) {
+        stream_writer$close()
+      }
+    })
+
+  } else {
+    stop("'arrow' package should be installed.")
+  }
+}
+
+checkTypeRequirementForArrow <- function(dataHead, schema) {
+  # Currently, Arrow optimization does not support the raw type.
+  # Also, it does not support explicit float type set by users. It leads to
+  # incorrect conversion. We will fall back to the path without Arrow optimization.
+  if (any(sapply(dataHead, is.raw))) {
+    stop("Arrow optimization with R DataFrame does not support raw type yet.")
+  }
+  if (inherits(schema, "structType")) {
+    if (any(sapply(schema$fields(), function(x) x$dataType.toString() == "FloatType"))) {
+      stop("Arrow optimization with R DataFrame does not support FloatType type yet.")
+    }
+  }
+}
+
 #' Create a SparkDataFrame
 #'
 #' Converts R data.frame or list into SparkDataFrame.
@@ -172,36 +236,76 @@ getDefaultSqlSource <- function() {
 createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
                             numPartitions = NULL) {
   sparkSession <- getSparkSession()
+  arrowEnabled <- sparkR.conf("spark.sql.execution.arrow.enabled")[[1]] == "true"
+  useArrow <- FALSE
+  firstRow <- NULL
 
   if (is.data.frame(data)) {
-      # Convert data into a list of rows. Each row is a list.
-
-      # get the names of columns, they will be put into RDD
-      if (is.null(schema)) {
-        schema <- names(data)
-      }
+    # get the names of columns, they will be put into RDD
+    if (is.null(schema)) {
+      schema <- names(data)
+    }
 
-      # get rid of factor type
-      cleanCols <- function(x) {
-        if (is.factor(x)) {
-          as.character(x)
-        } else {
-          x
-        }
+    # get rid of factor type
+    cleanCols <- function(x) {
+      if (is.factor(x)) {
+        as.character(x)
+      } else {
+        x
       }
+    }
+    data[] <- lapply(data, cleanCols)
+
+    args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
+    if (arrowEnabled) {
+      useArrow <- tryCatch({
+        stopifnot(length(data) > 0)
+        dataHead <- head(data, 1)
+        checkTypeRequirementForArrow(data, schema)
+        fileName <- tempfile(pattern = "sparwriteToFileInArrowk-arrow", fileext = ".tmp")
+        tryCatch({
+          writeToFileInArrow(fileName, data, numPartitions)
+          jrddInArrow <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                                     "readArrowStreamFromFile",
+                                     sparkSession,
+                                     fileName)
+        },
+        finally = {
+          # File might not be created.
+          suppressWarnings(file.remove(fileName))
+        })
+
+        firstRow <- do.call(mapply, append(args, dataHead))[[1]]
+        TRUE
+      },
+      error = function(e) {
+        warning(paste0("createDataFrame attempted Arrow optimization because ",
+                       "'spark.sql.execution.arrow.enabled' is set to true; however, ",
+                       "failed, attempting non-optimization. Reason: ",
+                       e))
+        FALSE
+      })
+    }
 
+    if (!useArrow) {
+      # Convert data into a list of rows. Each row is a list.
       # drop factors and wrap lists
-      data <- setNames(lapply(data, cleanCols), NULL)
+      data <- setNames(as.list(data), NULL)
 
       # check if all columns have supported type
       lapply(data, getInternalType)
 
       # convert to rows
-      args <- list(FUN = list, SIMPLIFY = FALSE, USE.NAMES = FALSE)
       data <- do.call(mapply, append(args, data))
+      if (length(data) > 0) {
+        firstRow <- data[[1]]
+      }
+    }
   }
 
-  if (is.list(data)) {
+  if (useArrow) {
+    rdd <- jrddInArrow
+  } else if (is.list(data)) {
     sc <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getJavaSparkContext", sparkSession)
     if (!is.null(numPartitions)) {
       rdd <- parallelize(sc, data, numSlices = numToInt(numPartitions))
@@ -215,14 +319,16 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
   }
 
   if (is.null(schema) || (!inherits(schema, "structType") && is.null(names(schema)))) {
-    row <- firstRDD(rdd)
+    if (is.null(firstRow)) {
+      firstRow <- firstRDD(rdd)
+    }
     names <- if (is.null(schema)) {
-      names(row)
+      names(firstRow)
     } else {
       as.list(schema)
     }
     if (is.null(names)) {
-      names <- lapply(1:length(row), function(x) {
+      names <- lapply(1:length(firstRow), function(x) {
        paste("_", as.character(x), sep = "")
      })
    }
@@ -237,19 +343,24 @@ createDataFrame <- function(data, schema = NULL, samplingRatio = 1.0,
      nn
    })
 
-   types <- lapply(row, infer_type)
-   fields <- lapply(1:length(row), function(i) {
+   types <- lapply(firstRow, infer_type)
+   fields <- lapply(1:length(firstRow), function(i) {
      structField(names[[i]], types[[i]], TRUE)
    })
    schema <- do.call(structType, fields)
  }
 
  stopifnot(class(schema) == "structType")
 
-  jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
-  srdd <- callJMethod(jrdd, "rdd")
-  sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
-                     srdd, schema$jobj, sparkSession)
+  if (useArrow) {
+    sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
+                       "toDataFrame", rdd, schema$jobj, sparkSession)
+  } else {
+    jrdd <- getJRDD(lapply(rdd, function(x) x), "row")
+    srdd <- callJMethod(jrdd, "rdd")
+    sdf <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "createDF",
+                       srdd, schema$jobj, sparkSession)
+  }
   dataFrame(sdf)
 }
 

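To illustrate how the new Arrow path above is reached from user code, here is a hedged sketch. It assumes the arrow R package is installed; otherwise createDataFrame emits the warning shown in the diff and falls back to the non-Arrow conversion.

```R
library(SparkR)

# Enable the Arrow-based conversion before creating the session (or via spark-defaults).
sparkR.session(sparkConfig = list(spark.sql.execution.arrow.enabled = "true"))

rdf <- data.frame(id = 1:3, name = c("a", "b", "c"))

# With the config enabled and 'arrow' installed, the data.frame is written to a
# temporary Arrow stream file (writeToFileInArrow) and read back on the JVM side;
# otherwise the original row-by-row serialization path is used.
sdf <- createDataFrame(rdf)
head(sdf)
```
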
R/pkg/R/context.R

Lines changed: 22 additions & 20 deletions
@@ -81,13 +81,33 @@ objectFile <- function(sc, path, minPartitions = NULL) {
   RDD(jrdd, "byte")
 }
 
+makeSplits <- function(numSerializedSlices, length) {
+  # Generate the slice ids to put each row
+  # For instance, for numSerializedSlices of 22, length of 50
+  #  [1]  0  0  2  2  4  4  6  6  6  9  9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
+  # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
+  # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced.
+  # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD
+  if (numSerializedSlices > 0) {
+    unlist(lapply(0: (numSerializedSlices - 1), function(x) {
+      # nolint start
+      start <- trunc((as.numeric(x) * length) / numSerializedSlices)
+      end <- trunc(((as.numeric(x) + 1) * length) / numSerializedSlices)
+      # nolint end
+      rep(start, end - start)
+    }))
+  } else {
+    1
+  }
+}
+
 #' Create an RDD from a homogeneous list or vector.
 #'
 #' This function creates an RDD from a local homogeneous list in R. The elements
 #' in the list are split into \code{numSlices} slices and distributed to nodes
 #' in the cluster.
 #'
-#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MB), the function
+#' If size of serialized slices is larger than spark.r.maxAllocationLimit or (200MiB), the function
 #' will write it to disk and send the file name to JVM. Also to make sure each slice is not
 #' larger than that limit, number of slices may be increased.
 #'
@@ -143,25 +163,7 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # For large objects we make sure the size of each slice is also smaller than sizeLimit
   numSerializedSlices <- min(len, max(numSlices, ceiling(objectSize / sizeLimit)))
 
-  # Generate the slice ids to put each row
-  # For instance, for numSerializedSlices of 22, length of 50
-  #  [1]  0  0  2  2  4  4  6  6  6  9  9 11 11 13 13 15 15 15 18 18 20 20 22 22 22
-  # [26] 25 25 27 27 29 29 31 31 31 34 34 36 36 38 38 40 40 40 43 43 45 45 47 47 47
-  # Notice the slice group with 3 slices (ie. 6, 15, 22) are roughly evenly spaced.
-  # We are trying to reimplement the calculation in the positions method in ParallelCollectionRDD
-  splits <- if (numSerializedSlices > 0) {
-    unlist(lapply(0: (numSerializedSlices - 1), function(x) {
-      # nolint start
-      start <- trunc((as.numeric(x) * len) / numSerializedSlices)
-      end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
-      # nolint end
-      rep(start, end - start)
-    }))
-  } else {
-    1
-  }
-
-  slices <- split(coll, splits)
+  slices <- split(coll, makeSplits(numSerializedSlices, len))
 
   # Serialize each slice: obtain a list of raws, or a list of lists (slices) of
   # 2-tuples of raws

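As a quick check of the worked example in the makeSplits comment (22 serialized slices over 50 elements), the sketch below reproduces the slice ids with the same arithmetic; makeSplits itself is an internal helper, so this is shown only for illustration.

```R
# Reproduce the documented example: 22 slices over a length-50 collection.
numSerializedSlices <- 22
len <- 50
splits <- unlist(lapply(0:(numSerializedSlices - 1), function(x) {
  start <- trunc((as.numeric(x) * len) / numSerializedSlices)
  end <- trunc(((as.numeric(x) + 1) * len) / numSerializedSlices)
  rep(start, end - start)
}))

print(splits)                # matches the vector shown in the comment above
length(split(1:50, splits))  # 22 roughly equal slices
```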