Skip to content

Commit 20c66ef

Browse files
committed
resolve conflicts
1 parent 6b87bfa commit 20c66ef

File tree

24 files changed

+153
-1526
lines changed

24 files changed

+153
-1526
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ jobs:
259259

260260
run-spark-docker-gradle-plugin-tests:
261261
<<: *test-defaults
262-
resource_class: small
262+
resource_class: medium
263263
steps:
264264
- *checkout-code
265265
- setup_remote_docker

R/pkg/tests/run-all.R

Lines changed: 25 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -24,57 +24,20 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
2424
# Turn all warnings into errors
2525
options("warn" = 2)
2626

27-
<<<<<<< HEAD
28-
# Setup global test environment
29-
# Install Spark first to set SPARK_HOME
30-
install.spark()
31-
=======
3227
if (.Platform$OS.type == "windows") {
3328
Sys.setenv(TZ = "GMT")
3429
}
3530

3631
# Setup global test environment
3732
# Install Spark first to set SPARK_HOME
38-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
39-
40-
# NOTE(shivaram): We set overwrite to handle any old tar.gz files or directories left behind on
41-
# CRAN machines. For Jenkins we should already have SPARK_HOME set.
42-
install.spark(overwrite = TRUE)
33+
install.spark()
4334

4435
sparkRDir <- file.path(Sys.getenv("SPARK_HOME"), "R")
4536
sparkRWhitelistSQLDirs <- c("spark-warehouse", "metastore_db")
4637
invisible(lapply(sparkRWhitelistSQLDirs,
4738
function(x) { unlink(file.path(sparkRDir, x), recursive = TRUE, force = TRUE)}))
4839
sparkRFilesBefore <- list.files(path = sparkRDir, all.files = TRUE)
4940

50-
<<<<<<< HEAD
51-
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
52-
if (identical(Sys.getenv("CONDA_TESTS"), "true")) {
53-
summaryReporter <- ProgressReporter$new()
54-
options(testthat.output_file = "target/R/R/conda/r-tests.xml")
55-
junitReporter <- JunitReporter$new()
56-
# set random seed for predictable results. mostly for base's sample() in tree and classification
57-
set.seed(42)
58-
testthat:::test_package_dir("SparkR",
59-
file.path(sparkRDir, "pkg", "tests", "condatests"),
60-
NULL,
61-
MultiReporter$new(reporters = list(summaryReporter, junitReporter)))
62-
} else {
63-
summaryReporter <- ProgressReporter$new()
64-
options(testthat.output_file = "target/R/R/r-tests.xml")
65-
junitReporter <- JunitReporter$new()
66-
reporter <- MultiReporter$new(reporters = list(summaryReporter, junitReporter))
67-
# set random seed for predictable results. mostly for base's sample() in tree and classification
68-
test_package("SparkR", reporter = reporter)
69-
set.seed(42)
70-
testthat:::test_package_dir("SparkR",
71-
file.path(sparkRDir, "pkg", "tests", "fulltests"),
72-
NULL,
73-
reporter)
74-
}
75-
}
76-
77-
=======
7841
sparkRTestMaster <- "local[1]"
7942
sparkRTestConfig <- list()
8043
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
@@ -89,19 +52,30 @@ if (identical(Sys.getenv("NOT_CRAN"), "true")) {
8952
spark.executor.extraJavaOptions = tmpArg)
9053
}
9154

