Skip to content

Commit 84af5a1

Browse files
authored
feat(native-pos): Add spark driver temp storage handle propagation (#26436)
## Description For presto-on-spark native stack, allow spark driver to propagate common temp storage handle to executors. The temp storage handle will be a common one used for spilling as well as broadcast join. ``` == NO RELEASE NOTE == ```
1 parent f046e9f commit 84af5a1

19 files changed

+166
-57
lines changed

presto-main-base/src/main/java/com/facebook/presto/spiller/LocalTempStorage.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,31 @@ public void remove(TempDataOperationContext context, TempStorageHandle handle)
121121
Files.delete(((LocalTempStorageHandle) handle).getFilePath());
122122
}
123123

124+
@Override
125+
public TempStorageHandle getRootDirectoryHandle()
126+
{
127+
return new LocalTempStorageHandle(getNextSpillPath());
128+
}
129+
124130
@Override
125131
public byte[] serializeHandle(TempStorageHandle storageHandle)
132+
{
133+
return LocalTempStorage.serializeHandleStatic(storageHandle);
134+
}
135+
136+
public static byte[] serializeHandleStatic(TempStorageHandle storageHandle)
126137
{
127138
URI uri = ((LocalTempStorageHandle) storageHandle).getFilePath().toUri();
128139
return uri.toString().getBytes(UTF_8);
129140
}
130141

131142
@Override
132143
public TempStorageHandle deserialize(byte[] serializedStorageHandle)
144+
{
145+
return LocalTempStorage.deserializeStatic(serializedStorageHandle);
146+
}
147+
148+
public static LocalTempStorageHandle deserializeStatic(byte[] serializedStorageHandle)
133149
{
134150
String uriString = new String(serializedStorageHandle, UTF_8);
135151
try {
@@ -192,7 +208,7 @@ private boolean hasEnoughDiskSpace(Path path)
192208
}
193209
}
194210

195-
private static class LocalTempStorageHandle
211+
public static class LocalTempStorageHandle
196212
implements TempStorageHandle
197213
{
198214
private final Path filePath;
@@ -207,6 +223,12 @@ public Path getFilePath()
207223
return filePath;
208224
}
209225

