[FLUSS-2686] Add COS filesystem support#2836
[FLUSS-2686] Add COS filesystem support#2836XuQianJin-Stars wants to merge 2 commits intoapache:mainfrom
Conversation
8f2358a to
186ef5d
Compare
This adds a new fluss-fs-cos module that integrates Tencent Cloud COS (Cloud Object Storage) as a remote filesystem for Fluss. Key changes: - Add fluss-fs-cos module with hadoop-cos and cos_api dependencies - Implement COSFileSystem extending HadoopFileSystem (scheme: cosn) - Implement COSFileSystemPlugin as FileSystemPlugin SPI - Add security token support (COSSecurityTokenProvider/Receiver) - Add DynamicTemporaryCOSCredentialsProvider for temporary credentials - Add integration tests for COS filesystem behavior - Register module in fluss-filesystems parent pom
186ef5d to
e1151e8
Compare
There was a problem hiding this comment.
Pull request overview
Adds a new Fluss filesystem plugin module to support Tencent Cloud Object Storage (COS) via Hadoop’s CosNFileSystem, including SPI registration and integration tests for COS access.
Changes:
- Introduces new
fluss-fs-cosmodule with COS filesystem implementation, token receiver/provider, and SPI service registrations. - Adds COS integration tests (static credentials + token-based flow) gated by environment-provided credentials.
- Updates parent filesystem build to include the COS module and adjusts test-coverage exclusions.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-test-coverage/pom.xml | Excludes COS filesystem packages from coverage aggregation (consistent with other FS plugins). |
| fluss-filesystems/pom.xml | Registers fluss-fs-cos as a submodule. |
| fluss-filesystems/fluss-fs-cos/pom.xml | Adds COS module dependencies, shading, and multi-release JAXB handling. |
| fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/COSFileSystem.java | Wraps Hadoop FS and adds Fluss security-token acquisition hook. |
| fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/COSFileSystemPlugin.java | Implements FileSystemPlugin for cosn and translates Fluss config to Hadoop config. |
| fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenProvider.java | Provides a Fluss ObtainedSecurityToken intended for COS credential propagation. |
| fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/COSSecurityTokenReceiver.java | Receives Fluss tokens and configures Hadoop COS credential provider + extra config. |
| fluss-filesystems/fluss-fs-cos/src/main/java/org/apache/fluss/fs/cos/token/DynamicTemporaryCOSCredentialsProvider.java | Supplies COS SDK credentials dynamically from received tokens. |
| fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/services/org.apache.fluss.fs.FileSystemPlugin | SPI registration of COSFileSystemPlugin. |
| fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/services/org.apache.fluss.fs.token.SecurityTokenReceiver | SPI registration of COSSecurityTokenReceiver. |
| fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/licenses/LICENSE.jaxb | Bundled CDDL license text for JAXB dependency. |
| fluss-filesystems/fluss-fs-cos/src/main/resources/META-INF/NOTICE | Declares bundled dependencies and licensing notice for the module. |
| fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSFileSystemBehaviorITCase.java | COS behavior IT using static secretId/secretKey. |
| fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSWithTokenFileSystemBehaviorBaseITCase.java | Base for token-based COS behavior tests; initializes FS with static creds first. |
| fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSWithTokenFileSystemBehaviorITCase.java | Token-based COS behavior IT wiring receiver + re-init. |
| fluss-filesystems/fluss-fs-cos/src/test/java/org/apache/fluss/fs/cos/COSTestCredentials.java | Reads COS IT credentials from environment variables and gates tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| public ObtainedSecurityToken obtainSecurityToken(String scheme) { | ||
| // For COS, we directly use the configured secret id and secret key as the token. | ||
| // If STS temporary credentials are needed in the future, this can be extended | ||
| // to call Tencent Cloud STS API to get temporary credentials. | ||
| Map<String, String> additionInfo = new HashMap<>(); | ||
| // we need to put endpoint as addition info | ||
| if (endpoint != null) { | ||
| additionInfo.put(ENDPOINT_KEY, endpoint); | ||
| } | ||
|
|
||
| Credentials credentials = new Credentials(secretId, secretKey, null); | ||
| byte[] tokenBytes = CredentialsJsonSerde.toJson(credentials); | ||
|
|
||
| // token does not expire when using static credentials | ||
| return new ObtainedSecurityToken(scheme, tokenBytes, Long.MAX_VALUE, additionInfo); | ||
| } |
There was a problem hiding this comment.
This provider serializes secretId/secretKey with a null security token and sets Long.MAX_VALUE expiry, but the receiver converts tokens into BasicSessionCredentials (session-based) which generally expects a non-null session token. Either (a) actually obtain STS temporary credentials (accessKey/secretKey/sessionToken + real expiry) and serialize them, or (b) treat the propagated token as static credentials end-to-end (adjust receiver/provider to use non-session COS credentials) and rename/comment accordingly to avoid implying STS behavior.
| LOG.debug("Providing session credentials"); | ||
| return new BasicSessionCredentials( | ||
| credentials.getCOSAccessKeyId(), | ||
| credentials.getCOSSecretKey(), | ||
| ((BasicSessionCredentials) credentials).getSessionToken()); |
There was a problem hiding this comment.
This unconditionally casts credentials to BasicSessionCredentials, which will throw ClassCastException if the receiver ever stores a different COSCredentials implementation (e.g., static/basic credentials). Consider removing the cast by storing the session token separately, or branching with instanceof and returning the appropriate credential type based on what was received.
| LOG.debug("Providing session credentials"); | |
| return new BasicSessionCredentials( | |
| credentials.getCOSAccessKeyId(), | |
| credentials.getCOSSecretKey(), | |
| ((BasicSessionCredentials) credentials).getSessionToken()); | |
| if (credentials instanceof BasicSessionCredentials) { | |
| BasicSessionCredentials sessionCredentials = (BasicSessionCredentials) credentials; | |
| LOG.debug("Providing session credentials"); | |
| return new BasicSessionCredentials( | |
| sessionCredentials.getCOSAccessKeyId(), | |
| sessionCredentials.getCOSSecretKey(), | |
| sessionCredentials.getSessionToken()); | |
| } else { | |
| LOG.debug("Providing non-session COS credentials"); | |
| return credentials; | |
| } |
| LOG.info( | ||
| "Session credentials updated successfully with access key: {}.", | ||
| credentials.getCOSAccessKeyId()); |
There was a problem hiding this comment.
Avoid logging credential identifiers (even access key IDs) at INFO level, as they are typically treated as sensitive and can leak via centralized logs. Consider removing the access key from the message, masking it (e.g., last 4 chars), and/or lowering the log level to DEBUG.
| LOG.info( | |
| "Session credentials updated successfully with access key: {}.", | |
| credentials.getCOSAccessKeyId()); | |
| LOG.info("Session credentials updated successfully."); |
| // then, set addition info | ||
| if (additionInfos == null) { | ||
| // if addition info is null, it also means we have not received any token, | ||
| throw new RuntimeException("Credentials is not ready."); |
There was a problem hiding this comment.
The exception message is grammatically incorrect and not very actionable. Consider using a more specific message that indicates the real cause (e.g., token/credentials not yet received via onNewTokensObtained, so the receiver cannot update Hadoop config) and—if applicable—include what to configure/call first.
| throw new RuntimeException("Credentials is not ready."); | |
| throw new RuntimeException( | |
| "Cannot update Hadoop configuration: COS credentials and additional " | |
| + "information have not been received yet. Ensure " | |
| + "onNewTokensObtained(...) has been called with valid tokens " | |
| + "before invoking COSSecurityTokenReceiver.updateHadoopConfig()."); |
| String providers = hadoopConfig.get(CREDENTIALS_PROVIDER, ""); | ||
|
|
||
| if (!providers.contains(credentialsProviderName)) { | ||
| if (providers.isEmpty()) { | ||
| LOG.debug("Setting provider"); | ||
| providers = credentialsProviderName; | ||
| } else { | ||
| providers = credentialsProviderName + "," + providers; | ||
| LOG.debug("Prepending provider, new providers value: {}", providers); | ||
| } | ||
| hadoopConfig.set(CREDENTIALS_PROVIDER, providers); | ||
| } else { | ||
| LOG.debug("Provider already exists"); | ||
| } |
There was a problem hiding this comment.
Using String#contains on a comma-separated provider list can yield false positives (e.g., partial class-name matches). Consider parsing the list as comma-separated tokens (trimmed) and checking for exact equality before prepending.
| private static final Logger LOG = LoggerFactory.getLogger(COSSecurityTokenReceiver.class); | ||
|
|
||
| static volatile COSCredentials credentials; | ||
| static volatile Map<String, String> additionInfos; |
There was a problem hiding this comment.
The field name additionInfos is awkward/unclear; additionalInfos (or additionalInfo if singular) is more standard and improves readability throughout this class.
| // then, set addition info | ||
| if (additionInfos == null) { |
There was a problem hiding this comment.
The field name additionInfos is awkward/unclear; additionalInfos (or additionalInfo if singular) is more standard and improves readability throughout this class.
| // if addition info is null, it also means we have not received any token, | ||
| throw new RuntimeException("Credentials is not ready."); | ||
| } else { | ||
| for (Map.Entry<String, String> entry : additionInfos.entrySet()) { |
There was a problem hiding this comment.
The field name additionInfos is awkward/unclear; additionalInfos (or additionalInfo if singular) is more standard and improves readability throughout this class.
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-jar-plugin</artifactId> | ||
| <executions> | ||
| <execution> | ||
| <goals> | ||
| <goal>jar</goal> | ||
| </goals> | ||
| </execution> | ||
| </executions> | ||
| <configuration> | ||
| <archive> | ||
| <manifestEntries> | ||
| <!-- jaxb-api is packaged as an optional dependency that is only accessible on Java 11 --> | ||
| <Multi-Release>true</Multi-Release> | ||
| </manifestEntries> | ||
| </archive> | ||
| </configuration> | ||
| </plugin> |
There was a problem hiding this comment.
The same maven-jar-plugin is declared twice. This is easy to misread and can lead to configuration being overridden unexpectedly depending on Maven’s model merging. Consider merging the jar and test-jar executions into a single maven-jar-plugin declaration so the Multi-Release manifest configuration applies consistently.
| <plugin> | ||
| <groupId>org.apache.maven.plugins</groupId> | ||
| <artifactId>maven-jar-plugin</artifactId> | ||
| <executions> | ||
| <execution> | ||
| <goals> | ||
| <goal>test-jar</goal> | ||
| </goals> | ||
| </execution> | ||
| </executions> | ||
| </plugin> |
There was a problem hiding this comment.
The same maven-jar-plugin is declared twice. This is easy to misread and can lead to configuration being overridden unexpectedly depending on Maven’s model merging. Consider merging the jar and test-jar executions into a single maven-jar-plugin declaration so the Multi-Release manifest configuration applies consistently.
Purpose
Linked issue: close #2686
This PR adds support for Tencent Cloud Object Storage (COS) as a remote filesystem for Fluss. COS is one of the most widely used cloud storage services in China and this integration enables users to store Fluss remote data on COS, similar to the existing OSS, OBS, S3, and GCS filesystem integrations.
Brief change log
fluss-fs-cos/pom.xml: Add module configuration with dependencies onhadoop-cos(3.3.5),cos_api(5.6.139), andfluss-fs-hadoopfor Hadoop filesystem abstraction.COSFileSystem: ExtendsHadoopFileSystemto wrap Hadoop'sCosNFileSystem, with lazy-initialized security token provider support for obtaining temporary credentials.COSFileSystemPlugin: ImplementsFileSystemPluginSPI for thecosnscheme. Handles Hadoop configuration translation from Fluss config (prefixfs.cosn.*), supports three credential modes: static secret key, custom credentials provider, and dynamic security token viaCOSSecurityTokenReceiver.COSSecurityTokenProvider: Generates temporary security tokens using Tencent Cloud STS (Security Token Service) based on existingsecretId/secretKeyconfiguration.COSSecurityTokenReceiver: ImplementsSecurityTokenReceiverSPI to receive and apply security tokens, configuringDynamicTemporaryCOSCredentialsProvideras the Hadoop credentials provider.DynamicTemporaryCOSCredentialsProvider: Implements COS SDK'sCOSCredentialsProviderinterface, providingBasicSessionCredentialsfrom tokens received viaCOSSecurityTokenReceiver.fluss-filesystems/pom.xml: Registerfluss-fs-cosas a sub-module.Tests
COSFileSystemBehaviorITCase: Integration test for basic COS filesystem behavior (create, read, write, delete) using static credentials (secretId/secretKey).COSWithTokenFileSystemBehaviorITCase: Integration test for COS filesystem behavior using dynamic security tokens obtained via STS.COSWithTokenFileSystemBehaviorBaseITCase: Base class for token-based filesystem tests, handling filesystem initialization with both static credentials and security tokens.COSTestCredentials: Test utility that reads COS credentials and endpoint from environment variables (COSN_SECRET_ID,COSN_SECRET_KEY,COSN_ENDPOINT,COSN_BUCKET).API and Format
No existing API or storage format changes. This PR only adds a new filesystem plugin that registers via SPI (
META-INF/services/org.apache.fluss.fs.FileSystemPlugin).New configuration keys introduced (all following standard Hadoop COS conventions):
fs.cosn.endpoint— COS region endpoint (e.g.,ap-guangzhou)fs.cosn.userinfo.secretId— Tencent Cloud secret IDfs.cosn.userinfo.secretKey— Tencent Cloud secret keyfs.cosn.credentials.provider— Custom credentials provider classDocumentation
This PR introduces a new feature. Documentation for COS filesystem configuration should be added to the Fluss documentation site in a follow-up. Usage example: