Skip to content

Commit 686d1c5

Browse files
authored
Support multiple catalogs for Presto spark native execution (prestodb#25943)
## Description This PR implements native catalog properties support for Presto Spark, enabling proper configuration and management of catalog properties for native execution processes. **Key Changes:** * **Added `NativeExecutionCatalogProperties` class**: A new class that holds catalog properties for native execution processes, where each catalog generates a separate `<catalog-name>.properties` file. * **Enhanced `WorkerProperty` and `PrestoSparkWorkerProperty`**: Extended the worker property classes to support catalog properties configuration and proper property file generation. * **Updated Native Execution Module**: Modified `NativeExecutionModule` and `NativeExecutionProcess` to integrate catalog properties into the native execution workflow. * **Improved Configuration Integration**: Updated `PrestoSparkModule`, `PrestoSparkServiceFactory` to properly wire the new catalog properties functionality. * **Enhanced Test Coverage**: Added and updated tests in `TestNativeExecutionSystemConfig`, `TestNativeExecutionProcess`, and other test classes to ensure proper catalog properties handling. ## Motivation and Context This change is required to support proper catalog configuration in Presto Spark's native execution mode. Previously, catalog properties were not properly managed for native execution processes, which limited the ability to configure connectors effectively in native mode. ## Test Plan * **Unit Tests**: Added test `NativeExecutionCatalogProperties` * **Integration Tests**: Updated `TestNativeExecutionProcess` and `TestPrestoSparkHttpClient` to verify catalog properties integration ## Contributor checklist - [x] Please make sure your submission complies with our [contributing guide](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md), in particular [code style](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#code-style) and [commit standards](https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#commit-standards). - [x] PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced. - [x] Adequate tests were added if applicable. - [x] CI passed. ## Release Notes ``` == NO RELEASE NOTE == ```
1 parent 00f3749 commit 686d1c5

File tree

12 files changed

+104
-145
lines changed

12 files changed

+104
-145
lines changed

presto-native-execution/src/test/java/com/facebook/presto/spark/PrestoSparkNativeQueryRunnerUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
1919
import com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils;
2020
import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionModule;
21-
import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig;
2221
import com.facebook.presto.spi.security.PrincipalType;
2322
import com.facebook.presto.testing.QueryRunner;
2423
import com.google.common.collect.ImmutableList;
@@ -125,7 +124,8 @@ public static PrestoSparkQueryRunner createTpchRunner()
125124
return createRunner(
126125
"tpchstandard",
127126
new NativeExecutionModule(
128-
Optional.of(new NativeExecutionConnectorConfig().setConnectorName("tpch"))));
127+
Optional.of(
128+
ImmutableMap.of("hive", ImmutableMap.of("connector.name", "tpch")))));
129129
}
130130

131131
public static PrestoSparkQueryRunner createRunner(String defaultCatalog, Optional<Path> baseDir, Map<String, String> additionalConfigProperties, Map<String, String> additionalSparkProperties, ImmutableList<Module> nativeModules)

presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,6 @@
120120
import com.facebook.presto.spark.execution.PrestoSparkBroadcastTableCacheManager;
121121
import com.facebook.presto.spark.execution.PrestoSparkExecutionExceptionFactory;
122122
import com.facebook.presto.spark.execution.http.BatchTaskUpdateRequest;
123-
import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig;
124123
import com.facebook.presto.spark.execution.property.NativeExecutionNodeConfig;
125124
import com.facebook.presto.spark.execution.shuffle.PrestoSparkLocalShuffleReadInfo;
126125
import com.facebook.presto.spark.execution.shuffle.PrestoSparkLocalShuffleWriteInfo;
@@ -284,7 +283,6 @@ protected void setup(Binder binder)
284283
configBinder(binder).bindConfig(PrestoSparkConfig.class);
285284
configBinder(binder).bindConfig(TracingConfig.class);
286285
configBinder(binder).bindConfig(NativeExecutionNodeConfig.class);
287-
configBinder(binder).bindConfig(NativeExecutionConnectorConfig.class);
288286
configBinder(binder).bindConfig(PlanCheckerProviderManagerConfig.class);
289287
configBinder(binder).bindConfig(SecurityConfig.class);
290288

presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkServiceFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import com.google.inject.Module;
2828

2929
import java.util.List;
30+
import java.util.Optional;
3031

3132
import static com.facebook.presto.spark.classloader_interface.PrestoSparkConfiguration.METADATA_STORAGE_TYPE_LOCAL;
3233
import static com.google.common.base.Preconditions.checkArgument;
@@ -70,7 +71,7 @@ protected List<Module> getAdditionalModules(PrestoSparkConfiguration configurati
7071
return ImmutableList.of(
7172
new PrestoSparkLocalMetadataStorageModule(),
7273
// TODO: Need to let NativeExecutionModule addition be controlled by configuration as well.
73-
new NativeExecutionModule());
74+
new NativeExecutionModule(Optional.of(configuration.getCatalogProperties())));
7475
}
7576

7677
protected SqlParserOptions getSqlParserOptions()

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/NativeExecutionModule.java

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@
1313
*/
1414
package com.facebook.presto.spark.execution.nativeprocess;
1515

16-
import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig;
17-
import com.facebook.presto.spark.execution.property.NativeExecutionNodeConfig;
18-
import com.facebook.presto.spark.execution.property.NativeExecutionSystemConfig;
16+
import com.facebook.presto.spark.execution.property.NativeExecutionCatalogProperties;
1917
import com.facebook.presto.spark.execution.property.PrestoSparkWorkerProperty;
2018
import com.facebook.presto.spark.execution.property.WorkerProperty;
2119
import com.facebook.presto.spark.execution.shuffle.PrestoSparkLocalShuffleInfoTranslator;
@@ -32,6 +30,7 @@
3230
import com.google.inject.TypeLiteral;
3331
import okhttp3.OkHttpClient;
3432

33+
import java.util.Map;
3534
import java.util.Optional;
3635
import java.util.concurrent.TimeUnit;
3736

@@ -40,20 +39,20 @@
4039
public class NativeExecutionModule
4140
implements Module
4241
{
43-
private Optional<NativeExecutionConnectorConfig> connectorConfig;
42+
private Optional<Map<String, Map<String, String>>> catalogProperties;
4443

4544
// For use by production system where the configurations can only be tuned via configurations.
4645
public NativeExecutionModule()
4746
{
48-
this.connectorConfig = Optional.empty();
47+
this.catalogProperties = Optional.empty();
4948
}
5049

5150
// In the future, we would make more bindings injected into NativeExecutionModule
5251
// to be able to test various configuration parameters
5352
@VisibleForTesting
54-
public NativeExecutionModule(Optional<NativeExecutionConnectorConfig> connectorConfig)
53+
public NativeExecutionModule(Optional<Map<String, Map<String, String>>> catalogProperties)
5554
{
56-
this.connectorConfig = connectorConfig;
55+
this.catalogProperties = catalogProperties;
5756
}
5857

5958
@Override
@@ -74,17 +73,14 @@ protected void bindShuffle(Binder binder)
7473

7574
protected void bindWorkerProperties(Binder binder)
7675
{
76+
// Bind NativeExecutionCatalogProperties - this is not bound elsewhere
77+
binder.bind(NativeExecutionCatalogProperties.class).toInstance(
78+
new NativeExecutionCatalogProperties(catalogProperties.orElse(ImmutableMap.of())));
79+
80+
// Bind worker property classes
7781
newOptionalBinder(binder, new TypeLiteral<WorkerProperty<?, ?, ?>>() {
7882
}).setDefault().to(PrestoSparkWorkerProperty.class).in(Scopes.SINGLETON);
79-
if (connectorConfig.isPresent()) {
80-
binder.bind(PrestoSparkWorkerProperty.class).toInstance(
81-
new PrestoSparkWorkerProperty(connectorConfig.get(),
82-
new NativeExecutionNodeConfig(), new NativeExecutionSystemConfig(
83-
ImmutableMap.of())));
84-
}
85-
else {
86-
binder.bind(PrestoSparkWorkerProperty.class).in(Scopes.SINGLETON);
87-
}
83+
binder.bind(PrestoSparkWorkerProperty.class).in(Scopes.SINGLETON);
8884
}
8985

9086
protected void bindHttpClient(Binder binder)

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/nativeprocess/NativeExecutionProcess.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,7 @@ private void populateConfigurationFiles(String configBasePath)
343343
workerProperty.populateAllProperties(
344344
Paths.get(configBasePath, WORKER_CONFIG_FILE),
345345
Paths.get(configBasePath, WORKER_NODE_CONFIG_FILE),
346-
Paths.get(configBasePath, format("%s%s.properties", WORKER_CONNECTOR_CONFIG_FILE,
347-
getNativeExecutionCatalogName(session))));
346+
Paths.get(configBasePath, WORKER_CONNECTOR_CONFIG_FILE)); // Directory path for catalogs
348347
}
349348