92-
test_package("SparkR")
93-
9455
if (identical(Sys.getenv("NOT_CRAN"), "true")) {
95-
# set random seed for predictable results. mostly for base's sample() in tree and classification
96-
set.seed(42)
97-
# for testthat 1.0.2 later, change reporter from "summary" to default_reporter()
98-
testthat:::run_tests("SparkR",
99-
file.path(sparkRDir, "pkg", "tests", "fulltests"),
100-
NULL,
101-
"summary")
56+
if (identical(Sys.getenv("CONDA_TESTS"), "true")) {
57+
summaryReporter <- ProgressReporter$new()
58+
options(testthat.output_file = "target/R/R/conda/r-tests.xml")
59+
junitReporter <- JunitReporter$new()
60+
# set random seed for predictable results. mostly for base's sample() in tree and classification
61+
set.seed(42)
62+
testthat:::test_package_dir("SparkR",
63+
file.path(sparkRDir, "pkg", "tests", "condatests"),
64+
NULL,
65+
MultiReporter$new(reporters = list(summaryReporter, junitReporter)))
66+
} else {
67+
summaryReporter <- ProgressReporter$new()
68+
options(testthat.output_file = "target/R/R/r-tests.xml")
69+
junitReporter <- JunitReporter$new()
70+
reporter <- MultiReporter$new(reporters = list(summaryReporter, junitReporter))
71+
# set random seed for predictable results. mostly for base's sample() in tree and classification
72+
test_package("SparkR", reporter = reporter)
73+
set.seed(42)
74+
testthat:::test_package_dir("SparkR",
75+
file.path(sparkRDir, "pkg", "tests", "fulltests"),
76+
NULL,
77+
reporter)
78+
}
10279
}
10380

