Skip to content

Commit e3a87ea

Browse files
author
Robert Kruszewski
committed
Merge branch 'master' into rk/upstream
2 parents 3f67de0 + 5298171 commit e3a87ea

File tree

14 files changed: +303 additions, -227 deletions

R/pkg/R/functions.R

Lines changed: 135 additions & 167 deletions
Large diffs are not rendered by default.

R/pkg/R/generics.R

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -913,8 +913,9 @@ setGeneric("add_months", function(y, x) { standardGeneric("add_months") })
913913
#' @name NULL
914914
setGeneric("approxCountDistinct", function(x, ...) { standardGeneric("approxCountDistinct") })
915915

916-
#' @rdname array_contains
916+
#' @rdname column_collection_functions
917917
#' @export
918+
#' @name NULL
918919
setGeneric("array_contains", function(x, value) { standardGeneric("array_contains") })
919920

920921
#' @rdname column_string_functions
@@ -992,8 +993,9 @@ setGeneric("conv", function(x, fromBase, toBase) { standardGeneric("conv") })
992993
#' @name NULL
993994
setGeneric("countDistinct", function(x, ...) { standardGeneric("countDistinct") })
994995

995-
#' @rdname crc32
996+
#' @rdname column_misc_functions
996997
#' @export
998+
#' @name NULL
997999
setGeneric("crc32", function(x) { standardGeneric("crc32") })
9981000

9991001
#' @rdname column_nonaggregate_functions
@@ -1006,8 +1008,9 @@ setGeneric("create_array", function(x, ...) { standardGeneric("create_array") })
10061008
#' @name NULL
10071009
setGeneric("create_map", function(x, ...) { standardGeneric("create_map") })
10081010

1009-
#' @rdname hash
1011+
#' @rdname column_misc_functions
10101012
#' @export
1013+
#' @name NULL
10111014
setGeneric("hash", function(x, ...) { standardGeneric("hash") })
10121015