226+
@Override
227+
public String getPathAsString()
228+
{
229+
return filePath.toString();
230+
}
231+
210232
@Override
211233
public String toString()
212234
{

presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkConfig.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class PrestoSparkConfig
4646
private boolean storageBasedBroadcastJoinEnabled;
4747
private DataSize storageBasedBroadcastJoinWriteBufferSize = new DataSize(24, MEGABYTE);
4848
private String storageBasedBroadcastJoinStorage = "local";
49+
private String nativeTempStorage = "local";
4950
private DataSize sparkBroadcastJoinMaxMemoryOverride;
5051
private boolean smileSerializationEnabled = true;
5152
private int splitAssignmentBatchSize = 1_000_000;

presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ public class PrestoSparkQueryExecutionFactory
194194
private final Set<PrestoSparkAuthenticatorProvider> authenticatorProviders;
195195
private final TempStorageManager tempStorageManager;
196196
private final String storageBasedBroadcastJoinStorage;
197+
private final String nativeTempStorage;
197198
private final NodeMemoryConfig nodeMemoryConfig;
198199
private final FeaturesConfig featuresConfig;
199200
private final QueryManagerConfig queryManagerConfig;
@@ -274,6 +275,7 @@ public PrestoSparkQueryExecutionFactory(
274275
this.authenticatorProviders = ImmutableSet.copyOf(requireNonNull(authenticatorProviders, "authenticatorProviders is null"));
275276
this.tempStorageManager = requireNonNull(tempStorageManager, "tempStorageManager is null");
276277
this.storageBasedBroadcastJoinStorage = requireNonNull(prestoSparkConfig, "prestoSparkConfig is null").getStorageBasedBroadcastJoinStorage();
278+
this.nativeTempStorage = requireNonNull(featuresConfig, "prestoSparkConfig is null").getSpillerTempStorage();
277279
this.nodeMemoryConfig = requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null");
278280
this.featuresConfig = requireNonNull(featuresConfig, "featuresConfig is null");
279281
this.queryManagerConfig = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
@@ -683,7 +685,8 @@ else if (preparedQuery.isExplainTypeValidate()) {
683685
taskInfoCollector.register(sparkContext, Option.empty(), true);
684686
CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector = new CollectionAccumulator<>();
685687
shuffleStatsCollector.register(sparkContext, Option.empty(), false);
686-
TempStorage tempStorage = tempStorageManager.getTempStorage(storageBasedBroadcastJoinStorage);
688+
TempStorage broadcastJoinTempStorage = tempStorageManager.getTempStorage(this.storageBasedBroadcastJoinStorage);
689+
TempStorage nativeTempStorage = tempStorageManager.getTempStorage(this.nativeTempStorage);
687690
queryStateTimer.endAnalysis();
688691

689692
if (!isAdaptiveQueryExecutionEnabled(session)) {
@@ -713,7 +716,8 @@ else if (preparedQuery.isExplainTypeValidate()) {
713716
metadataStorage,
714717
queryStatusInfoOutputLocation,
715718
queryDataOutputLocation,
716-
tempStorage,
719+
broadcastJoinTempStorage,
720+
nativeTempStorage,
717721
nodeMemoryConfig,
718722
featuresConfig,
719723
queryManagerConfig,
@@ -752,7 +756,8 @@ else if (preparedQuery.isExplainTypeValidate()) {
752756
metadataStorage,
753757
queryStatusInfoOutputLocation,
754758
queryDataOutputLocation,
755-
tempStorage,
759+
broadcastJoinTempStorage,
760+
nativeTempStorage,
756761
nodeMemoryConfig,
757762
featuresConfig,
758763
queryManagerConfig,

presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkTaskDescriptor.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,21 @@ public class PrestoSparkTaskDescriptor
3030
private final Map<String, String> extraCredentials;
3131
private final PlanFragment fragment;
3232
private final TableWriteInfo tableWriteInfo;
33+
private final byte[] serializedNativeTempStorageHandle;
3334

3435
@JsonCreator
3536
public PrestoSparkTaskDescriptor(
3637
@JsonProperty("session") SessionRepresentation session,
3738
@JsonProperty("extraCredentials") Map<String, String> extraCredentials,
3839
@JsonProperty("fragment") PlanFragment fragment,
39-
@JsonProperty("tableWriteInfo") TableWriteInfo tableWriteInfo)
40+
@JsonProperty("tableWriteInfo") TableWriteInfo tableWriteInfo,
41+
@JsonProperty("serializedNativeTempStorageHandle") byte[] serializedNativeTempStorageHandle)
4042
{
4143
this.session = requireNonNull(session, "session is null");
4244
this.extraCredentials = ImmutableMap.copyOf(requireNonNull(extraCredentials, "extraCredentials is null"));
4345
this.fragment = requireNonNull(fragment);
4446
this.tableWriteInfo = requireNonNull(tableWriteInfo, "tableWriteInfo is null");
47+
this.serializedNativeTempStorageHandle = requireNonNull(serializedNativeTempStorageHandle, "serializedNativeTempStorageHandle is null");
4548
}
4649

4750
@JsonProperty
@@ -67,4 +70,10 @@ public TableWriteInfo getTableWriteInfo()
6770
{
6871
return tableWriteInfo;
6972
}
73+
74+
@JsonProperty
75+
public byte[] getSerializedNativeTempStorageHandle()
76+
{
77+
return serializedNativeTempStorageHandle;
78+
}
7079
}

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/AbstractPrestoSparkQueryExecution.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,8 @@ public abstract class AbstractPrestoSparkQueryExecution
198198
protected final Optional<String> queryStatusInfoOutputLocation;
199199
protected final Optional<String> queryDataOutputLocation;
200200
protected final long queryCompletionDeadline;
201-
protected final TempStorage tempStorage;
201+
protected final TempStorage broadcastJoinTempStorage;
202+
protected final TempStorage nativeTempStorage;
202203
protected final NodeMemoryConfig nodeMemoryConfig;
203204
protected final FeaturesConfig featuresConfig;
204205
protected final QueryManagerConfig queryManagerConfig;
@@ -239,7 +240,8 @@ public AbstractPrestoSparkQueryExecution(
239240
PrestoSparkMetadataStorage metadataStorage,
240241
Optional<String> queryStatusInfoOutputLocation,
241242
Optional<String> queryDataOutputLocation,
242-
TempStorage tempStorage,
243+
TempStorage broadcastJoinTempStorage,
244+
TempStorage nativeTempStorage,
243245
NodeMemoryConfig nodeMemoryConfig,
244246
FeaturesConfig featuresConfig,
245247
QueryManagerConfig queryManagerConfig,
@@ -277,7 +279,8 @@ public AbstractPrestoSparkQueryExecution(
277279
this.metadataStorage = requireNonNull(metadataStorage, "metadataStorage is null");
278280
this.queryStatusInfoOutputLocation = requireNonNull(queryStatusInfoOutputLocation, "queryStatusInfoOutputLocation is null");
279281
this.queryDataOutputLocation = requireNonNull(queryDataOutputLocation, "queryDataOutputLocation is null");
280-
this.tempStorage = requireNonNull(tempStorage, "tempStorage is null");
282+
this.broadcastJoinTempStorage = requireNonNull(broadcastJoinTempStorage, "broadcastJoinTempStorage is null");
283+
this.nativeTempStorage = requireNonNull(nativeTempStorage, "nativeTempStorage is null");
281284
this.nodeMemoryConfig = requireNonNull(nodeMemoryConfig, "nodeMemoryConfig is null");
282285
this.featuresConfig = requireNonNull(featuresConfig, "featuresConfig is null");
283286
this.queryManagerConfig = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
@@ -465,7 +468,8 @@ protected List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>> collectPag
465468
session.toSessionRepresentation(),
466469
session.getIdentity().getExtraCredentials(),
467470
rootFragment,
468-
tableWriteInfo);
471+
tableWriteInfo,
472+
nativeTempStorage.serializeHandle(nativeTempStorage.getRootDirectoryHandle()));
469473
SerializedPrestoSparkTaskDescriptor serializedTaskDescriptor = new SerializedPrestoSparkTaskDescriptor(sparkTaskDescriptorJsonCodec.toJsonBytes(taskDescriptor));
470474

471475
Map<String, JavaFutureAction<List<Tuple2<MutablePartitionId, PrestoSparkSerializedPage>>>> inputFutures = inputRdds.entrySet().stream()
@@ -562,7 +566,8 @@ public <T extends PrestoSparkTaskOutput> RddAndMore<T> createRdd(SubPlan subPlan
562566
taskInfoCollector,
563567
shuffleStatsCollector,
564568
tableWriteInfo,
565-
outputType);
569+
outputType,
570+
nativeTempStorage);
566571
return new RddAndMore<>(rdd, broadcastDependencies.build());
567572
}
568573

@@ -885,7 +890,8 @@ protected synchronized <T extends PrestoSparkTaskOutput> RddAndMore<T> createRdd
885890
taskInfoCollector,
886891
shuffleStatsCollector,
887892
tableWriteInfo,
888-
outputType);
893+
outputType,
894+
nativeTempStorage);
889895