350349
private void doGetServerInfo(SettableFuture<ServerInfo> future)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.spark.execution.property;
15+
16+
import java.util.Map;
17+
18+
import static java.util.Objects.requireNonNull;
19+
20+
/**
21+
* This class holds catalog properties for native execution process.
22+
* Each catalog will generate a separate <catalog-name>.properties file.
23+
*/
24+
public class NativeExecutionCatalogProperties
25+
{
26+
private final Map<String, Map<String, String>> catalogProperties;
27+
28+
public NativeExecutionCatalogProperties(Map<String, Map<String, String>> catalogProperties)
29+
{
30+
this.catalogProperties = requireNonNull(catalogProperties, "catalogProperties is null");
31+
}
32+
33+
public Map<String, Map<String, String>> getAllCatalogProperties()
34+
{
35+
return catalogProperties;
36+
}
37+
}

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/property/NativeExecutionConnectorConfig.java

Lines changed: 0 additions & 86 deletions
This file was deleted.

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/property/PrestoSparkWorkerProperty.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,14 @@
1919
* A utility class that helps with properties and its materialization.
2020
*/
2121
public class PrestoSparkWorkerProperty
22-
extends WorkerProperty<NativeExecutionConnectorConfig, NativeExecutionNodeConfig, NativeExecutionSystemConfig>
22+
extends WorkerProperty<NativeExecutionCatalogProperties, NativeExecutionNodeConfig, NativeExecutionSystemConfig>
2323
{
2424
@Inject
2525
public PrestoSparkWorkerProperty(
26-
NativeExecutionConnectorConfig connectorConfig,
26+
NativeExecutionCatalogProperties catalogProperties,
2727
NativeExecutionNodeConfig nodeConfig,
2828
NativeExecutionSystemConfig systemConfig)
2929
{
30-
super(connectorConfig, nodeConfig, systemConfig);
30+
super(catalogProperties, nodeConfig, systemConfig);
3131
}
3232
}

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/property/WorkerProperty.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -64,25 +64,25 @@
6464
* }
6565
* }
6666
*/
67-
public class WorkerProperty<T1 extends NativeExecutionConnectorConfig, T2 extends NativeExecutionNodeConfig, T3 extends NativeExecutionSystemConfig>
67+
public class WorkerProperty<T1 extends NativeExecutionCatalogProperties, T2 extends NativeExecutionNodeConfig, T3 extends NativeExecutionSystemConfig>
6868
{
69-
private final T1 connectorConfig;
69+
private final T1 catalogProperties;
7070
private final T2 nodeConfig;
7171
private final T3 systemConfig;
7272

7373
public WorkerProperty(
74-
T1 connectorConfig,
74+
T1 catalogProperties,
7575
T2 nodeConfig,
7676
T3 systemConfig)
7777
{
7878
this.systemConfig = requireNonNull(systemConfig, "systemConfig is null");
7979
this.nodeConfig = requireNonNull(nodeConfig, "nodeConfig is null");
80-
this.connectorConfig = requireNonNull(connectorConfig, "connectorConfig is null");
80+
this.catalogProperties = requireNonNull(catalogProperties, "catalogProperties is null");
8181
}
8282

83-
public T1 getConnectorConfig()
83+
public T1 getCatalogProperties()
8484
{
85-
return connectorConfig;
85+
return catalogProperties;
8686
}
8787

8888
public T2 getNodeConfig()
@@ -95,12 +95,12 @@ public T3 getSystemConfig()
9595
return systemConfig;
9696
}
9797