104-
SparkR:::uninstallDownloadedSpark()
105-
106-
}
107-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
81+
}

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
import java.io.File;
2121
import java.io.IOException;
22-
import java.lang.reflect.Method;
2322
import java.nio.ByteBuffer;
2423
import java.nio.charset.StandardCharsets;
2524
import java.util.List;
@@ -36,10 +35,6 @@
3635
import org.apache.hadoop.fs.FileSystem;
3736
import org.apache.hadoop.fs.Path;
3837
import org.apache.hadoop.fs.permission.FsPermission;
39-
<<<<<<< HEAD
40-
import org.apache.hadoop.metrics2.MetricsSource;
41-
=======
42-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
4338
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
4439
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
4540
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -176,31 +171,13 @@ protected void serviceInit(Configuration conf) throws Exception {
176171
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
177172

178173
// register metrics on the block handler into the Node Manager's metrics system.
179-
<<<<<<< HEAD
180-
try {
181-
YarnShuffleServiceMetrics serviceMetrics = new YarnShuffleServiceMetrics(
182-
blockHandler.getAllMetrics());
183-
MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
184-
185-
Method registerSourceMethod = metricsSystem.getClass().getDeclaredMethod("registerSource",
186-
String.class, String.class, MetricsSource.class);
187-
registerSourceMethod.setAccessible(true);
188-
registerSourceMethod.invoke(metricsSystem, "shuffleService", "Metrics on the Spark " +
189-
"Shuffle Service", serviceMetrics);
190-
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
191-
} catch (Exception e) {
192-
logger.warn("Unable to register Spark Shuffle Service metrics with Node Manager; " +
193-
"proceeding without metrics", e);
194-
}
195-
=======
196174
YarnShuffleServiceMetrics serviceMetrics =
197175
new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
198176

199177
MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
200178
metricsSystem.register(
201179
"sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
202180
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
203-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
204181

205182
// If authentication is enabled, set up the shuffle server to use a
206183
// special RPC handler that filters out unauthenticated fetch requests

common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java

Lines changed: 37 additions & 122 deletions
Original file line numberDiff line numberDiff line change
@@ -17,40 +17,14 @@
1717

1818
package org.apache.spark.network.yarn;
1919

20-
<<<<<<< HEAD
21-
import com.codahale.metrics.*;
22-
import com.google.common.annotations.VisibleForTesting;
23-
import com.google.common.collect.ImmutableMap;
24-
=======
2520
import java.util.Map;
2621

2722
import com.codahale.metrics.*;
28-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
2923
import org.apache.hadoop.metrics2.MetricsCollector;
3024
import org.apache.hadoop.metrics2.MetricsInfo;
3125
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
3226
import org.apache.hadoop.metrics2.MetricsSource;
3327

34-
<<<<<<< HEAD
35-
import java.util.Map;
36-
import java.util.concurrent.TimeUnit;
37-
38-
/**
39-
* Modeled off of YARN's NodeManagerMetrics.
40-
*/
41-
public class YarnShuffleServiceMetrics implements MetricsSource {
42-
43-
// Converting from the dropwizard-metrics default of nanoseconds into milliseconds to match how
44-
// MetricsServlet serializes times (to milliseconds) configured via the MetricsModule passed into
45-
// its Jackson ObjectMapper. Without this rate factor applied, the Timer metrics from
46-
// ExternalShuffleBlockManager#ShuffleMetrics with "Millis" suffixes are misleading, as they
47-
// would otherwise contain values in nanoseconds units
48-
private static final double rateFactor = (double) TimeUnit.MILLISECONDS.toNanos(1L);
49-
50-
private final MetricSet metricSet;
51-
52-
public YarnShuffleServiceMetrics(MetricSet metricSet) {
53-
=======
5428
/**
5529
* Forward {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}
5630
* to hadoop metrics system.
@@ -61,7 +35,6 @@ class YarnShuffleServiceMetrics implements MetricsSource {
6135
private final MetricSet metricSet;
6236

6337
YarnShuffleServiceMetrics(MetricSet metricSet) {
64-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
6538
this.metricSet = metricSet;
6639
}
6740

@@ -73,118 +46,64 @@ class YarnShuffleServiceMetrics implements MetricsSource {
7346
*/
7447
@Override
7548
public void getMetrics(MetricsCollector collector, boolean all) {
76-
<<<<<<< HEAD
77-
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("shuffleService");
78-
=======
7949
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
80-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
8150

8251
for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
8352
collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
8453
}
8554
}
8655

87-
<<<<<<< HEAD
88-
private static void addSnapshotToMetricRecordBuilder(Snapshot snapshot,
89-
MetricsRecordBuilder builder,
90-
String name,
91-
String metricType) {
92-
93-
ImmutableMap<String, Double> doubleValues = ImmutableMap.<String, Double>builder()
94-
.put("median", snapshot.getMedian())
95-
.put("mean", snapshot.getMean())
96-
.put("75th", snapshot.get75thPercentile())
97-
.put("95th", snapshot.get95thPercentile())
98-
.put("98th", snapshot.get98thPercentile())
99-
.put("99th", snapshot.get99thPercentile())
100-
.put("999th", snapshot.get999thPercentile())
101-
.build();
102-
103-
ImmutableMap<String, Long> longValues = ImmutableMap.<String, Long>builder()
104-
.put("min", snapshot.getMin())
105-
.put("max", snapshot.getMax())
106-
.build();
107-
108-
for (Map.Entry<String, Double> entry : doubleValues.entrySet()) {
109-
builder.addGauge(
110-
new ShuffleServiceMetricsInfo(name + "_" + entry.getKey(),
111-
entry.getKey() + " of " + metricType + " " + name),
112-
entry.getValue() / rateFactor);
113-
}
114-
115-
for (Map.Entry<String, Long> entry : longValues.entrySet()) {
116-
builder.addGauge(
117-
new ShuffleServiceMetricsInfo(name + "_" + entry.getKey(),
118-
entry.getKey() + " of " + metricType + " " + name),
119-
entry.getValue() / rateFactor);
120-
}
121-
122-
}
123-
124-
@VisibleForTesting
125-
public static void collectMetric(
126-
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
127-
128-
// The metric types used in ExternalShuffleBlockHandler.ShuffleMetrics
129-
if (metric instanceof Timer) {
130-
Timer t = (Timer) metric;
131-
Snapshot snapshot = t.getSnapshot();
132-
=======
13356
/**
13457
* The metric types used in
13558
* {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}.
13659
* Visible for testing.
13760
*/
13861
public static void collectMetric(
139-
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
62+
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
14063

14164
if (metric instanceof Timer) {
14265
Timer t = (Timer) metric;
143-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
14466
metricsRecordBuilder
145-
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
146-
t.getCount())
147-
.addGauge(
148-
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name),
149-
t.getFifteenMinuteRate())
150-
.addGauge(
151-
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name),
152-
t.getFiveMinuteRate())
153-
.addGauge(
154-
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
155-
t.getOneMinuteRate())
156-
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
157-
t.getMeanRate());
158-
<<<<<<< HEAD
159-
addSnapshotToMetricRecordBuilder(snapshot, metricsRecordBuilder, name, "timer");
160-
=======
161-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
67+
.addCounter(new ShuffleServiceMetricsInfo(
68+
name + "_count", "Count of timer " + name),
69+
t.getCount())
70+
.addGauge(
71+
new ShuffleServiceMetricsInfo(
72+
name + "_rate15", "15 minute rate of timer " + name),
73+
t.getFifteenMinuteRate())
74+
.addGauge(
75+
new ShuffleServiceMetricsInfo(
76+
name + "_rate5", "5 minute rate of timer " + name),
77+
t.getFiveMinuteRate())
78+
.addGauge(
79+
new ShuffleServiceMetricsInfo(
80+
name + "_rate1", "1 minute rate of timer " + name),
81+
t.getOneMinuteRate())
82+
.addGauge(new ShuffleServiceMetricsInfo(
83+
name + "_rateMean", "Mean rate of timer " + name),
84+
t.getMeanRate());
16285
} else if (metric instanceof Meter) {
16386
Meter m = (Meter) metric;
16487
metricsRecordBuilder
165-
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name),
166-
m.getCount())
167-
.addGauge(
168-
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name),
169-
m.getFifteenMinuteRate())
170-
.addGauge(
171-
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name),
172-
m.getFiveMinuteRate())
173-
.addGauge(
174-
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name),
175-
m.getOneMinuteRate())
176-
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name),
177-
m.getMeanRate());
88+
.addCounter(new ShuffleServiceMetricsInfo(
89+
name + "_count", "Count of meter " + name),
90+
m.getCount())
91+
.addGauge(
92+
new ShuffleServiceMetricsInfo(
93+
name + "_rate15", "15 minute rate of meter " + name),
94+
m.getFifteenMinuteRate())
95+
.addGauge(
96+
new ShuffleServiceMetricsInfo(
97+
name + "_rate5", "5 minute rate of meter " + name),
98+
m.getFiveMinuteRate())
99+
.addGauge(
100+
new ShuffleServiceMetricsInfo(
101+
name + "_rate1", "1 minute rate of meter " + name),
102+
m.getOneMinuteRate())
103+
.addGauge(new ShuffleServiceMetricsInfo(
104+
name + "_rateMean", "Mean rate of meter " + name),
105+
m.getMeanRate());
178106
} else if (metric instanceof Gauge) {
179-
<<<<<<< HEAD
180-
Gauge m = (Gauge) metric;
181-
Object gaugeValue = m.getValue();
182-
if (gaugeValue instanceof Integer) {
183-
Integer intValue = (Integer) gaugeValue;
184-
metricsRecordBuilder
185-
.addGauge(new ShuffleServiceMetricsInfo(name, "Integer value of " +
186-
"gauge " + name), intValue.intValue());
187-
=======
188107
final Object gaugeValue = ((Gauge) metric).getValue();
189108
if (gaugeValue instanceof Integer) {
190109
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Integer) gaugeValue);
@@ -197,18 +116,14 @@ public static void collectMetric(
197116
} else {
198117
throw new IllegalStateException(
199118
"Not supported class type of metric[" + name + "] for value " + gaugeValue);
200-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
201119
}
202120
}
203121
}
204122

205-
<<<<<<< HEAD
206-
=======
207123
private static MetricsInfo getShuffleServiceMetricsInfo(String name) {
208124
return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name);
209125
}
210126

211-
>>>>>>> 075dd620e32872b5d90a2fa7d09b43b15502182b
212127
private static class ShuffleServiceMetricsInfo implements MetricsInfo {
213128

214129
private final String name;

0 commit comments

Comments (0)