
Commit 6197903

Author: Robert Kruszewski (committed)
Merge branch 'master' into resync-apache
2 parents: 37c725d + 3c6198c

File tree: 372 files changed (+8531 / -2589 lines)


R/pkg/NAMESPACE

Lines changed: 1 addition & 0 deletions
@@ -169,6 +169,7 @@ exportMethods("arrange",
               "transform",
               "union",
               "unionAll",
+              "unionByName",
               "unique",
               "unpersist",
               "where",

R/pkg/R/DataFrame.R

Lines changed: 36 additions & 2 deletions
@@ -2683,7 +2683,7 @@ generateAliasesForIntersectedCols <- function (x, intersectedColNames, suffix) {
 #' @rdname union
 #' @name union
 #' @aliases union,SparkDataFrame,SparkDataFrame-method
-#' @seealso \link{rbind}
+#' @seealso \link{rbind} \link{unionByName}
 #' @export
 #' @examples
 #'\dontrun{
@@ -2714,6 +2714,40 @@ setMethod("unionAll",
             union(x, y)
           })

+#' Return a new SparkDataFrame containing the union of rows, matched by column names
+#'
+#' Return a new SparkDataFrame containing the union of rows in this SparkDataFrame
+#' and another SparkDataFrame. This is different from the \code{union} function and
+#' from both \code{UNION ALL} and \code{UNION DISTINCT} in SQL, as column positions
+#' are not taken into account. Input SparkDataFrames can have different data types
+#' in the schema.
+#'
+#' Note: This does not remove duplicate rows across the two SparkDataFrames.
+#' This function resolves columns by name (not by position).
+#'
+#' @param x A SparkDataFrame
+#' @param y A SparkDataFrame
+#' @return A SparkDataFrame containing the result of the union.
+#' @family SparkDataFrame functions
+#' @rdname unionByName
+#' @name unionByName
+#' @aliases unionByName,SparkDataFrame,SparkDataFrame-method
+#' @seealso \link{rbind} \link{union}
+#' @export
+#' @examples
+#'\dontrun{
+#' sparkR.session()
+#' df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
+#' df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")
+#' head(unionByName(df1, df2))
+#' }
+#' @note unionByName since 2.3.0
+setMethod("unionByName",
+          signature(x = "SparkDataFrame", y = "SparkDataFrame"),
+          function(x, y) {
+            unioned <- callJMethod(x@sdf, "unionByName", y@sdf)
+            dataFrame(unioned)
+          })
+
 #' Union two or more SparkDataFrames
 #'
 #' Union two or more SparkDataFrames by row. As in R's \code{rbind}, this method
@@ -2730,7 +2764,7 @@ setMethod("unionAll",
 #' @aliases rbind,SparkDataFrame-method
 #' @rdname rbind
 #' @name rbind
-#' @seealso \link{union}
+#' @seealso \link{union} \link{unionByName}
 #' @export
 #' @examples
 #'\dontrun{
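In practice, the new method lets two SparkDataFrames that share column names in different orders be unioned safely. A minimal usage sketch, assuming a running SparkR session (variable names are illustrative):

    df1 <- select(createDataFrame(mtcars), "carb", "am", "gear")
    df2 <- select(createDataFrame(mtcars), "am", "gear", "carb")

    # union() appends df2's rows positionally, so "am" values would land in
    # df1's "carb" column; unionByName() matches columns by name instead.
    unioned <- unionByName(df1, df2)
    head(unioned)  # columns follow df1's order, with both halves aligned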

R/pkg/R/functions.R

Lines changed: 13 additions & 3 deletions
@@ -176,7 +176,8 @@ NULL
 #'
 #' @param x Column to compute on. Note the difference in the following methods:
 #' \itemize{
-#' \item \code{to_json}: it is the column containing the struct or array of the structs.
+#' \item \code{to_json}: it is the column containing the struct, array of the structs,
+#'       the map or array of maps.
 #' \item \code{from_json}: it is the column containing the JSON string.
 #' }
 #' @param ... additional argument(s). In \code{to_json} and \code{from_json}, this contains
@@ -1700,8 +1701,9 @@ setMethod("to_date",
           })

 #' @details
-#' \code{to_json}: Converts a column containing a \code{structType} or array of \code{structType}
-#' into a Column of JSON string. Resolving the Column can fail if an unsupported type is encountered.
+#' \code{to_json}: Converts a column containing a \code{structType}, array of \code{structType},
+#' a \code{mapType} or array of \code{mapType} into a Column of JSON string.
+#' Resolving the Column can fail if an unsupported type is encountered.
 #'
 #' @rdname column_collection_functions
 #' @aliases to_json to_json,Column-method
@@ -1715,6 +1717,14 @@ setMethod("to_date",
 #'
 #' # Converts an array of structs into a JSON array
 #' df2 <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts a map into a JSON object
+#' df2 <- sql("SELECT map('name', 'Bob') as people")
+#' df2 <- mutate(df2, people_json = to_json(df2$people))
+#'
+#' # Converts an array of maps into a JSON array
+#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
 #' df2 <- mutate(df2, people_json = to_json(df2$people))}
 #' @note to_json since 2.2.0
 setMethod("to_json", signature(x = "Column"),

R/pkg/R/generics.R

Lines changed: 4 additions & 0 deletions
@@ -769,6 +769,10 @@ setGeneric("union", function(x, y) { standardGeneric("union") })
 #' @export
 setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })

+#' @rdname unionByName
+#' @export
+setGeneric("unionByName", function(x, y) { standardGeneric("unionByName") })
+
 #' @rdname unpersist
 #' @export
 setGeneric("unpersist", function(x, ...) { standardGeneric("unpersist") })

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 16 additions & 1 deletion
@@ -1491,6 +1491,14 @@ test_that("column functions", {
   j <- collect(select(df, alias(to_json(df$people), "json")))
   expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")

+  df <- sql("SELECT map('name', 'Bob') as people")
+  j <- collect(select(df, alias(to_json(df$people), "json")))
+  expect_equal(j[order(j$json), ][1], "{\"name\":\"Bob\"}")
+
+  df <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
+  j <- collect(select(df, alias(to_json(df$people), "json")))
+  expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
+
   df <- read.json(mapTypeJsonPath)
   j <- collect(select(df, alias(to_json(df$info), "json")))
   expect_equal(j[order(j$json), ][1], "{\"age\":16,\"height\":176.5}")
@@ -2255,7 +2263,7 @@ test_that("isLocal()", {
   expect_false(isLocal(df))
 })

-test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
+test_that("union(), unionByName(), rbind(), except(), and intersect() on a DataFrame", {
   df <- read.json(jsonPath)

   lines <- c("{\"name\":\"Bob\", \"age\":24}",
@@ -2271,6 +2279,13 @@ test_that("union(), rbind(), except(), and intersect() on a DataFrame", {
   expect_equal(first(unioned)$name, "Michael")
   expect_equal(count(arrange(suppressWarnings(unionAll(df, df2)), df$age)), 6)

+  df1 <- select(df2, "age", "name")
+  unioned1 <- arrange(unionByName(df1, df), df1$age)
+  expect_is(unioned1, "SparkDataFrame")
+  expect_equal(count(unioned1), 6)
+  # Here, we test if 'Michael' in df is correctly mapped to the same name.
+  expect_equal(first(unioned1)$name, "Michael")
+
   unioned2 <- arrange(rbind(unioned, df, df2), df$age)
   expect_is(unioned2, "SparkDataFrame")
   expect_equal(count(unioned2), 12)

R/pkg/tests/run-all.R

Lines changed: 2 additions & 0 deletions
@@ -43,6 +43,8 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
 test_package("SparkR")

 if (identical(Sys.getenv("NOT_CRAN"), "true")) {
+  # set random seed for predictable results, mostly for base's sample() in tree and classification
+  set.seed(42)
   # for testthat 1.0.2 later, change reporter from "summary" to default_reporter()
   testthat:::run_tests("SparkR",
                        file.path(sparkRDir, "pkg", "tests", "fulltests"),
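Seeding the RNG once before the test run makes base R's sampling deterministic, so any test that subsamples data draws the same rows on every run. A tiny illustration in plain R:

    set.seed(42)
    first <- sample(100, 5)
    set.seed(42)
    second <- sample(100, 5)
    identical(first, second)  # TRUE: the same seed yields the same draw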

appveyor.yml

Lines changed: 1 addition & 0 deletions
@@ -32,6 +32,7 @@ only_commits:
     - sql/core/src/main/scala/org/apache/spark/sql/api/r/
     - core/src/main/scala/org/apache/spark/api/r/
     - mllib/src/main/scala/org/apache/spark/ml/r/
+    - core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

 cache:
   - C:\Users\appveyor\.m2

bin/load-spark-env.cmd

Lines changed: 11 additions & 11 deletions
@@ -35,21 +35,21 @@ if [%SPARK_ENV_LOADED%] == [] (

 rem Setting SPARK_SCALA_VERSION if not already set.

-rem set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
-rem set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"
+set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11"
+set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12"

 if [%SPARK_SCALA_VERSION%] == [] (

-  rem if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
-  rem   echo "Presence of build for multiple Scala versions detected."
-  rem   echo "Either clean one of them or, set SPARK_SCALA_VERSION=2.11 in spark-env.cmd."
-  rem   exit 1
-  rem )
-  rem if exist %ASSEMBLY_DIR2% (
+  if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% (
+    echo "Presence of build for multiple Scala versions detected."
+    echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd."
+    exit 1
+  )
+  if exist %ASSEMBLY_DIR2% (
     set SPARK_SCALA_VERSION=2.11
-  rem ) else (
-  rem   set SPARK_SCALA_VERSION=2.12
-  rem )
+  ) else (
+    set SPARK_SCALA_VERSION=2.12
+  )
 )
 exit /b 0
bin/load-spark-env.sh

Lines changed: 11 additions & 11 deletions
@@ -46,18 +46,18 @@ fi

 if [ -z "$SPARK_SCALA_VERSION" ]; then

-  #ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
-  #ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"
+  ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
+  ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"

-  #if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
-  #  echo -e "Presence of build for multiple Scala versions detected." 1>&2
-  #  echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION=2.11 in spark-env.sh.' 1>&2
-  #  exit 1
-  #fi
+  if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
+    echo -e "Presence of build for multiple Scala versions detected." 1>&2
+    echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION in spark-env.sh.' 1>&2
+    exit 1
+  fi

-  #if [ -d "$ASSEMBLY_DIR2" ]; then
+  if [ -d "$ASSEMBLY_DIR2" ]; then
     export SPARK_SCALA_VERSION="2.11"
-  #else
-  #  export SPARK_SCALA_VERSION="2.12"
-  #fi
+  else
+    export SPARK_SCALA_VERSION="2.12"
+  fi
 fi

common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDBTypeInfo.java

Lines changed: 4 additions & 4 deletions
@@ -249,7 +249,7 @@ Index parent() {
    * calculated only once, avoiding redundant work when multiple child indices of the
    * same parent index exist.
    */
-  byte[] childPrefix(Object value) throws Exception {
+  byte[] childPrefix(Object value) {
     Preconditions.checkState(parent == null, "Not a parent index.");
     return buildKey(name, toParentKey(value));
   }
@@ -295,7 +295,7 @@ byte[] end(byte[] prefix) {
   }

   /** The key for the end marker for entries with the given value. */
-  byte[] end(byte[] prefix, Object value) throws Exception {
+  byte[] end(byte[] prefix, Object value) {
     checkParent(prefix);
     return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER)
       : buildKey(name, toKey(value), END_MARKER);
@@ -313,7 +313,7 @@ byte[] entityKey(byte[] prefix, Object entity) throws Exception {
     return entityKey;
   }

-  private void updateCount(WriteBatch batch, byte[] key, long delta) throws Exception {
+  private void updateCount(WriteBatch batch, byte[] key, long delta) {
     long updated = getCount(key) + delta;
     if (updated > 0) {
       batch.put(key, db.serializer.serialize(updated));
@@ -431,7 +431,7 @@ void remove(
     addOrRemove(batch, entity, null, null, naturalKey, prefix);
   }

-  long getCount(byte[] key) throws Exception {
+  long getCount(byte[] key) {
     byte[] data = db.db().get(key);
     return data != null ? db.serializer.deserializeLong(data) : 0;
   }