98-
public void populateAllProperties(Path systemConfigPath, Path nodeConfigPath, Path connectorConfigPath)
98+
public void populateAllProperties(Path systemConfigPath, Path nodeConfigPath, Path catalogConfigsPath)
9999
throws IOException
100100
{
101101
populateProperty(systemConfig.getAllProperties(), systemConfigPath);
102102
populateProperty(nodeConfig.getAllProperties(), nodeConfigPath);
103-
populateProperty(connectorConfig.getAllProperties(), connectorConfigPath);
103+
populateCatalogProperties(catalogProperties.getAllCatalogProperties(), catalogConfigsPath);
104104
}
105105

106106
private void populateProperty(Map<String, String> properties, Path path)
@@ -122,4 +122,20 @@ private void populateProperty(Map<String, String> properties, Path path)
122122
throw e;
123123
}
124124
}
125+
126+
private void populateCatalogProperties(Map<String, Map<String, String>> catalogProperties, Path path)
127+
throws IOException
128+
{
129+
File catalogDir = path.toFile();
130+
if (!catalogDir.exists()) {
131+
catalogDir.mkdirs();
132+
}
133+
134+
for (Map.Entry<String, Map<String, String>> catalogEntry : catalogProperties.entrySet()) {
135+
String catalogName = catalogEntry.getKey();
136+
Map<String, String> properties = catalogEntry.getValue();
137+
Path catalogConfigPath = path.resolve(catalogName + ".properties");
138+
populateProperty(properties, catalogConfigPath);
139+
}
140+
}
125141
}

presto-spark-base/src/test/java/com/facebook/presto/spark/execution/TestNativeExecutionProcess.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import com.facebook.presto.spark.execution.http.TestPrestoSparkHttpClient;
2323
import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionProcess;
2424
import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionProcessFactory;
25-
import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig;
25+
import com.facebook.presto.spark.execution.property.NativeExecutionCatalogProperties;
2626
import com.facebook.presto.spark.execution.property.NativeExecutionNodeConfig;
2727
import com.facebook.presto.spark.execution.property.NativeExecutionSystemConfig;
2828
import com.facebook.presto.spark.execution.property.PrestoSparkWorkerProperty;
@@ -88,7 +88,7 @@ private NativeExecutionProcessFactory createNativeExecutionProcessFactory()
8888
TaskId taskId = new TaskId("testid", 0, 0, 0, 0);
8989
ScheduledExecutorService errorScheduler = newScheduledThreadPool(4);
9090
PrestoSparkWorkerProperty workerProperty = new PrestoSparkWorkerProperty(
91-
new NativeExecutionConnectorConfig(),
91+
new NativeExecutionCatalogProperties(ImmutableMap.of()),
9292
new NativeExecutionNodeConfig(),
9393
new NativeExecutionSystemConfig(ImmutableMap.of()));
9494
NativeExecutionProcessFactory factory = new NativeExecutionProcessFactory(

0 commit comments

Comments
 (0)