Commit 3ed9951

Izeren authored and rkhachatryan committed
[FLINK-38592] Add a configurable property for file system factories so that multiple factories for the same scheme can be packaged in the image and loaded according to their priority.
1 parent 4d4c309 commit 3ed9951

9 files changed: 305 additions and 4 deletions

docs/content.zh/docs/deployment/filesystems/common.md

Lines changed: 20 additions & 0 deletions
@@ -63,4 +63,24 @@ fs.<scheme>.limit.stream-timeout: (milliseconds, 0 means infinite)
 
 Connections are limited per TaskManager/file system. Because file systems are created per scheme and authority, different authorities have independent connection pools. For example, `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` have separate pools.
 
+## File System Factory Priority
+
+When multiple `FileSystemFactory` implementations are available for the same URI scheme (for example, during a migration between file system backends), Flink resolves the conflict using a priority mechanism. The factory with the highest priority is selected.
+
+Each factory declares a default priority via its `getPriority()` method (default `0`). You can override the priority for any factory through configuration:
+
+```yaml
+fs.<scheme>.priority.<factoryClassName>: <integer>
+```
+
+Higher values indicate higher priority. When the configuration key is not set, the factory's declared priority is used.
+
+If two factories end up with the same priority, the winner depends on classloading order, which is non-deterministic. Flink logs a warning in this case. Set explicit priorities to ensure deterministic behavior.
+
+**Example:** Suppose two S3 factory implementations are on the classpath and both declare the default priority `0`. To select the Hadoop-based factory over the Presto-based one, raise its priority:
+
+```yaml
+fs.s3.priority.org.apache.flink.fs.s3.hadoop.S3FileSystemFactory: 1
+```
+
 {{< top >}}

docs/content.zh/docs/deployment/filesystems/overview.md

Lines changed: 2 additions & 0 deletions
@@ -75,6 +75,8 @@ cp ./opt/flink-s3-fs-hadoop-{{< version >}}.jar ./plugins/s3-fs-hadoop/
 - Add a service entry. Create a file `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` that contains the class name of your file system factory class
   (see the [Java Service Loader docs](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html) for more details).
 
+- Optionally, override `getPriority()` in your factory to declare a relative priority. This is useful when multiple factories may be present for the same URI scheme (for example, during a migration between file system backends). The factory with the highest priority is selected. The default priority is `0`. See [Common Configurations]({{< ref "docs/deployment/filesystems/common" >}}#file-system-factory-priority) for details on overriding priority via configuration. For new experimental factories, it is recommended to override `getPriority()` to return `-1`, so that an existing factory with the default priority stays selected until the new one is explicitly prioritized via configuration.
+
 During plugin discovery, the file system factory class is loaded by a dedicated Java class loader to avoid conflicts with other classes or Flink components. The same class loader should be used during file system instantiation and file system calls.
 
 <span class="label label-warning">Warning</span> In practice, this means your implementation should avoid using the `Thread.currentThread().getContextClassLoader()` class loader.

docs/content/docs/deployment/filesystems/common.md

Lines changed: 21 additions & 1 deletion
@@ -69,4 +69,24 @@ Limit enforcement on a per TaskManager/file system basis.
 Because file system creation occurs per scheme and authority, different
 authorities have independent connection pools. For example `hdfs://myhdfs:50010/` and `hdfs://anotherhdfs:4399/` will have separate pools.
 
-{{< top >}}
+## File System Factory Priority
+
+When multiple `FileSystemFactory` implementations are available for the same URI scheme (for example, during a migration between file system backends), Flink resolves the conflict using a priority mechanism. The factory with the highest priority is selected.
+
+Each factory declares a default priority via its `getPriority()` method (default `0`). You can override the priority for any factory through configuration:
+
+```yaml
+fs.<scheme>.priority.<factoryClassName>: <integer>
+```
+
+Higher values indicate higher priority. When the configuration key is not set, the factory's declared priority is used.
+
+If two factories end up with the same priority, the winner depends on classloading order, which is non-deterministic. Flink logs a warning in this case. Set explicit priorities to ensure deterministic behavior.
+
+**Example:** Suppose two S3 factory implementations are on the classpath and both declare the default priority `0`. To select the Hadoop-based factory over the Presto-based one, raise its priority:
+
+```yaml
+fs.s3.priority.org.apache.flink.fs.s3.hadoop.S3FileSystemFactory: 1
+```
+
+{{< top >}}
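
The resolution rule documented in this section can be sketched in plain Java without any Flink dependency. This is only an illustration of the documented behavior, not Flink's implementation: `PrioritySelectionSketch`, its `Candidate` type, and the `com.example.fs.*` class names are hypothetical, and a plain `Map` stands in for the Flink configuration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the documented priority rule; not Flink code. */
final class PrioritySelectionSketch {

    /** Hypothetical stand-in for a factory: class name plus its declared getPriority() value. */
    static final class Candidate {
        final String className;
        final int declaredPriority;

        Candidate(String className, int declaredPriority) {
            this.className = className;
            this.declaredPriority = declaredPriority;
        }
    }

    /** Effective priority: the configured override if present, else the declared priority. */
    static int effectivePriority(String scheme, Candidate c, Map<String, String> config) {
        String value = config.get("fs." + scheme + ".priority." + c.className);
        return value != null ? Integer.parseInt(value.trim()) : c.declaredPriority;
    }

    /** Returns the class name with the highest effective priority (on a tie, the later one). */
    static String select(String scheme, List<Candidate> candidates, Map<String, String> config) {
        String winner = null;
        int best = Integer.MIN_VALUE;
        for (Candidate c : candidates) {
            int priority = effectivePriority(scheme, c, config);
            if (priority >= best) {
                winner = c.className;
                best = priority;
            }
        }
        return winner;
    }

    public static void main(String[] args) {
        List<Candidate> candidates = Arrays.asList(
                new Candidate("com.example.fs.StableS3Factory", 0),
                new Candidate("com.example.fs.ExperimentalS3Factory", -1));
        Map<String, String> config = new HashMap<>();

        // No override: the declared priorities decide.
        System.out.println(select("s3", candidates, config)); // prints com.example.fs.StableS3Factory

        // An override for the experimental factory flips the choice.
        config.put("fs.s3.priority.com.example.fs.ExperimentalS3Factory", "1");
        System.out.println(select("s3", candidates, config)); // prints com.example.fs.ExperimentalS3Factory
    }
}
```

The second call shows the migration use case: the same set of factories is present in both runs, and only the configuration entry changes which one is selected.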

docs/content/docs/deployment/filesystems/overview.md

Lines changed: 2 additions & 0 deletions
@@ -92,6 +92,8 @@ To add a new file system:
 - Add a service entry. Create a file `META-INF/services/org.apache.flink.core.fs.FileSystemFactory` which contains the class name of your file system factory class
   (see the [Java Service Loader docs](https://docs.oracle.com/javase/8/docs/api/java/util/ServiceLoader.html) for more details).
 
+- Optionally, override `getPriority()` in your factory to declare a relative priority. This is useful when multiple factories may be present for the same URI scheme (for example, during a migration between file system backends). The factory with the highest priority is selected. The default priority is `0`. See [Common Configurations]({{< ref "docs/deployment/filesystems/common" >}}#file-system-factory-priority) for details on overriding priority via configuration. For new experimental factories, it is recommended to override `getPriority()` to return `-1`, so that an existing factory with the default priority stays selected until the new one is explicitly prioritized via configuration.
+
 During plugins discovery, the file system factory class will be loaded by a dedicated Java class loader to avoid class conflicts with other plugins and Flink components.
 The same class loader should be used during file system instantiation and the file system operation calls.
 

flink-core/src/main/java/org/apache/flink/configuration/CoreOptions.java

Lines changed: 20 additions & 0 deletions
@@ -567,4 +567,24 @@ public static ConfigOption<Long> fileSystemConnectionLimitStreamInactivityTimeou
                 .longType()
                 .defaultValue(0L);
     }
+
+    /**
+     * Explicitly resolves the conflict between multiple FileSystemFactory implementations when
+     * multiple jars are loaded for the same scheme. Primary use is to allow configuration based
+     * migration between file systems without the need to build separate images.
+     *
+     * <p>Config key pattern: {@code fs.<scheme>.priority.<factoryClassName>}
+     */
+    public static ConfigOption<Integer> fileSystemFactoryPriority(String scheme, String className) {
+        return ConfigOptions.key("fs." + scheme + ".priority." + className)
+                .intType()
+                .noDefaultValue()
+                .withDescription(
+                        "Priority for the filesystem factory '"
+                                + className
+                                + "' when multiple factories register for the '"
+                                + scheme
+                                + "' scheme. Higher priority wins. "
+                                + "When not set, the factory's declared priority is used.");
+    }
 }
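
Because the option is declared with `noDefaultValue()`, a missing key can be told apart from an explicit value, which is what lets the caller fall back to the factory's own priority. A minimal sketch of that idea follows, assuming a plain `Map` in place of Flink's `Configuration`; the class `PriorityKeySketch` and the `com.example.fs.*` names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Sketch of a per-factory option without a default value; a Map stands in for Configuration. */
final class PriorityKeySketch {

    /** Builds the key following the pattern fs.<scheme>.priority.<factoryClassName>. */
    static String priorityKey(String scheme, String className) {
        return "fs." + scheme + ".priority." + className;
    }

    /** Empty when the key is absent, mirroring an option declared without a default. */
    static Optional<Integer> configuredPriority(
            Map<String, String> config, String scheme, String className) {
        return Optional.ofNullable(config.get(priorityKey(scheme, className)))
                .map(String::trim)
                .map(Integer::valueOf);
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(priorityKey("s3", "com.example.fs.MyS3Factory"), "5");

        int declared = 0; // what the factory's getPriority() would return

        // Key present: the configured value is used.
        System.out.println(
                configuredPriority(config, "s3", "com.example.fs.MyS3Factory").orElse(declared));
        // Key absent: the declared priority is used.
        System.out.println(
                configuredPriority(config, "gs", "com.example.fs.MyGsFactory").orElse(declared));
    }
}
```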

flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java

Lines changed: 67 additions & 2 deletions
@@ -35,6 +35,8 @@
 import org.apache.flink.core.plugin.PluginManager;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TemporaryClassLoaderContext;
+import org.apache.flink.util.WrappingProxy;
+import org.apache.flink.util.WrappingProxyUtil;
 
 import org.apache.flink.shaded.guava33.com.google.common.base.Splitter;
 import org.apache.flink.shaded.guava33.com.google.common.collect.ImmutableMultimap;
@@ -297,6 +299,21 @@ private static void initializeWithoutPlugins(Configuration config)
         initialize(config, null);
     }
 
+    /**
+     * Returns a list of registered {@link FileSystemFactory FS factories}.
+     *
+     * @return a snapshot of the currently registered file system factories
+     */
+    @Internal
+    public static List<FileSystemFactory> getRegisteredFileSystemFactories() {
+        LOCK.lock();
+        try {
+            return new ArrayList<>(FS_FACTORIES.values());
+        } finally {
+            LOCK.unlock();
+        }
+    }
+
     /**
      * Initializes the shared file system settings.
      *
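
The accessor above copies the registry values while holding the lock, so the returned list is a snapshot that later registrations do not affect. The same pattern in isolation might look like the following sketch; `RegistrySnapshotSketch` and its members are illustrative names, not part of Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Sketch of returning a snapshot of lock-guarded state; names are illustrative only. */
final class RegistrySnapshotSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private final Map<String, String> factoriesByScheme = new HashMap<>();

    void register(String scheme, String factoryName) {
        lock.lock();
        try {
            factoriesByScheme.put(scheme, factoryName);
        } finally {
            lock.unlock();
        }
    }

    /** Copies the values while holding the lock, so callers never observe later mutations. */
    List<String> snapshot() {
        lock.lock();
        try {
            return new ArrayList<>(factoriesByScheme.values());
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        RegistrySnapshotSketch registry = new RegistrySnapshotSketch();
        registry.register("s3", "com.example.fs.S3Factory");
        List<String> before = registry.snapshot();
        registry.register("gs", "com.example.fs.GsFactory");

        System.out.println(before.size());              // prints 1
        System.out.println(registry.snapshot().size()); // prints 2
    }
}
```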
@@ -338,14 +355,48 @@ public static void initialize(Configuration config, @Nullable PluginManager plug
             final List<FileSystemFactory> fileSystemFactories =
                     loadFileSystemFactories(factorySuppliers);
 
+            // Track registered priorities for factory selection
+            final Map<String, Integer> registeredPriorities = new HashMap<>();
+
             // configure all file system factories
             for (FileSystemFactory factory : fileSystemFactories) {
                 factory.configure(config);
-                String scheme = factory.getScheme();
+                final String scheme = factory.getScheme();
 
                 FileSystemFactory fsf =
                         ConnectionLimitingFactory.decorateIfLimited(factory, scheme, config);
-                FS_FACTORIES.put(scheme, fsf);
+
+                final String className = resolveFactoryClassName(factory);
+                final int registeredPriority =
+                        registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
+                final int newPriority =
+                        config.getOptional(CoreOptions.fileSystemFactoryPriority(scheme, className))
+                                .orElse(factory.getPriority());
+
+                LOG.info(
+                        "{} filesystem factory {} for scheme '{}' "
+                                + "with priority {} (highest registered priority: {})",
+                        newPriority >= registeredPriority ? "Registering" : "Skipping",
+                        className,
+                        scheme,
+                        newPriority,
+                        registeredPriority);
+                if (newPriority >= registeredPriority) {
+                    FS_FACTORIES.put(scheme, fsf);
+                    registeredPriorities.put(scheme, newPriority);
+                    if (newPriority == registeredPriority) {
+                        LOG.warn(
+                                "Filesystem factory {} overrides a previously registered factory "
+                                        + "for scheme '{}' at the same priority {}. "
+                                        + "The winner depends on classloading order. "
+                                        + "Set fs.{}.priority.<factoryClassName> to assign "
+                                        + "explicit priorities.",
+                                className,
+                                scheme,
+                                newPriority,
+                                scheme);
+                    }
+                }
             }
 
             // configure the default (fallback) factory
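
To make the tie behavior of the loop above easier to see, here is a reduced sketch that keeps only the comparison logic and collects warnings in a list instead of logging them. `RegistrationLoopSketch` and the short factory names are hypothetical, and the priorities are passed in already resolved.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Reduced sketch of the registration loop: the later factory wins a tie and a warning is kept. */
final class RegistrationLoopSketch {
    final Map<String, String> winners = new HashMap<>();
    final Map<String, Integer> registeredPriorities = new HashMap<>();
    final List<String> warnings = new ArrayList<>();

    /** Registers the factory unless a strictly higher priority is already registered. */
    void register(String scheme, String className, int priority) {
        int registered = registeredPriorities.getOrDefault(scheme, Integer.MIN_VALUE);
        if (priority >= registered) {
            winners.put(scheme, className);
            registeredPriorities.put(scheme, priority);
            if (priority == registered) {
                warnings.add("tie for scheme '" + scheme + "' at priority " + priority);
            }
        }
    }

    public static void main(String[] args) {
        RegistrationLoopSketch loop = new RegistrationLoopSketch();
        loop.register("s3", "FactoryA", 0);
        loop.register("s3", "FactoryB", 0);  // same priority: replaces FactoryA, records a warning
        loop.register("s3", "FactoryC", -1); // lower priority: skipped

        System.out.println(loop.winners.get("s3")); // prints FactoryB
        System.out.println(loop.warnings.size());   // prints 1
    }
}
```

Two properties of the sketch follow directly from the `>=` comparison: on a tie the factory that is iterated later wins, and because `Integer.MIN_VALUE` doubles as the "nothing registered yet" marker, a first factory whose priority is exactly `Integer.MIN_VALUE` would also be reported as a tie.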
@@ -946,6 +997,20 @@ private static void addAllFactoriesToList(
         }
     }
 
+    /**
+     * Resolves the real class name of a {@link FileSystemFactory}, unwrapping {@link
+     * PluginFileSystemFactory}.
+     */
+    private static String resolveFactoryClassName(FileSystemFactory factory) {
+        if (factory instanceof WrappingProxy) {
+            @SuppressWarnings("unchecked")
+            FileSystemFactory unwrapped =
+                    WrappingProxyUtil.stripProxy((WrappingProxy<FileSystemFactory>) factory);
+            return unwrapped.getClass().getName();
+        }
+        return factory.getClass().getName();
+    }
+
     /**
      * Utility loader for the Hadoop file system factory. We treat the Hadoop FS factory in a
      * special way, because we use it as a catch all for file systems schemes not supported directly

flink-core/src/main/java/org/apache/flink/core/fs/FileSystemFactory.java

Lines changed: 9 additions & 0 deletions
@@ -47,4 +47,13 @@ public interface FileSystemFactory extends Plugin {
      * @throws IOException Thrown if the file system could not be instantiated.
      */
     FileSystem create(URI fsUri) throws IOException;
+
+    /**
+     * Returns the default priority of this factory when multiple factories compete for the same
+     * scheme. Higher priority wins. Can be overridden via config key {@code
+     * fs.<scheme>.priority.<factoryClassName>}.
+     */
+    default int getPriority() {
+        return 0;
+    }
 }
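
Since `getPriority()` is a default method, existing factories keep priority `0` without any change, and only factories that want a different default need to override it. The sketch below illustrates this with a simplified stand-in interface rather than the real `FileSystemFactory`; all type names in it are hypothetical.

```java
/** Sketch of declaring a lower default priority; Factory is a stand-in, not Flink's interface. */
final class ExperimentalFactorySketch {

    /** Hypothetical stand-in for the relevant part of FileSystemFactory. */
    interface Factory {
        String getScheme();

        default int getPriority() {
            return 0;
        }
    }

    /** An existing factory: does not override getPriority(), so it reports 0. */
    static final class StableFactory implements Factory {
        @Override
        public String getScheme() {
            return "s3";
        }
    }

    /** A new factory that stays inactive by default when a priority-0 factory is also present. */
    static final class ExperimentalFactory implements Factory {
        @Override
        public String getScheme() {
            return "s3";
        }

        @Override
        public int getPriority() {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println(new StableFactory().getPriority());       // prints 0
        System.out.println(new ExperimentalFactory().getPriority()); // prints -1
    }
}
```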

flink-core/src/main/java/org/apache/flink/core/fs/PluginFileSystemFactory.java

Lines changed: 12 additions & 1 deletion
@@ -29,7 +29,8 @@
  * A wrapper around {@link FileSystemFactory} that ensures the plugin classloader is used for all
  * {@link FileSystem} operations.
  */
-public class PluginFileSystemFactory implements FileSystemFactory {
+public class PluginFileSystemFactory
+        implements FileSystemFactory, WrappingProxy<FileSystemFactory> {
     private final FileSystemFactory inner;
     private final ClassLoader loader;
 
@@ -64,6 +65,16 @@ public FileSystem create(final URI fsUri) throws IOException {
         }
     }
 
+    @Override
+    public int getPriority() {
+        return inner.getPriority();
+    }
+
+    @Override
+    public FileSystemFactory getWrappedDelegate() {
+        return inner;
+    }
+
     @Override
     public String toString() {
         return String.format("Plugin %s", inner.getClass().getName());
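
The two overrides above serve different purposes: delegating `getPriority()` keeps the wrapper from reporting the interface default, and exposing the delegate lets the caller resolve the wrapped factory's class name for the configuration key. A simplified sketch of both, using stand-in interfaces instead of Flink's `FileSystemFactory` and `WrappingProxy` (all names below are hypothetical):

```java
/** Sketch of priority delegation and proxy unwrapping with stand-in types; not Flink code. */
final class WrapperUnwrapSketch {

    /** Stand-in for the priority part of the factory interface. */
    interface Factory {
        default int getPriority() {
            return 0;
        }
    }

    /** Stand-in for a wrapping-proxy interface that exposes its delegate. */
    interface Wrapping<T> {
        T getWrappedDelegate();
    }

    static final class RealFactory implements Factory {
        @Override
        public int getPriority() {
            return 7;
        }
    }

    static final class PluginWrapper implements Factory, Wrapping<Factory> {
        private final Factory inner;

        PluginWrapper(Factory inner) {
            this.inner = inner;
        }

        // Without this override the wrapper would report the interface default of 0.
        @Override
        public int getPriority() {
            return inner.getPriority();
        }

        @Override
        public Factory getWrappedDelegate() {
            return inner;
        }
    }

    /** Strips all wrapper layers and returns the class name of the innermost object. */
    static String resolveClassName(Factory factory) {
        Object current = factory;
        while (current instanceof Wrapping) {
            current = ((Wrapping<?>) current).getWrappedDelegate();
        }
        return current.getClass().getName();
    }

    public static void main(String[] args) {
        Factory wrapped = new PluginWrapper(new PluginWrapper(new RealFactory()));
        System.out.println(wrapped.getPriority()); // prints 7
        System.out.println(resolveClassName(wrapped).equals(RealFactory.class.getName())); // prints true
    }
}
```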
