Skip to content

Commit 31df3a1

Browse files
authored
[feat](Catalog)Support Paimon DLF Catalog Using OSSHDFS Storage (#59245)
Support Paimon DLF Catalog Using OSSHDFS Storage (#59245)
1 parent 2dace7a commit 31df3a1

File tree

3 files changed

+75
-8
lines changed

3 files changed

+75
-8
lines changed

fe/fe-core/src/main/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStoreProperties.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.doris.datasource.property.metastore;
1919

2020
import org.apache.doris.datasource.paimon.PaimonExternalCatalog;
21-
import org.apache.doris.datasource.property.storage.OSSProperties;
2221
import org.apache.doris.datasource.property.storage.StorageProperties;
2322

2423
import com.aliyun.datalake.metastore.common.DataLakeConfig;
@@ -89,15 +88,11 @@ public Catalog initializeCatalog(String catalogName, List<StorageProperties> sto
8988
HiveConf hiveConf = buildHiveConf();
9089
buildCatalogOptions();
9190
StorageProperties ossProps = storagePropertiesList.stream()
92-
.filter(sp -> sp.getType() == StorageProperties.Type.OSS)
91+
.filter(sp -> sp.getType() == StorageProperties.Type.OSS
92+
|| sp.getType() == StorageProperties.Type.OSS_HDFS)
9393
.findFirst()
9494
.orElseThrow(() -> new IllegalStateException("Paimon DLF metastore requires OSS storage properties."));
95-
96-
if (!(ossProps instanceof OSSProperties)) {
97-
throw new IllegalStateException("Expected OSSProperties type.");
98-
}
99-
OSSProperties ossProperties = (OSSProperties) ossProps;
100-
hiveConf.addResource(ossProperties.getHadoopStorageConfig());
95+
ossProps.getHadoopStorageConfig().forEach(entry -> hiveConf.set(entry.getKey(), entry.getValue()));
10196
appendUserHadoopConfig(hiveConf);
10297
CatalogContext catalogContext = CatalogContext.create(catalogOptions, hiveConf);
10398
return CatalogFactory.createCatalog(catalogContext);

fe/fe-core/src/test/java/org/apache/doris/datasource/property/metastore/PaimonAliyunDLFMetaStorePropertiesTest.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,64 @@ void testInitializeCatalogWithValidOssProperties() throws UserException {
100100
}
101101
}
102102

103+
104+
@Test
105+
void testInitializeCatalogWithValidOssHdfsProperties() throws UserException {
106+
Map<String, String> props = createValidProps();
107+
PaimonAliyunDLFMetaStoreProperties dlfProps =
108+
new PaimonAliyunDLFMetaStoreProperties(props);
109+
dlfProps.initNormalizeAndCheckProps();
110+
111+
// Prepare OSSProperties mock
112+
Map<String, String> ossProps = new HashMap<>();
113+
ossProps.put("dlf.access_key", "ak");
114+
ossProps.put("dlf.secret_key", "sk");
115+
ossProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com");
116+
ossProps.put("dlf.region", "cn-beijing");
117+
ossProps.put("oss.hdfs.enabled", "true");
118+
119+
120+
List<StorageProperties> storageProperties = StorageProperties.createAll(ossProps);
121+
122+
Catalog mockCatalog = Mockito.mock(Catalog.class);
123+
124+
try (MockedStatic<CatalogFactory> mocked = Mockito.mockStatic(CatalogFactory.class)) {
125+
mocked.when(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class)))
126+
.thenReturn(mockCatalog);
127+
128+
Catalog catalog = dlfProps.initializeCatalog("testCatalog", storageProperties);
129+
130+
Assertions.assertNotNull(catalog, "Catalog should not be null");
131+
Assertions.assertEquals(mockCatalog, catalog, "Catalog should be the mocked one");
132+
133+
mocked.verify(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class)));
134+
}
135+
ossProps = new HashMap<>();
136+
ossProps.put("dlf.access_key", "ak");
137+
ossProps.put("dlf.secret_key", "sk");
138+
ossProps.put("dlf.endpoint", "dlf-vpc.cn-beijing.aliyuncs.com");
139+
ossProps.put("dlf.region", "cn-beijing");
140+
ossProps.put("oss.access_key", "ak");
141+
ossProps.put("oss.secret_key", "sk");
142+
ossProps.put("oss.endpoint", "oss-cn-beijing.oss-dls.aliyuncs.com");
143+
storageProperties = StorageProperties.createAll(ossProps);
144+
145+
mockCatalog = Mockito.mock(Catalog.class);
146+
147+
try (MockedStatic<CatalogFactory> mocked = Mockito.mockStatic(CatalogFactory.class)) {
148+
mocked.when(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class)))
149+
.thenReturn(mockCatalog);
150+
151+
Catalog catalog = dlfProps.initializeCatalog("testCatalog", storageProperties);
152+
153+
Assertions.assertNotNull(catalog, "Catalog should not be null");
154+
Assertions.assertEquals(mockCatalog, catalog, "Catalog should be the mocked one");
155+
156+
mocked.verify(() -> CatalogFactory.createCatalog(Mockito.any(CatalogContext.class)));
157+
}
158+
159+
}
160+
103161
@Test
104162
void testInitializeCatalogWithoutOssPropertiesThrows() {
105163
Map<String, String> props = createValidProps();

regression-test/suites/external_table_p2/refactor_catalog_param/oss_hdfs_catalog_test.groovy

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,20 @@ suite("oss_hdfs_catalog_test", "p2,external,new_catalog_property") {
144144
'oss.hdfs.endpoint' = '${oss_hdfs_endpoint}',
145145
'oss.hdfs.region'='${oss_hdfs_region}'
146146
"""
147+
//**************** Paimon DLF ON OSS_HDFS *******************/
147148

149+
String query_table_paimon_dlf = context.config.otherConfigs.get("paimonDlfWarehouseOnOssHdfsQueryTable")
150+
String query_count_paimon_dlf = context.config.otherConfigs.get("paimonDlfWarehouseOnOssHdfsQueryCount")
151+
String paimon_dlf_old_catalog_properties = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogOldProperties")
152+
String paimon_dlf_new_catalog_properties1 = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogNewProperties1")
153+
String paimon_dlf_new_catalog_properties2 = context.config.otherConfigs.get("paimonDlfOnOssHdfsCatalogNewProperties2")
154+
155+
testQuery("paimon_dlf_oss_hdfs_old_catalog",paimon_dlf_old_catalog_properties ,query_table_paimon_dlf,query_count_paimon_dlf,true)
156+
testQuery("paimon_dlf_oss_hdfs_old_catalog",paimon_dlf_old_catalog_properties ,query_table_paimon_dlf,query_count_paimon_dlf,false)
157+
testQuery("paimon_dlf_oss_hdfs_new_catalog1",paimon_dlf_new_catalog_properties1 ,query_table_paimon_dlf,query_count_paimon_dlf,true)
158+
testQuery("paimon_dlf_oss_hdfs_new_catalog1",paimon_dlf_new_catalog_properties1 ,query_table_paimon_dlf,query_count_paimon_dlf,false)
159+
testQuery("paimon_dlf_oss_hdfs_new_catalog2",paimon_dlf_new_catalog_properties2 ,query_table_paimon_dlf,query_count_paimon_dlf,true)
160+
testQuery("paimon_dlf_oss_hdfs_new_catalog2",paimon_dlf_new_catalog_properties2 ,query_table_paimon_dlf,query_count_paimon_dlf,false)
148161

149162
//**************** Paimon FILESYSTEM ON OSS_HDFS *******************/
150163
String paimon_fs_warehouse = context.config.otherConfigs.get("paimonFsWarehouseOnOssHdfs")
@@ -161,6 +174,7 @@ suite("oss_hdfs_catalog_test", "p2,external,new_catalog_property") {
161174
testQuery("paimon_fs_oss_hdfs_region_catalog",paimon_file_system_catalog_properties + usingOSSHDFSProps + old_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,false)
162175
testQuery("paimon_fs_oss_hdfs_new_catalog",paimon_file_system_catalog_properties + new_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,true)
163176
testQuery("paimon_fs_oss_hdfs_new_catalog",paimon_file_system_catalog_properties + new_oss_hdfs_storage_properties,query_table_paimon_fs,query_count_paimon_fs,false)
177+
164178
//**************** ICEBERG FILESYSTEM ON OSS_HDFS *******************/
165179
String iceberg_file_system_catalog_properties = """
166180
'type'='iceberg',

0 commit comments

Comments
 (0)