
[Filesystem] Support using Hadoop dependencies from the environment variable HADOOP_CLASSPATH #1359

Merged

wuchong merged 13 commits into apache:main from beryllw:hadoop-fs on Sep 28, 2025

Conversation

beryllw (Author) commented Jul 17, 2025

Purpose

Linked issue: close #1358

Enable the HDFS and Paimon plugins to use the host machine's Hadoop environment, and document the process for configuring Paimon with a Hive catalog.

luoyuxia self-assigned this on Aug 7, 2025
luoyuxia (Contributor) left a comment

@beryllw Overall, LGTM. I appended a small commit to improve the doc.

luoyuxia (Contributor) left a comment

The test failed.

@Internal
public static final String[] PARENT_FIRST_HDFS_PATTERNS =
        new String[] {
            "hdfs-site", "core-site", "org.apache.hadoop.", "META-INF",
Contributor:

As discussed offline, it seems hdfs-site, core-site, META-INF are a must to configure? But I'm still wondering why it doesn't work if they are not configured. At the beginning I thought it might load the hdfs-site.xml in plugin/hdfs/fluss-fs-hadoop-0.8-SNAPSHOT.jar, but I unzipped plugin/hdfs/fluss-fs-hadoop-0.8-SNAPSHOT.jar and couldn't find any hdfs-site.xml file, so I'm a little confused.

beryllw (Author) commented Aug 12, 2025

As discussed offline, it seems hdfs-site, core-site, META-INF are a must to configure?

hdfs-site and core-site are used to load the machine's HDFS environment configuration. When users work with an existing Hadoop environment on their machines, they typically want to use their own HDFS configurations. Alternatively, configurations can be set via the fluss.hadoop.* prefix, as detailed in the Fluss docs.
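
For illustration, a minimal sketch of how such a fluss.hadoop.* prefix mapping could be applied to a Hadoop Configuration; the class name and plain Map input are assumptions for this example, not Fluss's actual implementation:

// Illustrative sketch (not Fluss's actual code): copy all "fluss.hadoop.*"
// entries from a Fluss configuration map into a Hadoop Configuration,
// stripping the prefix.
import java.util.Map;
import org.apache.hadoop.conf.Configuration;

public class HadoopConfFromFluss {
    private static final String PREFIX = "fluss.hadoop.";

    public static Configuration load(Map<String, String> flussProps) {
        // new Configuration() picks up core-default.xml / core-site.xml
        // visible to the current classloader.
        Configuration conf = new Configuration();
        for (Map.Entry<String, String> e : flussProps.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                // e.g. "fluss.hadoop.hadoop.security.authentication"
                //   -> "hadoop.security.authentication"
                conf.set(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return conf;
    }
}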

META-INF is used to load related classes via SPI.
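
For context, Hadoop discovers FileSystem implementations with java.util.ServiceLoader, which resolves META-INF/services entries through the classloader. A tiny probe (assuming hadoop-common on the classpath) shows what the plugin classloader must be able to see:

// Probe sketch: list the FileSystem implementations discoverable via SPI.
// ServiceLoader scans META-INF/services/org.apache.hadoop.fs.FileSystem;
// if the classloader cannot resolve META-INF resources, nothing is found.
import java.util.ServiceLoader;
import org.apache.hadoop.fs.FileSystem;

public class FileSystemSpiProbe {
    public static void main(String[] args) {
        for (FileSystem fs : ServiceLoader.load(FileSystem.class)) {
            System.out.println(fs.getClass().getName());
        }
    }
}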

Contributor:

Thanks for the explanation. But if the user has already set HADOOP_HOME, won't the hdfs-site and core-site in HADOOP_HOME be loaded? See HadoopUtils#getHadoopConfiguration, or is something wrong there?

Is META-INF for loading some classes that Hadoop requires?

beryllw (Author) commented Aug 12, 2025

But if the user has already set HADOOP_HOME, won't the hdfs-site and core-site in HADOOP_HOME be loaded?

Yes, but without these entries in classloader.parent-first-patterns.default, the plugin ClassLoader cannot retrieve the hdfs-site resource. The same problem exists for META-INF.

The org.apache.hadoop. pattern covers only Hadoop classes; it lets the plugin ClassLoader retrieve Hadoop classes, but not the hdfs-site and META-INF resources.
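
For illustration, a simplified sketch of how a plugin classloader typically honors parent-first patterns for both classes and resources; the class and constructor shapes here are assumptions, not the actual Fluss PluginClassLoader:

// Simplified sketch of a child-first plugin classloader with parent-first
// patterns. Entries like "org.apache.hadoop." match class-name prefixes,
// while entries like "hdfs-site" or "META-INF" match resource-name prefixes.
import java.net.URL;
import java.net.URLClassLoader;

public class PluginClassLoaderSketch extends URLClassLoader {
    private final String[] parentFirstPatterns;

    public PluginClassLoaderSketch(URL[] pluginJars, ClassLoader parent, String[] parentFirstPatterns) {
        super(pluginJars, parent);
        this.parentFirstPatterns = parentFirstPatterns;
    }

    private boolean isParentFirst(String name) {
        for (String pattern : parentFirstPatterns) {
            if (name.startsWith(pattern)) {
                return true;
            }
        }
        return false;
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            if (isParentFirst(name)) {
                return super.loadClass(name, resolve); // delegate to parent first
            }
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // child-first: look in the plugin jars
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    @Override
    public URL getResource(String name) {
        if (isParentFirst(name)) {
            // e.g. hdfs-site.xml / core-site.xml from the host Hadoop conf dir
            return super.getResource(name);
        }
        URL url = findResource(name); // child-first
        return url != null ? url : super.getResource(name);
    }
}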

Member:

I'm also curious about this. Why doesn't Flink need to include hdfs-site, core-site, META-INF in the parent-first patterns? Is there any specific plugin that needs to read hdfs-site and META-INF from the classloader by itself?

beryllw (Author) commented Sep 3, 2025

After re-testing, I found that these three entries are not required when Kerberos is not enabled, but the core-site entry is needed when Kerberos authentication is enabled.

Failed test case:

remote.data.dir: hdfs://xxxxxx/rbf/data/fluss

datalake.format: paimon
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: hdfs://xxxxxx/rbf/warehouse

plugin.classloader.parent-first-patterns.default: java.,com.alibaba.fluss.,javax.annotation.,org.slf4j,org.apache.log4j,org.apache.logging,org.apache.commons.logging,ch.qos.logback,org.apache.hadoop

Key error logs:

2025-09-03 10:24:26,304 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - hadoop login
2025-09-03 10:24:40,781 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - hadoop login commit
2025-09-03 10:24:40,782 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - Using user: "xxxxx" with name xxxxx
2025-09-03 10:24:40,782 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - User entry: "xxxxx"
2025-09-03 10:24:40,782 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - UGI loginUser:xxxxx (auth:SIMPLE)

Successful test case:

remote.data.dir: hdfs://xxxxxx/rbf/data/fluss

datalake.format: paimon
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: hdfs://xxxxxx/rbf/warehouse

plugin.classloader.parent-first-patterns.default: java.,com.alibaba.fluss.,javax.annotation.,org.slf4j,org.apache.log4j,org.apache.logging,org.apache.commons.logging,ch.qos.logback,org.apache.hadoop,core-site

Key logs:

2025-09-03 11:20:51,175 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - hadoop login
2025-09-03 11:20:51,177 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - hadoop login commit
2025-09-03 11:20:51,177 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - using kerberos user:xxxxx@HADOOP.XXXX.COM
2025-09-03 11:20:51,177 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - Using user: "xxxxx@HADOOP.XXXX.COM" with name xxxxx@HADOOP.XXXX.COM
2025-09-03 11:20:51,178 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - User entry: "xxxxx@HADOOP.XXXX.COM"
2025-09-03 11:20:51,178 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - UGI loginUser:xxxxx@HADOOP.XXXX.COM (auth:KERBEROS)

The configuration below also fails with the same error as the first failure:

remote.data.dir: hdfs://xxxxxx/rbf/data/fluss

datalake.format: paimon
datalake.paimon.metastore: filesystem
datalake.paimon.warehouse: hdfs://xxxxxx/rbf/warehouse

plugin.classloader.parent-first-patterns.default: java.,com.alibaba.fluss.,javax.annotation.,org.slf4j,org.apache.log4j,org.apache.logging,org.apache.commons.logging,ch.qos.logback,org.apache.hadoop

fluss.hadoop.hadoop.security.authentication: kerberos
fluss.hadoop.hadoop.security.authorization: true

Key error logs:

2025-09-03 11:24:15,457 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - hadoop login
2025-09-03 11:24:15,457 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - hadoop login commit
2025-09-03 11:24:15,458 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - Using user: "xxxxx" with name xxxxx
2025-09-03 11:24:15,458 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - User entry: "xxxxx"
2025-09-03 11:24:15,458 DEBUG org.apache.hadoop.security.UserGroupInformation              [] - UGI loginUser:xxxxx (auth:SIMPLE)

The reason is that when the org.apache.hadoop.security.UserGroupInformation class is initialized, ensureInitialized creates an empty Configuration, which by default loads the core-site configuration file from the current class loader's path. Flink uses a plugin class loader for Hadoop-related classes, but Flink also has its own org.apache.flink.runtime.security module, which loads the configuration files from the Hadoop environment and uses them to set up UserGroupInformation's configuration. This is probably why Flink can work without including core-site; a minimal sketch of that approach follows the Hadoop code below.

// org.apache.hadoop.security.UserGroupInformation#ensureInitialized
  private static void ensureInitialized() {
    if (!isInitialized()) {
      synchronized(UserGroupInformation.class) {
        if (!isInitialized()) { // someone might have beat us
          initialize(new Configuration(), false);
        }
      }
    }
  }
// org.apache.hadoop.conf.Configuration static initializer
static {
    // Add default resources
    addDefaultResource("core-default.xml");
    addDefaultResource("core-site.xml");

    // print deprecation warning if hadoop-site.xml is found in classpath
    ClassLoader cL = Thread.currentThread().getContextClassLoader();
    if (cL == null) {
      cL = Configuration.class.getClassLoader();
    }
    if (cL.getResource("hadoop-site.xml") != null) {
      LOG.warn("DEPRECATED: hadoop-site.xml found in the classpath. " +
          "Usage of hadoop-site.xml is deprecated. Instead use core-site.xml, "
          + "mapred-site.xml and hdfs-site.xml to override properties of " +
          "core-default.xml, mapred-default.xml and hdfs-default.xml " +
          "respectively");
      addDefaultResource("hadoop-site.xml");
    }
  }
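
For reference, a minimal sketch of the Flink-HadoopModule-style fix: hand the resolved Hadoop configuration to UserGroupInformation before the lazy initialization above runs. UserGroupInformation.setConfiguration is a real Hadoop API; the surrounding class is illustrative.

// Sketch: initialize UGI with an explicit Configuration so that
// ensureInitialized() never falls back to an empty Configuration
// that lacks the core-site settings.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class HadoopSecuritySetup {
    public static void install(Configuration hadoopConf) {
        // Must run before anything else touches UserGroupInformation,
        // otherwise the default (empty) Configuration wins.
        UserGroupInformation.setConfiguration(hadoopConf);
    }
}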

luoyuxia (Contributor) commented Sep 4, 2025

@beryllw Thanks for the great work!! Let me summarize; please correct me if I'm wrong. So it turns out to be:

Option 1: only add org.apache.hadoop. to the parent-first patterns in the Fluss code. This works when Kerberos is not enabled; when Kerberos is enabled, users must add core-site to the parent-first patterns manually.

Option 2: add both org.apache.hadoop. and core-site to the parent-first patterns in the Fluss code. This works whether Kerberos is enabled or not, and once we introduce a security module we can remove core-site from the parent-first patterns again.

I prefer option 2 since it requires users to do nothing in a Kerberos-enabled environment, and I do meet some users running Kerberos-enabled environments. @wuchong What do you think?

beryllw (Author) replied:

Option 3: Adding the configuration fluss.hadoop.hadoop.security.authentication does not work. The root cause is that when initializing the UserGroupInformation class, it uses an empty Configuration instance instead of our Fluss-specific Hadoop configuration.
I noticed that Flink includes a security module which is responsible for setting the configuration of UserGroupInformation — this might be relevant to our issue. For reference, see the following link:
https://github.com/apache/flink/blob/78136133fbec4ca145dec66d4bc0c324c8e16d82/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java#L68-L70

beryllw (Author) commented Aug 12, 2025

The test failed.

It seems the failure is related to the shadedPattern. Could we retrigger the test? I can't figure out the issue.
The test com.alibaba.fluss.lake.paimon.tiering.ReCreateSameTableAfterTieringTest.testReCreateSameTable passed successfully in my local Java 8 environment.

<relocation>
    <pattern>org.apache.commons</pattern>
    <shadedPattern>org.apache.fluss.shaded.org.apache.commons</shadedPattern>
</relocation>

wuchong (Member) commented Aug 12, 2025

Why do we have to shade org.apache.commons?

beryllw (Author) commented Aug 12, 2025

The test failed.

The same problem occurs in ReCreateSameTableAfterTieringTest.testReCreateSameTable; I will try to fix it.

beryllw (Author) commented Aug 29, 2025

Shading the org.apache.commons package in the Fluss component avoids conflicts with Hadoop's built-in version.
env: hadoop-3.2.2, fluss-0.7
Use the system's Hadoop dependencies and integrate them into Fluss's classpath by executing: FLUSS_CLASSPATH="$FLUSS_CLASSPATH":"$HADOOP_CLASSPATH".

Exception in thread "main" java.lang.NoSuchMethodError: 'org.apache.commons.cli.Option$Builder org.apache.commons.cli.Option.builder(java.lang.String)'
        at com.alibaba.fluss.server.cli.CommandLineOptions.<clinit>(CommandLineOptions.java:31)
        at com.alibaba.fluss.server.cli.ServerConfigurationParserFactory.options(ServerConfigurationParserFactory.java:34)
        at com.alibaba.fluss.server.cli.ServerConfigurationParserFactory.getOptions(ServerConfigurationParserFactory.java:42)
        at com.alibaba.fluss.server.cli.CommandLineParser.parse(CommandLineParser.java:40)
        at com.alibaba.fluss.server.utils.ConfigurationParserUtils.loadCommonConfiguration(ConfigurationParserUtils.java:53)
        at com.alibaba.fluss.server.ServerBase.loadConfiguration(ServerBase.java:76)
        at com.alibaba.fluss.server.coordinator.CoordinatorServer.main(CoordinatorServer.java:152)

grep -r 'org.apache.commons.cli.Option' /lib/hadoop
Binary file /lib/hadoop/lib/commons-cli-1.2.jar matches

grep -r 'org.apache.commons.cli.Option' /usr/local/fluss
Binary file /usr/local/fluss/plugins/paimon/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar matches
Binary file /usr/local/fluss/lib/fluss-server-0.7.0.jar matches

The conflict arises because both versions are present on the classpath:
- Hadoop 3.2.2 provides commons-cli-1.2.jar in /lib/hadoop/lib/
- Fluss 0.7.0 bundles newer versions in fluss-server-0.7.0.jar and flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
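
A quick way to diagnose such conflicts is to print which jar a contested class is actually loaded from; a small sketch (not part of this PR), using the class from the stack trace above:

// Diagnostic sketch: show which jar wins for a conflicting class,
// e.g. whether commons-cli resolves to Hadoop's 1.2 jar or the copy
// bundled in fluss-server.
public class WhichJar {
    public static void main(String[] args) throws Exception {
        Class<?> c = Class.forName("org.apache.commons.cli.Option");
        System.out.println(c.getProtectionDomain().getCodeSource().getLocation());
    }
}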

Another dependency conflict:

2025-08-29 18:15:15,787 ERROR com.alibaba.fluss.server.ServerBase                          [] - Could not start the CoordinatorServer.
java.lang.NullPointerException: Cannot invoke "org.apache.commons.lang3.JavaVersion.atLeast(org.apache.commons.lang3.JavaVersion)" because "org.apache.commons.lang3.SystemUtils.JAVA_SPECIFICATION_VERSION_AS_ENUM" is null
        at org.apache.commons.lang3.SystemUtils.isJavaVersionAtLeast(SystemUtils.java:1654) ~[commons-lang3-3.7.jar:3.7]
        at com.alibaba.fluss.utils.MapUtils.newConcurrentHashMap(MapUtils.java:30) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.rpc.netty.client.NettyClient.<init>(NettyClient.java:81) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.rpc.RpcClient.create(RpcClient.java:42) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.server.coordinator.CoordinatorServer.startServices(CoordinatorServer.java:212) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.server.ServerBase.start(ServerBase.java:118) [fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.server.ServerBase.startServer(ServerBase.java:88) [fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.server.coordinator.CoordinatorServer.main(CoordinatorServer.java:154) [fluss-server-0.7.0.jar:0.7.0]
2025-08-29 18:15:15,793 ERROR com.alibaba.fluss.server.ServerBase                          [] - Could not start CoordinatorServer.
com.alibaba.fluss.exception.FlussException: Failed to start the CoordinatorServer.
        at com.alibaba.fluss.server.ServerBase.start(ServerBase.java:131) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.server.ServerBase.startServer(ServerBase.java:88) [fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.server.coordinator.CoordinatorServer.main(CoordinatorServer.java:154) [fluss-server-0.7.0.jar:0.7.0]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.commons.lang3.JavaVersion.atLeast(org.apache.commons.lang3.JavaVersion)" because "org.apache.commons.lang3.SystemUtils.JAVA_SPECIFICATION_VERSION_AS_ENUM" is null
        at org.apache.commons.lang3.SystemUtils.isJavaVersionAtLeast(SystemUtils.java:1654) ~[commons-lang3-3.7.jar:3.7]
        at com.alibaba.fluss.utils.MapUtils.newConcurrentHashMap(MapUtils.java:30) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.rpc.netty.client.NettyClient.<init>(NettyClient.java:81) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.rpc.RpcClient.create(RpcClient.java:42) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.server.coordinator.CoordinatorServer.startServices(CoordinatorServer.java:212) ~[fluss-server-0.7.0.jar:0.7.0]
        at com.alibaba.fluss.server.ServerBase.start(ServerBase.java:118) ~[fluss-server-0.7.0.jar:0.7.0]
        ... 2 more

luoyuxia (Contributor) commented Sep 4, 2025

@beryllw The test fails due to the shading of org/apache/commons. See:

Error:  org.apache.fluss.lake.paimon.tiering.ReCreateSameTableAfterTieringTest.testReCreateSameTable  Time elapsed: 2.084 s  <<< ERROR!
java.lang.NoClassDefFoundError: org/apache/commons/io/output/NullPrintStream
	at org.apache.flink.runtime.taskexecutor.TaskManagerRunner.startTaskManager(TaskManagerRunner.java:611)
	at org.apache.flink.runtime.minicluster.MiniCluster.startTaskManager(MiniCluster.java:767)
	at org.apache.flink.runtime.minicluster.MiniCluster.startTaskManagers(MiniCluster.java:748)
	at org.apache.flink.runtime.minicluster.MiniCluster.start(MiniCluster.java:459)
	at org.apache.flink.client.program.PerJobMiniClusterFactory.submitJob(PerJobMiniClusterFactory.java:76)
	at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:84)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2472)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2453)
	at org.apache.fluss.flink.tiering.LakeTieringJobBuilder.build(LakeTieringJobBuilder.java:113)
	at org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase.buildTieringJob(FlinkPaimonTieringTestBase.java:129)
	at org.apache.fluss.lake.paimon.tiering.ReCreateSameTableAfterTieringTest.testReCreateSameTable(ReCreateSameTableAfterTieringTest.java:71)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)

beryllw (Author) commented Sep 4, 2025

@beryllw The test fails due to the shading of org/apache/commons.

It's a commons-io dependency conflict.

wuchong (Member) left a comment

The changes look good to me. Thank you @beryllw and @luoyuxia for the great work. I rebased the branch and modified the docs a bit, and I will merge this if you have no concerns.

luoyuxia (Contributor) left a comment

@wuchong Thanks for the doc update. It looks good to me.

beryllw (Author) commented Sep 28, 2025

LGTM! Thanks for helping improve the doc!

wuchong merged commit 78ba99b into apache:main on Sep 28, 2025 (6 checks passed)
leosanqing pushed a commit to leosanqing/fluss that referenced this pull request Sep 29, 2025

Successfully merging this pull request may close these issues:

[Filesystem] HDFS/Paimon Plugin Support for Leveraging Host System's Hadoop Environment (#1358)