10131016
#' @param x empty. Should be used with no argument.
@@ -1060,12 +1063,14 @@ setGeneric("dense_rank", function(x = "missing") { standardGeneric("dense_rank")
10601063
#' @name NULL
10611064
setGeneric("encode", function(x, charset) { standardGeneric("encode") })
10621065

1063-
#' @rdname explode
1066+
#' @rdname column_collection_functions
10641067
#' @export
1068+
#' @name NULL
10651069
setGeneric("explode", function(x) { standardGeneric("explode") })
10661070

1067-
#' @rdname explode_outer
1071+
#' @rdname column_collection_functions
10681072
#' @export
1073+
#' @name NULL
10691074
setGeneric("explode_outer", function(x) { standardGeneric("explode_outer") })
10701075

10711076
#' @rdname column_nonaggregate_functions
@@ -1088,8 +1093,9 @@ setGeneric("format_number", function(y, x) { standardGeneric("format_number") })
10881093
#' @name NULL
10891094
setGeneric("format_string", function(format, x, ...) { standardGeneric("format_string") })
10901095

1091-
#' @rdname from_json
1096+
#' @rdname column_collection_functions
10921097
#' @export
1098+
#' @name NULL
10931099
setGeneric("from_json", function(x, schema, ...) { standardGeneric("from_json") })
10941100

10951101
#' @rdname column_datetime_functions
@@ -1205,8 +1211,9 @@ setGeneric("lpad", function(x, len, pad) { standardGeneric("lpad") })
12051211
#' @name NULL
12061212
setGeneric("ltrim", function(x) { standardGeneric("ltrim") })
12071213

1208-
#' @rdname md5
1214+
#' @rdname column_misc_functions
12091215
#' @export
1216+
#' @name NULL
12101217
setGeneric("md5", function(x) { standardGeneric("md5") })
12111218

12121219
#' @rdname column_datetime_functions
@@ -1272,12 +1279,14 @@ setGeneric("percent_rank", function(x = "missing") { standardGeneric("percent_ra
12721279
#' @name NULL
12731280
setGeneric("pmod", function(y, x) { standardGeneric("pmod") })
12741281

1275-
#' @rdname posexplode
1282+
#' @rdname column_collection_functions
12761283
#' @export
1284+
#' @name NULL
12771285
setGeneric("posexplode", function(x) { standardGeneric("posexplode") })
12781286

1279-
#' @rdname posexplode_outer
1287+
#' @rdname column_collection_functions
12801288
#' @export
1289+
#' @name NULL
12811290
setGeneric("posexplode_outer", function(x) { standardGeneric("posexplode_outer") })
12821291

12831292
#' @rdname column_datetime_functions
@@ -1350,12 +1359,14 @@ setGeneric("sd", function(x, na.rm = FALSE) { standardGeneric("sd") })
13501359
#' @name NULL
13511360
setGeneric("second", function(x) { standardGeneric("second") })
13521361

1353-
#' @rdname sha1
1362+
#' @rdname column_misc_functions
13541363
#' @export
1364+
#' @name NULL
13551365
setGeneric("sha1", function(x) { standardGeneric("sha1") })
13561366

1357-
#' @rdname sha2
1367+
#' @rdname column_misc_functions
13581368
#' @export
1369+
#' @name NULL
13591370
setGeneric("sha2", function(y, x) { standardGeneric("sha2") })
13601371

13611372
#' @rdname column_math_functions
@@ -1378,17 +1389,19 @@ setGeneric("shiftRightUnsigned", function(y, x) { standardGeneric("shiftRightUns
13781389
#' @name NULL
13791390
setGeneric("signum", function(x) { standardGeneric("signum") })
13801391

1381-
#' @rdname size
1392+
#' @rdname column_collection_functions
13821393
#' @export
1394+
#' @name NULL
13831395
setGeneric("size", function(x) { standardGeneric("size") })
13841396

13851397
#' @rdname column_aggregate_functions
13861398
#' @export
13871399
#' @name NULL
13881400
setGeneric("skewness", function(x) { standardGeneric("skewness") })
13891401

1390-
#' @rdname sort_array
1402+
#' @rdname column_collection_functions
13911403
#' @export
1404+
#' @name NULL
13921405
setGeneric("sort_array", function(x, asc = TRUE) { standardGeneric("sort_array") })
13931406

13941407
#' @rdname column_string_functions
@@ -1451,8 +1464,9 @@ setGeneric("toRadians", function(x) { standardGeneric("toRadians") })
14511464
#' @name NULL
14521465
setGeneric("to_date", function(x, format) { standardGeneric("to_date") })
14531466

1454-
#' @rdname to_json
1467+
#' @rdname column_collection_functions
14551468
#' @export
1469+
#' @name NULL
14561470
setGeneric("to_json", function(x, ...) { standardGeneric("to_json") })
14571471

14581472
#' @rdname column_datetime_functions

common/network-common/src/main/java/org/apache/spark/network/client/TransportClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ public void stream(String streamId, StreamCallback callback) {
179179
// written to the socket atomically, so that callbacks are called in the right order
180180
// when responses arrive.
181181
synchronized (this) {
182-
handler.addStreamCallback(callback);
182+
handler.addStreamCallback(streamId, callback);
183183
channel.writeAndFlush(new StreamRequest(streamId)).addListener(future -> {
184184
if (future.isSuccess()) {
185185
long timeTaken = System.currentTimeMillis() - startTime;

common/network-common/src/main/java/org/apache/spark/network/client/TransportResponseHandler.java

Lines changed: 29 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import java.util.concurrent.ConcurrentLinkedQueue;
2525
import java.util.concurrent.atomic.AtomicLong;
2626

27+
import scala.Tuple2;
28+
2729
import com.google.common.annotations.VisibleForTesting;
2830
import io.netty.channel.Channel;
2931
import org.slf4j.Logger;
@@ -56,7 +58,7 @@ public class TransportResponseHandler extends MessageHandler<ResponseMessage> {
5658

5759
private final Map<Long, RpcResponseCallback> outstandingRpcs;
5860

59-
private final Queue<StreamCallback> streamCallbacks;
61+
private final Queue<Tuple2<String, StreamCallback>> streamCallbacks;
6062
private volatile boolean streamActive;
6163

6264
/** Records the time (in system nanoseconds) that the last fetch or RPC request was sent. */
@@ -88,9 +90,9 @@ public void removeRpcRequest(long requestId) {
8890
outstandingRpcs.remove(requestId);
8991
}
9092

91-
public void addStreamCallback(StreamCallback callback) {
93+
public void addStreamCallback(String streamId, StreamCallback callback) {
9294
timeOfLastRequestNs.set(System.nanoTime());
93-
streamCallbacks.offer(callback);
95+
streamCallbacks.offer(new Tuple2<>(streamId, callback));
9496
}
9597

9698
@VisibleForTesting
@@ -104,15 +106,31 @@ public void deactivateStream() {
104106
*/
105107
private void failOutstandingRequests(Throwable cause) {
106108
for (Map.Entry<StreamChunkId, ChunkReceivedCallback> entry : outstandingFetches.entrySet()) {
107-
entry.getValue().onFailure(entry.getKey().chunkIndex, cause);
109+
try {
110+
entry.getValue().onFailure(entry.getKey().chunkIndex, cause);
111+
} catch (Exception e) {
112+
logger.warn("ChunkReceivedCallback.onFailure throws exception", e);
113+
}
108114
}
109115
for (Map.Entry<Long, RpcResponseCallback> entry : outstandingRpcs.entrySet()) {
110-
entry.getValue().onFailure(cause);
116+
try {
117+
entry.getValue().onFailure(cause);
118+
} catch (Exception e) {
119+
logger.warn("RpcResponseCallback.onFailure throws exception", e);
120+
}
121+
}
122+
for (Tuple2<String, StreamCallback> entry : streamCallbacks) {
123+
try {
124+
entry._2().onFailure(entry._1(), cause);
125+
} catch (Exception e) {
126+
logger.warn("StreamCallback.onFailure throws exception", e);
127+
}
111128
}
112129

113130
// It's OK if new fetches appear, as they will fail immediately.
114131
outstandingFetches.clear();
115132
outstandingRpcs.clear();
133+
streamCallbacks.clear();
116134
}
117135

118136
@Override
@@ -190,8 +208,9 @@ public void handle(ResponseMessage message) throws Exception {
190208
}
191209
} else if (message instanceof StreamResponse) {
192210
StreamResponse resp = (StreamResponse) message;
193-
StreamCallback callback = streamCallbacks.poll();
194-
if (callback != null) {
211+
Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
212+
if (entry != null) {
213+
StreamCallback callback = entry._2();
195214
if (resp.byteCount > 0) {
196215
StreamInterceptor interceptor = new StreamInterceptor(this, resp.streamId, resp.byteCount,
197216
callback);
@@ -216,8 +235,9 @@ public void handle(ResponseMessage message) throws Exception {
216235
}
217236
} else if (message instanceof StreamFailure) {
218237
StreamFailure resp = (StreamFailure) message;
219-
StreamCallback callback = streamCallbacks.poll();
220-
if (callback != null) {
238+
Tuple2<String, StreamCallback> entry = streamCallbacks.poll();
239+
if (entry != null) {
240+
StreamCallback callback = entry._2();
221241
try {
222242
callback.onFailure(resp.streamId, new RuntimeException(resp.error));
223243
} catch (IOException ioe) {

common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.spark.network;
1919

20+
import java.io.IOException;
2021
import java.nio.ByteBuffer;
2122

2223
import io.netty.channel.Channel;
@@ -127,17 +128,43 @@ public void testActiveStreams() throws Exception {
127128

128129
StreamResponse response = new StreamResponse("stream", 1234L, null);
129130
StreamCallback cb = mock(StreamCallback.class);
130-
handler.addStreamCallback(cb);
131+
handler.addStreamCallback("stream", cb);
131132
assertEquals(1, handler.numOutstandingRequests());
132133
handler.handle(response);
133134
assertEquals(1, handler.numOutstandingRequests());
134135
handler.deactivateStream();
135136
assertEquals(0, handler.numOutstandingRequests());
136137

137138
StreamFailure failure = new StreamFailure("stream", "uh-oh");
138-
handler.addStreamCallback(cb);
139+
handler.addStreamCallback("stream", cb);
139140
assertEquals(1, handler.numOutstandingRequests());
140141
handler.handle(failure);
141142
assertEquals(0, handler.numOutstandingRequests());
142143
}
144+
145+
@Test
146+
public void failOutstandingStreamCallbackOnClose() throws Exception {
147+
Channel c = new LocalChannel();
148+
c.pipeline().addLast(TransportFrameDecoder.HANDLER_NAME, new TransportFrameDecoder());
149+
TransportResponseHandler handler = new TransportResponseHandler(c);
150+
151+
StreamCallback cb = mock(StreamCallback.class);
152+
handler.addStreamCallback("stream-1", cb);
153+
handler.channelInactive();
154+
155+
verify(cb).onFailure(eq("stream-1"), isA(IOException.class));
156+
}
157+
158+
@Test
159+
public void failOutstandingStreamCallbackOnException() throws Exception {
160+
Channel c = new LocalChannel();
161+
c.pipeline().addLast(TransportFrameDecoder.HANDLER_NAME, new TransportFrameDecoder());
162+
TransportResponseHandler handler = new TransportResponseHandler(c);
163+
164+
StreamCallback cb = mock(StreamCallback.class);
165+
handler.addStreamCallback("stream-1", cb);
166+
handler.exceptionCaught(new IOException("Oops!"));
167+
168+
verify(cb).onFailure(eq("stream-1"), isA(IOException.class));
169+
}
143170
}

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
199199
new SecurityManager(executorConf),
200200
clientMode = true)
201201
val driver = fetcher.setupEndpointRefByURI(driverUrl)
202-
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(executorId))
202+
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig)
203203
val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId))
204204
fetcher.shutdown()
205205

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,10 +354,11 @@ package object config {
354354

355355
private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
356356
ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
357+
.internal()
357358
.doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
358359
"above this threshold. This is to avoid a giant request takes too much memory.")
359360
.bytesConf(ByteUnit.BYTE)
360-
.createWithDefaultString("200m")
361+
.createWithDefault(Long.MaxValue)
361362

362363
private[spark] val TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES =
363364
ConfigBuilder("spark.taskMetrics.trackUpdatedBlockStatuses")

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ private[spark] sealed trait CoarseGrainedClusterMessage extends Serializable
2828

2929
private[spark] object CoarseGrainedClusterMessages {
3030

31-
case class RetrieveSparkAppConfig(executorId: String) extends CoarseGrainedClusterMessage
31+
case object RetrieveSparkAppConfig extends CoarseGrainedClusterMessage
3232

3333
case class SparkAppConfig(
3434
sparkProperties: Seq[(String, String)],

core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -341,15 +341,11 @@ private[storage] class BlockInfoManager extends Logging {
341341
*
342342
* @return the ids of blocks whose pins were released
343343
*/
344-
def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = {
344+
def releaseAllLocksForTask(taskAttemptId: TaskAttemptId): Seq[BlockId] = synchronized {
345345
val blocksWithReleasedLocks = mutable.ArrayBuffer[BlockId]()
346346

347-
val readLocks = synchronized {
348-
readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
349-
}
350-
val writeLocks = synchronized {
351-
writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
352-
}
347+
val readLocks = readLocksByTask.remove(taskAttemptId).getOrElse(ImmutableMultiset.of[BlockId]())
348+
val writeLocks = writeLocksByTask.remove(taskAttemptId).getOrElse(Seq.empty)
353349

354350
for (blockId <- writeLocks) {
355351
infos.get(blockId).foreach { info =>
@@ -358,21 +354,19 @@ private[storage] class BlockInfoManager extends Logging {
358354
}
359355
blocksWithReleasedLocks += blockId
360356
}
357+
361358
readLocks.entrySet().iterator().asScala.foreach { entry =>
362359
val blockId = entry.getElement
363360
val lockCount = entry.getCount
364361
blocksWithReleasedLocks += blockId
365-
synchronized {
366-
get(blockId).foreach { info =>
367-
info.readerCount -= lockCount
368-
assert(info.readerCount >= 0)
369-
}
362+
get(blockId).foreach { info =>
363+
info.readerCount -= lockCount
364+
assert(info.readerCount >= 0)
370365
}
371366
}
372367

373-
synchronized {
374-
notifyAll()
375-
}
368+
notifyAll()
369+
376370
blocksWithReleasedLocks
377371
}
378372

core/src/main/scala/org/apache/spark/ui/JettyUtils.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ import scala.language.implicitConversions
2626
import scala.xml.Node
2727

2828
import org.eclipse.jetty.client.api.Response
29+
import org.eclipse.jetty.client.HttpClient
30+
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP
2931
import org.eclipse.jetty.proxy.ProxyServlet
3032
import org.eclipse.jetty.server._
3133
import org.eclipse.jetty.server.handler._
@@ -208,6 +210,16 @@ private[spark] object JettyUtils extends Logging {
208210
rewrittenURI.toString()
209211
}
210212

213+
override def newHttpClient(): HttpClient = {
214+
// SPARK-21176: Use the Jetty logic to calculate the number of selector threads (#CPUs/2),
215+
// but limit it to 8 max.
216+
// Otherwise, it might happen that we exhaust the threadpool since in reverse proxy mode
217+
// a proxy is instantiated for each executor. If the head node has many processors, this
218+
// can quickly add up to an unreasonably high number of threads.
219+
val numSelectors = math.max(1, math.min(8, Runtime.getRuntime().availableProcessors() / 2))
220+
new HttpClient(new HttpClientTransportOverHTTP(numSelectors), null)
221+
}
222+
211223
override def filterServerResponseHeader(
212224
clientRequest: HttpServletRequest,
213225
serverResponse: Response,

0 commit comments

Comments
 (0)