
Commit 0d4c3a0

Merge pull request apache-spark-on-k8s#414 from palantir/sr/new-master

New master

2 parents: 6d02722 + 14ff6fd

596 files changed: +13796 additions, -7279 deletions

.circleci/config.yml

Lines changed: 1 addition & 1 deletion

@@ -259,7 +259,7 @@ jobs:
 
   run-spark-docker-gradle-plugin-tests:
     <<: *test-defaults
-    resource_class: small
+    resource_class: medium
     steps:
       - *checkout-code
      - setup_remote_docker

R/pkg/DESCRIPTION

Lines changed: 1 addition & 1 deletion

@@ -1,6 +1,6 @@
 Package: SparkR
 Type: Package
-Version: 2.4.0
+Version: 3.0.0
 Title: R Frontend for Apache Spark
 Description: Provides an R Frontend for Apache Spark.
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),

R/pkg/R/DataFrame.R

Lines changed: 26 additions & 4 deletions

@@ -503,7 +503,6 @@ setMethod("createOrReplaceTempView",
 #' @param x A SparkDataFrame
 #' @param tableName A character vector containing the name of the table
 #'
-#' @family SparkDataFrame functions
 #' @seealso \link{createOrReplaceTempView}
 #' @rdname registerTempTable-deprecated
 #' @name registerTempTable
@@ -2955,6 +2954,9 @@ setMethod("exceptAll",
 #' @param source a name for external data source.
 #' @param mode one of 'append', 'overwrite', 'error', 'errorifexists', 'ignore'
 #'             save mode (it is 'error' by default)
+#' @param partitionBy a name or a list of names of columns to partition the output by on the file
+#'                    system. If specified, the output is laid out on the file system similar
+#'                    to Hive's partitioning scheme.
 #' @param ... additional argument(s) passed to the method.
 #'
 #' @family SparkDataFrame functions
@@ -2966,13 +2968,13 @@ setMethod("exceptAll",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
-#' write.df(df, "myfile", "parquet", "overwrite")
+#' write.df(df, "myfile", "parquet", "overwrite", partitionBy = c("col1", "col2"))
 #' saveDF(df, parquetPath2, "parquet", mode = "append", mergeSchema = TRUE)
 #' }
 #' @note write.df since 1.4.0
 setMethod("write.df",
           signature(df = "SparkDataFrame"),
-          function(df, path = NULL, source = NULL, mode = "error", ...) {
+          function(df, path = NULL, source = NULL, mode = "error", partitionBy = NULL, ...) {
             if (!is.null(path) && !is.character(path)) {
               stop("path should be character, NULL or omitted.")
             }
@@ -2986,8 +2988,18 @@ setMethod("write.df",
             if (is.null(source)) {
               source <- getDefaultSqlSource()
             }
+            cols <- NULL
+            if (!is.null(partitionBy)) {
+              if (!all(sapply(partitionBy, function(c) is.character(c)))) {
+                stop("All partitionBy column names should be characters.")
+              }
+              cols <- as.list(partitionBy)
+            }
             write <- callJMethod(df@sdf, "write")
             write <- callJMethod(write, "format", source)
+            if (!is.null(cols)) {
+              write <- callJMethod(write, "partitionBy", cols)
+            }
             write <- setWriteOptions(write, path = path, mode = mode, ...)
             write <- handledCallJMethod(write, "save")
           })
@@ -3986,7 +3998,17 @@ setMethod("hint",
           signature(x = "SparkDataFrame", name = "character"),
           function(x, name, ...) {
             parameters <- list(...)
-            stopifnot(all(sapply(parameters, is.character)))
+            if (!all(sapply(parameters, function(y) {
+              if (is.character(y) || is.numeric(y)) {
+                TRUE
+              } else if (is.list(y)) {
+                all(sapply(y, function(z) { is.character(z) || is.numeric(z) }))
+              } else {
+                FALSE
+              }
+            }))) {
+              stop("sql hint should be character, numeric, or list with character or numeric.")
+            }
             jdf <- callJMethod(x@sdf, "hint", name, parameters)
             dataFrame(jdf)
           })
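
Taken together, these changes let callers partition written output from R and pass non-character hint parameters. A minimal sketch of the new call shapes, mirroring the roxygen example and the new tests (the data frame, output path, and column names below are illustrative, not taken from the commit):

    df <- read.json("path/to/file.json")
    # New partitionBy argument: lay the output out by column, Hive-style.
    write.df(df, "output_dir", "parquet", "overwrite", partitionBy = c("col1", "col2"))
    # hint() now accepts numeric values and lists of character/numeric, not just characters.
    df2 <- hint(df, "hint1", 1.23456, "aaaaaaaaaa", list("hint2", "hint3"))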

R/pkg/R/catalog.R

Lines changed: 0 additions & 1 deletion

@@ -69,7 +69,6 @@ createExternalTable <- function(x, ...) {
 #' @param ... additional named parameters as options for the data source.
 #' @return A SparkDataFrame.
 #' @rdname createTable
-#' @seealso \link{createExternalTable}
 #' @examples
 #'\dontrun{
 #' sparkR.session()

R/pkg/R/context.R

Lines changed: 31 additions & 12 deletions

@@ -167,18 +167,30 @@ parallelize <- function(sc, coll, numSlices = 1) {
   # 2-tuples of raws
   serializedSlices <- lapply(slices, serialize, connection = NULL)
 
-  # The PRC backend cannot handle arguments larger than 2GB (INT_MAX)
+  # The RPC backend cannot handle arguments larger than 2GB (INT_MAX)
   # If serialized data is safely less than that threshold we send it over the PRC channel.
   # Otherwise, we write it to a file and send the file name
   if (objectSize < sizeLimit) {
     jrdd <- callJStatic("org.apache.spark.api.r.RRDD", "createRDDFromArray", sc, serializedSlices)
   } else {
-    fileName <- writeToTempFile(serializedSlices)
-    jrdd <- tryCatch(callJStatic(
-        "org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices)),
-      finally = {
-        file.remove(fileName)
-    })
+    if (callJStatic("org.apache.spark.api.r.RUtils", "getEncryptionEnabled", sc)) {
+      # the length of slices here is the parallelism to use in the jvm's sc.parallelize()
+      parallelism <- as.integer(numSlices)
+      jserver <- newJObject("org.apache.spark.api.r.RParallelizeServer", sc, parallelism)
+      authSecret <- callJMethod(jserver, "secret")
+      port <- callJMethod(jserver, "port")
+      conn <- socketConnection(port = port, blocking = TRUE, open = "wb", timeout = 1500)
+      doServerAuth(conn, authSecret)
+      writeToConnection(serializedSlices, conn)
+      jrdd <- callJMethod(jserver, "getResult")
+    } else {
+      fileName <- writeToTempFile(serializedSlices)
+      jrdd <- tryCatch(callJStatic(
+          "org.apache.spark.api.r.RRDD", "createRDDFromFile", sc, fileName, as.integer(numSlices)),
+        finally = {
+          file.remove(fileName)
+      })
+    }
   }
 
   RDD(jrdd, "byte")
@@ -194,14 +206,21 @@ getMaxAllocationLimit <- function(sc) {
   ))
 }
 
+writeToConnection <- function(serializedSlices, conn) {
+  tryCatch({
+    for (slice in serializedSlices) {
+      writeBin(as.integer(length(slice)), conn, endian = "big")
+      writeBin(slice, conn, endian = "big")
+    }
+  }, finally = {
+    close(conn)
+  })
+}
+
 writeToTempFile <- function(serializedSlices) {
   fileName <- tempfile()
   conn <- file(fileName, "wb")
-  for (slice in serializedSlices) {
-    writeBin(as.integer(length(slice)), conn, endian = "big")
-    writeBin(slice, conn, endian = "big")
-  }
-  close(conn)
+  writeToConnection(serializedSlices, conn)
   fileName
 }
 
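
The new writeToConnection() helper frames each serialized slice as a 4-byte big-endian length followed by the slice's raw bytes, whether the sink is a temp file or the authenticated socket of RParallelizeServer. For reference, a hypothetical decoder for that framing (the JVM side does the actual reading; this reader is not part of the commit) could look like:

    readSlices <- function(conn) {
      slices <- list()
      repeat {
        # Each frame: a 4-byte big-endian length, then that many raw bytes.
        len <- readBin(conn, what = integer(), n = 1L, endian = "big")
        if (length(len) == 0) break  # end of stream
        slices[[length(slices) + 1L]] <- readBin(conn, what = raw(), n = len)
      }
      slices
    }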

R/pkg/R/functions.R

Lines changed: 25 additions & 10 deletions

@@ -198,8 +198,9 @@ NULL
 #' }
 #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
 #'            additional named properties to control how it is converted, accepts the same
-#'            options as the JSON data source. In \code{arrays_zip}, this contains additional
-#'            Columns of arrays to be merged.
+#'            options as the JSON data source. Additionally \code{to_json} supports the "pretty"
+#'            option which enables pretty JSON generation. In \code{arrays_zip}, this contains
+#'            additional Columns of arrays to be merged.
 #' @name column_collection_functions
 #' @rdname column_collection_functions
 #' @family collection functions
@@ -1699,8 +1700,8 @@ setMethod("to_date",
           })
 
 #' @details
-#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
-#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
+#' \code{to_json}: Converts a column containing a \code{structType}, a \code{mapType}
+#' or an \code{arrayType} into a Column of JSON string.
 #' Resolving the Column can fail if an unsupported type is encountered.
 #'
 #' @rdname column_collection_functions
@@ -2203,9 +2204,16 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
           })
 
 #' @details
-#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
-#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1'
-#' would yield '2017-07-14 03:40:00.0'.
+#' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
+#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
+#' timestamp in UTC, and renders that timestamp as a timestamp in the given time zone.
+#' However, timestamp in Spark represents number of microseconds from the Unix epoch, which is not
+#' timezone-agnostic. So in Spark this function just shift the timestamp value from UTC timezone to
+#' the given timezone.
+#' This function may return confusing result if the input is a string with timezone, e.g.
+#' (\code{2018-03-13T06:18:23+00:00}). The reason is that, Spark firstly cast the string to
+#' timestamp according to the timezone in the string, and finally display the result by converting
+#' the timestamp to string according to the session local timezone.
 #'
 #' @rdname column_datetime_diff_functions
 #'
@@ -2261,9 +2269,16 @@ setMethod("next_day", signature(y = "Column", x = "character"),
           })
 
 #' @details
-#' \code{to_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
-#' time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1'
-#' would yield '2017-07-14 01:40:00.0'.
+#' \code{to_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
+#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
+#' timestamp in the given timezone, and renders that timestamp as a timestamp in UTC.
+#' However, timestamp in Spark represents number of microseconds from the Unix epoch, which is not
+#' timezone-agnostic. So in Spark this function just shift the timestamp value from the given
+#' timezone to UTC timezone.
+#' This function may return confusing result if the input is a string with timezone, e.g.
+#' (\code{2018-03-13T06:18:23+00:00}). The reason is that, Spark firstly cast the string to
+#' timestamp according to the timezone in the string, and finally display the result by converting
+#' the timestamp to string according to the session local timezone.
 #'
 #' @rdname column_datetime_diff_functions
 #' @aliases to_utc_timestamp to_utc_timestamp,Column,character-method
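
The reworked documentation reflects that to_json() now handles arrays of primitives and nested arrays, and that from_utc_timestamp()/to_utc_timestamp() merely shift an epoch-based timestamp between time zones. A small sketch mirroring the new column-function tests (the literal values and the chosen time zone are arbitrary):

    df <- sql("SELECT array(19, 42, 70) AS age")
    collect(select(df, alias(to_json(df$age), "json")))   # yields "[19,42,70]"

    ts <- createDataFrame(data.frame(t = "2018-03-13 06:18:23", stringsAsFactors = FALSE))
    # Shifts the (timezone-agnostic) value from UTC into the given zone.
    head(select(ts, from_utc_timestamp(ts$t, "America/Los_Angeles")))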

R/pkg/R/sparkR.R

Lines changed: 2 additions & 0 deletions

@@ -626,6 +626,8 @@ sparkConfToSubmitOps[["spark.driver.extraLibraryPath"]] <- "--driver-library-pat
 sparkConfToSubmitOps[["spark.master"]] <- "--master"
 sparkConfToSubmitOps[["spark.yarn.keytab"]] <- "--keytab"
 sparkConfToSubmitOps[["spark.yarn.principal"]] <- "--principal"
+sparkConfToSubmitOps[["spark.kerberos.keytab"]] <- "--keytab"
+sparkConfToSubmitOps[["spark.kerberos.principal"]] <- "--principal"
 
 
 # Utility function that returns Spark Submit arguments as a string
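
With the new aliases, SparkR sessions can use the non-YARN-specific configuration names and still have them forwarded to spark-submit as --keytab/--principal, just like the spark.yarn.* keys. A hypothetical invocation (the keytab path and principal are placeholders):

    sparkR.session(sparkConfig = list(spark.kerberos.keytab = "/path/to/user.keytab",
                                      spark.kerberos.principal = "user@EXAMPLE.COM"))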

R/pkg/tests/fulltests/test_Serde.R

Lines changed: 32 additions & 0 deletions

@@ -124,3 +124,35 @@ test_that("SerDe of list of lists", {
 })
 
 sparkR.session.stop()
+
+# Note that this test should be at the end of tests since the configruations used here are not
+# specific to sessions, and the Spark context is restarted.
+test_that("createDataFrame large objects", {
+  for (encryptionEnabled in list("true", "false")) {
+    # To simulate a large object scenario, we set spark.r.maxAllocationLimit to a smaller value
+    conf <- list(spark.r.maxAllocationLimit = "100",
+                 spark.io.encryption.enabled = encryptionEnabled)
+
+    suppressWarnings(sparkR.session(master = sparkRTestMaster,
+                                    sparkConfig = conf,
+                                    enableHiveSupport = FALSE))
+
+    sc <- getSparkContext()
+    actual <- callJStatic("org.apache.spark.api.r.RUtils", "getEncryptionEnabled", sc)
+    expected <- as.logical(encryptionEnabled)
+    expect_equal(actual, expected)
+
+    tryCatch({
+      # suppress warnings from dot in the field names. See also SPARK-21536.
+      df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
+      expect_equal(getNumPartitions(df), 3)
+      expect_equal(dim(df), dim(iris))
+
+      df <- createDataFrame(cars, numPartitions = 3)
+      expect_equal(collect(df), cars)
+    },
+    finally = {
+      sparkR.stop()
+    })
+  }
+})

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 26 additions & 12 deletions

@@ -316,18 +316,6 @@ test_that("create DataFrame from RDD", {
   unsetHiveContext()
 })
 
-test_that("createDataFrame uses files for large objects", {
-  # To simulate a large file scenario, we set spark.r.maxAllocationLimit to a smaller value
-  conf <- callJMethod(sparkSession, "conf")
-  callJMethod(conf, "set", "spark.r.maxAllocationLimit", "100")
-  df <- suppressWarnings(createDataFrame(iris, numPartitions = 3))
-  expect_equal(getNumPartitions(df), 3)
-
-  # Resetting the conf back to default value
-  callJMethod(conf, "set", "spark.r.maxAllocationLimit", toString(.Machine$integer.max / 10))
-  expect_equal(dim(df), dim(iris))
-})
-
 test_that("read/write csv as DataFrame", {
   if (windows_with_hadoop()) {
     csvPath <- tempfile(pattern = "sparkr-test", fileext = ".csv")
@@ -1686,6 +1674,15 @@ test_that("column functions", {
     expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
   }
 
+  # Test to_json() supports arrays of primitive types and arrays
+  df <- sql("SELECT array(19, 42, 70) as age")
+  j <- collect(select(df, alias(to_json(df$age), "json")))
+  expect_equal(j[order(j$json), ][1], "[19,42,70]")
+
+  df <- sql("SELECT array(array(1, 2), array(3, 4)) as matrix")
+  j <- collect(select(df, alias(to_json(df$matrix), "json")))
+  expect_equal(j[order(j$json), ][1], "[[1,2],[3,4]]")
+
   # passing option
   df <- as.DataFrame(list(list("col" = "{\"date\":\"21/10/2014\"}")))
   schema2 <- structType(structField("date", "date"))
@@ -2410,6 +2407,15 @@ test_that("join(), crossJoin() and merge() on a DataFrame", {
   expect_true(any(grepl("BroadcastHashJoin", execution_plan_broadcast)))
 })
 
+test_that("test hint", {
+  df <- sql("SELECT * FROM range(10e10)")
+  hintList <- list("hint2", "hint3", "hint4")
+  execution_plan_hint <- capture.output(
+    explain(hint(df, "hint1", 1.23456, "aaaaaaaaaa", hintList), TRUE)
+  )
+  expect_true(any(grepl("1.23456, aaaaaaaaaa", execution_plan_hint)))
+})
+
 test_that("toJSON() on DataFrame", {
   df <- as.DataFrame(cars)
   df_json <- toJSON(df)
@@ -2695,8 +2701,16 @@ test_that("read/write text files", {
   expect_equal(colnames(df2), c("value"))
   expect_equal(count(df2), count(df) * 2)
 
+  df3 <- createDataFrame(list(list(1L, "1"), list(2L, "2"), list(1L, "1"), list(2L, "2")),
+                         schema = c("key", "value"))
+  textPath3 <- tempfile(pattern = "textPath3", fileext = ".txt")
+  write.df(df3, textPath3, "text", mode = "overwrite", partitionBy = "key")
+  df4 <- read.df(textPath3, "text")
+  expect_equal(count(df3), count(df4))
+
   unlink(textPath)
   unlink(textPath2)
+  unlink(textPath3)
 })
 
 test_that("read/write text files - compression option", {
