
Commit 8d0d856

Merge pull request apache-spark-on-k8s#263 from palantir/rk/resync

Merge upstream apache

2 parents: 2817552 + 29c4250

171 files changed: +8278 -1291 lines


R/pkg/R/DataFrame.R

Lines changed: 27 additions & 13 deletions
@@ -986,10 +986,10 @@ setMethod("unique",
 #' @param x A SparkDataFrame
 #' @param withReplacement Sampling with replacement or not
 #' @param fraction The (rough) sample target fraction
-#' @param seed Randomness seed value
+#' @param seed Randomness seed value. Default is a random seed.
 #'
 #' @family SparkDataFrame functions
-#' @aliases sample,SparkDataFrame,logical,numeric-method
+#' @aliases sample,SparkDataFrame-method
 #' @rdname sample
 #' @name sample
 #' @export
@@ -998,33 +998,47 @@ setMethod("unique",
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
+#' collect(sample(df, fraction = 0.5))
 #' collect(sample(df, FALSE, 0.5))
-#' collect(sample(df, TRUE, 0.5))
+#' collect(sample(df, TRUE, 0.5, seed = 3))
 #'}
 #' @note sample since 1.4.0
 setMethod("sample",
-          signature(x = "SparkDataFrame", withReplacement = "logical",
-                    fraction = "numeric"),
-          function(x, withReplacement, fraction, seed) {
-            if (fraction < 0.0) stop(cat("Negative fraction value:", fraction))
+          signature(x = "SparkDataFrame"),
+          function(x, withReplacement = FALSE, fraction, seed) {
+            if (!is.numeric(fraction)) {
+              stop(paste("fraction must be numeric; however, got", class(fraction)))
+            }
+            if (!is.logical(withReplacement)) {
+              stop(paste("withReplacement must be logical; however, got", class(withReplacement)))
+            }
+
             if (!missing(seed)) {
+              if (is.null(seed)) {
+                stop("seed must not be NULL or NA; however, got NULL")
+              }
+              if (is.na(seed)) {
+                stop("seed must not be NULL or NA; however, got NA")
+              }
+
               # TODO : Figure out how to send integer as java.lang.Long to JVM so
               # we can send seed as an argument through callJMethod
-              sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction, as.integer(seed))
+              sdf <- handledCallJMethod(x@sdf, "sample", as.logical(withReplacement),
+                                        as.numeric(fraction), as.integer(seed))
             } else {
-              sdf <- callJMethod(x@sdf, "sample", withReplacement, fraction)
+              sdf <- handledCallJMethod(x@sdf, "sample",
+                                        as.logical(withReplacement), as.numeric(fraction))
             }
             dataFrame(sdf)
           })

 #' @rdname sample
-#' @aliases sample_frac,SparkDataFrame,logical,numeric-method
+#' @aliases sample_frac,SparkDataFrame-method
 #' @name sample_frac
 #' @note sample_frac since 1.4.0
 setMethod("sample_frac",
-          signature(x = "SparkDataFrame", withReplacement = "logical",
-                    fraction = "numeric"),
-          function(x, withReplacement, fraction, seed) {
+          signature(x = "SparkDataFrame"),
+          function(x, withReplacement = FALSE, fraction, seed) {
            sample(x, withReplacement, fraction, seed)
          })
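With this change, withReplacement is optional and defaults to FALSE, so fraction can be passed by name alone, and argument types are validated before the call reaches the JVM. A minimal SparkR sketch of the new calling conventions, assuming an active sparkR.session() and an existing SparkDataFrame df:

# withReplacement now defaults to FALSE, so a named fraction alone suffices
collect(sample(df, fraction = 0.5))
# the original positional form still works
collect(sample(df, FALSE, 0.5))
# seed may be passed by name; NULL/NA seeds and non-numeric fractions now raise errors
collect(sample(df, TRUE, 0.5, seed = 3))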

R/pkg/R/functions.R

Lines changed: 6 additions & 4 deletions
@@ -2226,8 +2226,9 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
 })

 #' @details
-#' \code{from_utc_timestamp}: Given a timestamp, which corresponds to a certain time of day in UTC,
-#' returns another timestamp that corresponds to the same time of day in the given timezone.
+#' \code{from_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
+#' time in UTC, and renders that time as a timestamp in the given time zone. For example, 'GMT+1'
+#' would yield '2017-07-14 03:40:00.0'.
 #'
 #' @rdname column_datetime_diff_functions
 #'
@@ -2286,8 +2287,9 @@ setMethod("next_day", signature(y = "Column", x = "character"),
 })

 #' @details
-#' \code{to_utc_timestamp}: Given a timestamp, which corresponds to a certain time of day
-#' in the given timezone, returns another timestamp that corresponds to the same time of day in UTC.
+#' \code{to_utc_timestamp}: Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a
+#' time in the given time zone, and renders that time as a timestamp in UTC. For example, 'GMT+1'
+#' would yield '2017-07-14 01:40:00.0'.
 #'
 #' @rdname column_datetime_diff_functions
 #' @aliases to_utc_timestamp to_utc_timestamp,Column,character-method
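The reworded docs emphasize that these functions re-render a wall-clock time in another zone rather than shift the underlying instant. A minimal SparkR sketch of the documented example (the local DataFrame and column name t are hypothetical):

df <- createDataFrame(data.frame(t = "2017-07-14 02:40:00.0"))
# interpret t as UTC and render it in GMT+1: '2017-07-14 03:40:00.0'
head(select(df, from_utc_timestamp(df$t, "GMT+1")))
# interpret t as GMT+1 and render it in UTC: '2017-07-14 01:40:00.0'
head(select(df, to_utc_timestamp(df$t, "GMT+1")))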

R/pkg/R/generics.R

Lines changed: 2 additions & 2 deletions
@@ -645,7 +645,7 @@ setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
 #' @rdname sample
 #' @export
 setGeneric("sample",
-           function(x, withReplacement, fraction, seed) {
+           function(x, withReplacement = FALSE, fraction, seed) {
             standardGeneric("sample")
           })

@@ -656,7 +656,7 @@ setGeneric("rollup", function(x, ...) { standardGeneric("rollup") })
 #' @rdname sample
 #' @export
 setGeneric("sample_frac",
-           function(x, withReplacement, fraction, seed) { standardGeneric("sample_frac") })
+           function(x, withReplacement = FALSE, fraction, seed) { standardGeneric("sample_frac") })

 #' @rdname sampleBy
 #' @export
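Because the default now lives on the generics themselves, every method dispatching through them inherits it. A sketch of a call that previously failed to dispatch without an explicit withReplacement:

# sample_frac picks up withReplacement = FALSE from the generic
collect(sample_frac(df, fraction = 0.1))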

R/pkg/tests/fulltests/test_sparkSQL.R

Lines changed: 14 additions & 0 deletions
@@ -1116,6 +1116,20 @@ test_that("sample on a DataFrame", {
   sampled3 <- sample_frac(df, FALSE, 0.1, 0) # set seed for predictable result
   expect_true(count(sampled3) < 3)

+  # Different arguments
+  df <- createDataFrame(as.list(seq(10)))
+  expect_equal(count(sample(df, fraction = 0.5, seed = 3)), 4)
+  expect_equal(count(sample(df, withReplacement = TRUE, fraction = 0.5, seed = 3)), 2)
+  expect_equal(count(sample(df, fraction = 1.0)), 10)
+  expect_equal(count(sample(df, fraction = 1L)), 10)
+  expect_equal(count(sample(df, FALSE, fraction = 1.0)), 10)
+
+  expect_error(sample(df, fraction = "a"), "fraction must be numeric")
+  expect_error(sample(df, "a", fraction = 0.1), "however, got character")
+  expect_error(sample(df, fraction = 1, seed = NA), "seed must not be NULL or NA; however, got NA")
+  expect_error(sample(df, fraction = -1.0),
+               "illegal argument - requirement failed: Sampling fraction \\(-1.0\\)")
+
   # nolint start
   # Test base::sample is working
   #expect_equal(length(sample(1:12)), 12)

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -187,7 +187,7 @@
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-assembly-plugin</artifactId>
-        <version>3.0.0</version>
+        <version>3.1.0</version>
         <executions>
           <execution>
             <id>dist</id>

common/network-common/src/main/java/org/apache/spark/network/util/ByteUnit.java

Lines changed: 6 additions & 6 deletions
@@ -17,12 +17,12 @@
 package org.apache.spark.network.util;

 public enum ByteUnit {
-  BYTE (1),
-  KiB (1024L),
-  MiB ((long) Math.pow(1024L, 2L)),
-  GiB ((long) Math.pow(1024L, 3L)),
-  TiB ((long) Math.pow(1024L, 4L)),
-  PiB ((long) Math.pow(1024L, 5L));
+  BYTE(1),
+  KiB(1024L),
+  MiB((long) Math.pow(1024L, 2L)),
+  GiB((long) Math.pow(1024L, 3L)),
+  TiB((long) Math.pow(1024L, 4L)),
+  PiB((long) Math.pow(1024L, 5L));

   ByteUnit(long multiplier) {
     this.multiplier = multiplier;

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalShuffleClient.java

Lines changed: 8 additions & 0 deletions
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 import java.util.List;

+import com.codahale.metrics.MetricSet;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,6 +118,12 @@ public void fetchBlocks(
     }
   }

+  @Override
+  public MetricSet shuffleMetrics() {
+    checkInit();
+    return clientFactory.getAllMetrics();
+  }
+
   /**
    * Registers this executor with an external shuffle server. This registration is required to
    * inform the shuffle server about where and how we store our shuffle files.
@@ -140,6 +147,7 @@ public void registerWithShuffleServer(

   @Override
   public void close() {
+    checkInit();
     clientFactory.close();
   }
 }

common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ShuffleClient.java

Lines changed: 12 additions & 0 deletions
@@ -18,6 +18,9 @@
 package org.apache.spark.network.shuffle;

 import java.io.Closeable;
+import java.util.Collections;
+
+import com.codahale.metrics.MetricSet;

 /** Provides an interface for reading shuffle files, either from an Executor or external service. */
 public abstract class ShuffleClient implements Closeable {
@@ -52,4 +55,13 @@ public abstract void fetchBlocks(
       String[] blockIds,
       BlockFetchingListener listener,
       TempShuffleFileManager tempShuffleFileManager);
+
+  /**
+   * Get the shuffle MetricsSet from ShuffleClient, this will be used in MetricsSystem to
+   * get the Shuffle related metrics.
+   */
+  public MetricSet shuffleMetrics() {
+    // Return an empty MetricSet by default.
+    return () -> Collections.emptyMap();
+  }
 }

common/sketch/pom.xml

Lines changed: 12 additions & 15 deletions
@@ -56,20 +56,17 @@
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-    <pluginManagement>
-      <plugins>
-        <plugin>
-          <groupId>net.alchim31.maven</groupId>
-          <artifactId>scala-maven-plugin</artifactId>
-          <version>3.2.2</version>
-          <configuration>
-            <javacArgs combine.children="append">
-              <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
-              <javacArg>-XDignore.symbol.file</javacArg>
-            </javacArgs>
-          </configuration>
-        </plugin>
-      </plugins>
-    </pluginManagement>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <configuration>
+          <javacArgs combine.children="append">
+            <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
+            <javacArg>-XDignore.symbol.file</javacArg>
+          </javacArgs>
+        </configuration>
+      </plugin>
+    </plugins>
   </build>
 </project>

common/unsafe/pom.xml

Lines changed: 12 additions & 15 deletions
@@ -93,20 +93,17 @@
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
     <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
-    <pluginManagement>
-      <plugins>
-        <plugin>
-          <groupId>net.alchim31.maven</groupId>
-          <artifactId>scala-maven-plugin</artifactId>
-          <version>3.2.2</version>
-          <configuration>
-            <javacArgs combine.children="append">
-              <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
-              <javacArg>-XDignore.symbol.file</javacArg>
-            </javacArgs>
-          </configuration>
-        </plugin>
-      </plugins>
-    </pluginManagement>
+    <plugins>
+      <plugin>
+        <groupId>net.alchim31.maven</groupId>
+        <artifactId>scala-maven-plugin</artifactId>
+        <configuration>
+          <javacArgs combine.children="append">
+            <!-- This option is needed to suppress warnings from sun.misc.Unsafe usage -->
+            <javacArg>-XDignore.symbol.file</javacArg>
+          </javacArgs>
+        </configuration>
+      </plugin>
+    </plugins>
   </build>
 </project>