890896
// For intermediate, non-broadcast stages - we use partitioned RDD
891897
// These stages produce PrestoSparkMutableRow
@@ -951,7 +957,7 @@ private PrestoSparkBroadcastDependency<?> createBroadcastDependency(RddAndMore<?
951957
}
952958

953959
if (isStorageBasedBroadcastJoinEnabled(session)) {
954-
validateStorageCapabilities(tempStorage);
960+
validateStorageCapabilities(broadcastJoinTempStorage);
955961
TempDataOperationContext tempDataOperationContext = new TempDataOperationContext(
956962
session.getSource(),
957963
session.getQueryId().getId(),
@@ -964,7 +970,7 @@ private PrestoSparkBroadcastDependency<?> createBroadcastDependency(RddAndMore<?
964970
maxBroadcastMemory,
965971
getQueryMaxTotalMemoryPerNode(session),
966972
queryCompletionDeadline,
967-
tempStorage,
973+
broadcastJoinTempStorage,
968974
tempDataOperationContext,
969975
waitTimeMetrics);
970976
}

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkAdaptiveQueryExecution.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,8 @@ public PrestoSparkAdaptiveQueryExecution(
162162
PrestoSparkMetadataStorage metadataStorage,
163163
Optional<String> queryStatusInfoOutputLocation,
164164
Optional<String> queryDataOutputLocation,
165-
TempStorage tempStorage,
165+
TempStorage broadcastJoinTempStorage,
166+
TempStorage nativeTempStorage,
166167
NodeMemoryConfig nodeMemoryConfig,
167168
FeaturesConfig featuresConfig,
168169
QueryManagerConfig queryManagerConfig,
@@ -205,7 +206,8 @@ public PrestoSparkAdaptiveQueryExecution(
205206
metadataStorage,
206207
queryStatusInfoOutputLocation,
207208
queryDataOutputLocation,
208-
tempStorage,
209+
broadcastJoinTempStorage,
210+
nativeTempStorage,
209211
nodeMemoryConfig,
210212
featuresConfig,
211213
queryManagerConfig,

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/PrestoSparkStaticQueryExecution.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,8 @@ public PrestoSparkStaticQueryExecution(
104104
PrestoSparkMetadataStorage metadataStorage,
105105
Optional<String> queryStatusInfoOutputLocation,
106106
Optional<String> queryDataOutputLocation,
107-
TempStorage tempStorage,
107+
TempStorage broadcastJoinTempStorage,
108+
TempStorage nativeTempStorage,
108109
NodeMemoryConfig nodeMemoryConfig,
109110
FeaturesConfig featuresConfig,
110111
QueryManagerConfig queryManagerConfig,
@@ -142,7 +143,8 @@ public PrestoSparkStaticQueryExecution(
142143
metadataStorage,
143144
queryStatusInfoOutputLocation,
144145
queryDataOutputLocation,
145-
tempStorage,
146+
broadcastJoinTempStorage,
147+
nativeTempStorage,
146148
nodeMemoryConfig,
147149
featuresConfig,
148150
queryManagerConfig,

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/DetachedNativeExecutionProcess.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import okhttp3.OkHttpClient;
2323

2424
import java.io.IOException;
25+
import java.util.Optional;
2526
import java.util.concurrent.ExecutionException;
2627
import java.util.concurrent.ExecutorService;
2728
import java.util.concurrent.ScheduledExecutorService;
@@ -58,7 +59,8 @@ public DetachedNativeExecutionProcess(
5859
errorRetryScheduledExecutor,
5960
serverInfoCodec,
6061
maxErrorDuration,
61-
workerProperty);
62+
workerProperty,
63+
Optional.empty());
6264
}
6365

6466
@Override

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/DetachedNativeExecutionProcessFactory.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,13 @@
2020
import com.facebook.presto.spark.execution.property.WorkerProperty;
2121
import com.facebook.presto.spark.execution.task.ForNativeExecutionTask;
2222
import com.facebook.presto.spi.PrestoException;
23+
import com.facebook.presto.spi.storage.TempStorageHandle;
2324
import com.facebook.presto.sql.analyzer.FeaturesConfig;
2425
import com.google.inject.Inject;
2526
import okhttp3.OkHttpClient;
2627

2728
import java.io.IOException;
29+
import java.util.Optional;
2830
import java.util.concurrent.ExecutorService;
2931
import java.util.concurrent.ScheduledExecutorService;
3032
import java.util.concurrent.TimeUnit;
@@ -60,13 +62,16 @@ public DetachedNativeExecutionProcessFactory(
6062
}
6163

6264
@Override
63-
public NativeExecutionProcess getNativeExecutionProcess(Session session)
65+
public NativeExecutionProcess getNativeExecutionProcess(Session session,
66+
Optional<TempStorageHandle> nativeTempStorageHandle)
6467
{
65-
return createNativeExecutionProcess(session, new Duration(2, TimeUnit.MINUTES));
68+
return createNativeExecutionProcess(session, new Duration(2, TimeUnit.MINUTES),
69+
Optional.empty());
6670
}
6771

6872
@Override
69-
public NativeExecutionProcess createNativeExecutionProcess(Session session, Duration maxErrorDuration)
73+
public NativeExecutionProcess createNativeExecutionProcess(Session session,
74+
Duration maxErrorDuration, Optional<TempStorageHandle> nativeTempStorageHandle)
7075
{
7176
try {
7277
return new DetachedNativeExecutionProcess(

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/NativeExecutionProcess.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.facebook.presto.spark.execution.property.PrestoSparkWorkerProperty;
2828
import com.facebook.presto.spark.execution.property.WorkerProperty;
2929
import com.facebook.presto.spi.PrestoException;
30+
import com.facebook.presto.spi.storage.TempStorageHandle;
3031
import com.google.common.annotations.VisibleForTesting;
3132
import com.google.common.collect.ImmutableList;
3233
import com.google.common.util.concurrent.FutureCallback;
@@ -57,6 +58,7 @@
5758
import java.nio.file.Paths;
5859
import java.util.Arrays;
5960
import java.util.List;
61+
import java.util.Optional;
6062
import java.util.concurrent.ExecutionException;
6163
import java.util.concurrent.Executor;
6264
import java.util.concurrent.RejectedExecutionException;
@@ -99,6 +101,8 @@ public class NativeExecutionProcess
99101
private final RequestErrorTracker errorTracker;
100102
private final OkHttpClient httpClient;
101103
private final WorkerProperty<?, ?, ?> workerProperty;
104+
// Temp storage used for spilling and broadcast join.
105+
private final Optional<TempStorageHandle> nativeTempStorageHandle;
102106

103107
private volatile Process process;
104108
private volatile ProcessOutputPipe processOutputPipe;
@@ -112,7 +116,8 @@ public NativeExecutionProcess(
112116
ScheduledExecutorService scheduledExecutorService,
113117
JsonCodec<ServerInfo> serverInfoCodec,
114118
Duration maxErrorDuration,
115-
WorkerProperty<?, ?, ?> workerProperty)
119+
WorkerProperty<?, ?, ?> workerProperty,
120+
Optional<TempStorageHandle> nativeTempStorageHandle)
116121
throws IOException
117122
{
118123
this.executablePath = requireNonNull(executablePath, "executablePath is null");
@@ -136,6 +141,7 @@ public NativeExecutionProcess(
136141
scheduledExecutorService,
137142
"getting native process status");
138143
this.workerProperty = requireNonNull(workerProperty, "workerProperty is null");
144+
this.nativeTempStorageHandle = requireNonNull(nativeTempStorageHandle, "nativeTempStorageHandle is null");
139145
// Update any runtime configs to be used by presto native worker
140146
updateWorkerProperties();
141147
}
@@ -341,7 +347,7 @@ private void populateConfigurationFiles(String configBasePath)
341347
Paths.get(configBasePath, WORKER_CONNECTOR_CONFIG_DIR)); // Directory path for catalogs
342348
}
343349

344-
private void updateWorkerProperties()
350+
protected void updateWorkerProperties()
345351
{
346352
// Update memory properties
347353
updateWorkerMemoryProperties();
@@ -352,6 +358,11 @@ private void updateWorkerProperties()
352358
// the native execution process eventually for process initialization.
353359
workerProperty.getSystemConfig()
354360
.update(NativeExecutionSystemConfig.HTTP_SERVER_HTTP_PORT, String.valueOf(port));
361+
362+
// Update the temp storage configs for spilling and broadcast join.
363+
if (nativeTempStorageHandle.isPresent()) {
364+
workerProperty.updateTempStorageConfig(nativeTempStorageHandle.get());
365+
}
355366
}
356367

357368
protected SparkConf getSparkConf()

0 commit comments

Comments
 (0)