pages, AsyncExternalSourceBuffer buffer) {
- while (pages.hasNext() && buffer.noMoreInputs() == false) {
- var spaceListener = buffer.waitForSpace();
- if (spaceListener.isDone() == false) {
- while (spaceListener.isDone() == false && buffer.noMoreInputs() == false) {
- Thread.onSpinWait();
- }
- }
-
- if (buffer.noMoreInputs()) {
- break;
- }
-
- Page page = pages.next();
- page.allowPassingToDifferentDriver();
- buffer.addPage(page);
- }
+ ExternalSourceDrainUtils.drainPages(pages, buffer);
}
/**
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/DataSourceModule.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/DataSourceModule.java
index be6d3e5882118..1ce7358185dd3 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/DataSourceModule.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/DataSourceModule.java
@@ -9,7 +9,10 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.core.IOUtils;
+import org.elasticsearch.xpack.esql.datasources.spi.ConnectorFactory;
import org.elasticsearch.xpack.esql.datasources.spi.DataSourcePlugin;
+import org.elasticsearch.xpack.esql.datasources.spi.ExternalSourceFactory;
import org.elasticsearch.xpack.esql.datasources.spi.FilterPushdownSupport;
import org.elasticsearch.xpack.esql.datasources.spi.FormatReaderFactory;
import org.elasticsearch.xpack.esql.datasources.spi.SourceOperatorFactoryProvider;
@@ -19,7 +22,9 @@
import java.io.Closeable;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -32,14 +37,14 @@
* This module:
*
* - Discovers all plugins implementing {@link DataSourcePlugin}
- * - Collects storage providers, format readers, table catalog connectors, and operator factories
+ * - Collects storage providers, format readers, and external source factories
* - Populates registries for runtime lookup
* - Validates that no duplicate registrations occur
* - Creates an {@link OperatorFactoryRegistry} for unified operator factory lookup
 *
* This module implements Closeable to properly release resources held by storage providers
- * (such as HttpClient connections).
+ * (such as HttpClient connections) and managed TableCatalog instances.
 *
 * Note: Method names follow the project convention of omitting the "get" prefix.
*/
@@ -47,9 +52,12 @@ public final class DataSourceModule implements Closeable {
private final StorageProviderRegistry storageProviderRegistry;
private final FormatReaderRegistry formatReaderRegistry;
- private final Map<String, TableCatalogFactory> tableCatalogs;
- private final Map<String, SourceOperatorFactoryProvider> operatorFactories;
+ private final Map<String, ExternalSourceFactory> sourceFactories;
+ // FIXME: pluginFactories is a backward-compat bridge for DataSourcePlugin.operatorFactories().
+ // Once plugins migrate to ExternalSourceFactory.operatorFactory(), remove this field.
+ private final Map<String, SourceOperatorFactoryProvider> pluginFactories;
private final FilterPushdownRegistry filterPushdownRegistry;
+ private final List<Closeable> managedCloseables;
private final Settings settings;
public DataSourceModule(
@@ -64,9 +72,11 @@ public DataSourceModule(
Map<String, StorageProviderFactory> storageFactories = new HashMap<>();
Map<String, FormatReaderFactory> formatFactories = new HashMap<>();
- Map<String, TableCatalogFactory> catalogFactories = new HashMap<>();
+ Map<String, ExternalSourceFactory> sourceFactoryMap = new LinkedHashMap<>();
+ // FIXME: operatorFactoryProviders feeds the pluginFactories backward-compat bridge for
+ // DataSourcePlugin.operatorFactories(). Remove it once plugins migrate to ExternalSourceFactory.operatorFactory().
Map<String, SourceOperatorFactoryProvider> operatorFactoryProviders = new HashMap<>();
- Map<String, FilterPushdownSupport> filterPushdownProviders = new HashMap<>();
+ List<Closeable> closeables = new ArrayList<>();
for (DataSourcePlugin plugin : dataSourcePlugins) {
@@ -86,14 +96,37 @@ public DataSourceModule(
}
}
+ // New unified path: sourceFactories()
+ Map<String, ExternalSourceFactory> newSourceFactories = plugin.sourceFactories(settings);
+ for (Map.Entry<String, ExternalSourceFactory> entry : newSourceFactories.entrySet()) {
+ String sourceType = entry.getKey();
+ if (sourceFactoryMap.put(sourceType, entry.getValue()) != null) {
+ throw new IllegalArgumentException("Source factory for type [" + sourceType + "] is already registered");
+ }
+ }
+
+ // Bridge: connectors() → sourceFactoryMap (ConnectorFactory is ExternalSourceFactory)
+ Map<String, ConnectorFactory> newConnectors = plugin.connectors(settings);
+ for (Map.Entry<String, ConnectorFactory> entry : newConnectors.entrySet()) {
+ String connectorType = entry.getKey();
+ if (sourceFactoryMap.put(connectorType, entry.getValue()) != null) {
+ throw new IllegalArgumentException("Source factory for type [" + connectorType + "] is already registered");
+ }
+ }
+
+ // Bridge: tableCatalogs() → create TableCatalog, add to sourceFactoryMap + closeables
Map<String, TableCatalogFactory> newCatalogTypes = plugin.tableCatalogs(settings);
for (Map.Entry<String, TableCatalogFactory> entry : newCatalogTypes.entrySet()) {
String catalogType = entry.getKey();
- if (catalogFactories.put(catalogType, entry.getValue()) != null) {
- throw new IllegalArgumentException("Table catalog for [" + catalogType + "] is already registered");
+ TableCatalog catalog = entry.getValue().create(settings);
+ closeables.add(catalog);
+ if (sourceFactoryMap.put(catalogType, catalog) != null) {
+ throw new IllegalArgumentException("Source factory for type [" + catalogType + "] is already registered");
}
}
+ // FIXME: standalone operatorFactories() and filterPushdownSupport() on DataSourcePlugin
+ // are unused by all production plugins; remove bridge once confirmed.
Map<String, SourceOperatorFactoryProvider> newOperatorFactories = plugin.operatorFactories(settings);
for (Map.Entry<String, SourceOperatorFactoryProvider> entry : newOperatorFactories.entrySet()) {
String sourceType = entry.getKey();
@@ -101,16 +134,6 @@ public DataSourceModule(
throw new IllegalArgumentException("Operator factory for source type [" + sourceType + "] is already registered");
}
}
-
- Map<String, FilterPushdownSupport> newFilterPushdown = plugin.filterPushdownSupport(settings);
- for (Map.Entry<String, FilterPushdownSupport> entry : newFilterPushdown.entrySet()) {
- String sourceType = entry.getKey();
- if (filterPushdownProviders.put(sourceType, entry.getValue()) != null) {
- throw new IllegalArgumentException(
- "Filter pushdown support for source type [" + sourceType + "] is already registered"
- );
- }
- }
}
// Register factories lazily -- providers and readers are created on first access
@@ -122,14 +145,33 @@ public DataSourceModule(
formatReaderRegistry.registerLazy(entry.getKey(), entry.getValue(), settings, blockFactory);
}
- this.tableCatalogs = Map.copyOf(catalogFactories);
- this.operatorFactories = Map.copyOf(operatorFactoryProviders);
+ // Register the framework-internal FileSourceFactory as a catch-all fallback.
+ // It must be last so that plugin-provided factories (Iceberg, Flight) get priority.
+ FileSourceFactory fileFallback = new FileSourceFactory(storageProviderRegistry, formatReaderRegistry, settings);
+ sourceFactoryMap.put("file", fileFallback);
+
+ // Use unmodifiableMap rather than Map.copyOf: Map.copyOf discards the LinkedHashMap
+ // iteration order that keeps the "file" fallback last during resolution.
+ this.sourceFactories = Collections.unmodifiableMap(sourceFactoryMap);
+ this.pluginFactories = Map.copyOf(operatorFactoryProviders);
+ this.managedCloseables = List.copyOf(closeables);
+
+ // Build FilterPushdownRegistry from ExternalSourceFactory capabilities
+ Map<String, FilterPushdownSupport> filterPushdownProviders = new HashMap<>();
+ for (Map.Entry<String, ExternalSourceFactory> entry : this.sourceFactories.entrySet()) {
+ FilterPushdownSupport fps = entry.getValue().filterPushdownSupport();
+ if (fps != null) {
+ filterPushdownProviders.put(entry.getKey(), fps);
+ }
+ }
+ // FIXME: standalone filterPushdownSupport() bridge omitted — no production plugin uses it.
this.filterPushdownRegistry = new FilterPushdownRegistry(filterPushdownProviders);
}
@Override
public void close() throws IOException {
- storageProviderRegistry.close();
+ List<Closeable> all = new ArrayList<>();
+ all.add(storageProviderRegistry);
+ all.addAll(managedCloseables);
+ IOUtils.close(all);
}
public StorageProviderRegistry storageProviderRegistry() {
@@ -140,8 +182,8 @@ public FormatReaderRegistry formatReaderRegistry() {
return formatReaderRegistry;
}
- public Map<String, SourceOperatorFactoryProvider> operatorFactories() {
- return operatorFactories;
+ public Map<String, ExternalSourceFactory> sourceFactories() {
+ return sourceFactories;
}
public FilterPushdownRegistry filterPushdownRegistry() {
@@ -149,18 +191,6 @@ public FilterPushdownRegistry filterPushdownRegistry() {
}
public OperatorFactoryRegistry createOperatorFactoryRegistry(Executor executor) {
- return new OperatorFactoryRegistry(operatorFactories, storageProviderRegistry, formatReaderRegistry, executor, settings);
- }
-
- public TableCatalog createTableCatalog(String catalogType, Settings settings) {
- TableCatalogFactory factory = tableCatalogs.get(catalogType);
- if (factory == null) {
- throw new IllegalArgumentException("No table catalog registered for type: " + catalogType);
- }
- return factory.create(settings);
- }
-
- public boolean hasTableCatalog(String catalogType) {
- return tableCatalogs.containsKey(catalogType);
+ return new OperatorFactoryRegistry(sourceFactories, pluginFactories, executor);
}
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ExternalSourceDrainUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ExternalSourceDrainUtils.java
new file mode 100644
index 0000000000000..b82d1067e93ab
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ExternalSourceDrainUtils.java
@@ -0,0 +1,41 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.datasources;
+
+import org.elasticsearch.action.support.PlainActionFuture;
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.core.TimeValue;
+
+/**
+ * Utility for draining pages from a {@link CloseableIterator} into an {@link AsyncExternalSourceBuffer}
+ * with backpressure. Uses blocking wait instead of spin-wait, relying on the buffer's
+ * {@code notifyNotFull()} in {@code finish()} to wake producers when no more input is needed.
+ */
+public final class ExternalSourceDrainUtils {
+
+ private static final TimeValue DRAIN_TIMEOUT = TimeValue.timeValueMinutes(5);
+
+ private ExternalSourceDrainUtils() {}
+
+ public static void drainPages(CloseableIterator<Page> pages, AsyncExternalSourceBuffer buffer) {
+ while (pages.hasNext() && buffer.noMoreInputs() == false) {
+ var spaceListener = buffer.waitForSpace();
+ if (spaceListener.isDone() == false) {
+ PlainActionFuture<Void> future = new PlainActionFuture<>();
+ spaceListener.addListener(future);
+ future.actionGet(DRAIN_TIMEOUT);
+ }
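+ // The buffer may have been finished while we waited for space; re-check before pulling the next page.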
+ if (buffer.noMoreInputs()) {
+ break;
+ }
+ Page page = pages.next();
+ page.allowPassingToDifferentDriver();
+ buffer.addPage(page);
+ }
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ExternalSourceResolver.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ExternalSourceResolver.java
index 9cd9f3faa5ec3..2b7f949232a21 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ExternalSourceResolver.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/ExternalSourceResolver.java
@@ -15,14 +15,11 @@
import org.elasticsearch.logging.Logger;
import org.elasticsearch.xpack.esql.core.expression.Expression;
import org.elasticsearch.xpack.esql.core.expression.Literal;
-import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
+import org.elasticsearch.xpack.esql.datasources.spi.ExternalSourceFactory;
import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata;
-import org.elasticsearch.xpack.esql.datasources.spi.StorageObject;
import org.elasticsearch.xpack.esql.datasources.spi.StoragePath;
import org.elasticsearch.xpack.esql.datasources.spi.StorageProvider;
-import org.elasticsearch.xpack.esql.datasources.spi.TableCatalog;
-import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
@@ -41,12 +38,10 @@
* Runs asynchronously to avoid blocking
*
*
- * Registry-based resolution: This resolver uses the registries from {@link DataSourceModule}
- * to find appropriate handlers for different source types:
- *
- * - {@link TableCatalog} for table-based sources (Iceberg, Delta Lake)
- * - {@link FormatReader} for file-based sources (Parquet, CSV)
- *
+ * Registry-based resolution: This resolver iterates the {@link ExternalSourceFactory}
+ * instances collected by {@link DataSourceModule} to find the first factory that can handle
+ * a given path. File-based sources (Parquet, CSV) are handled by the framework-internal
+ * {@code FileSourceFactory} registered as a catch-all fallback.
*
* Configuration handling: Query parameters are converted to a generic {@code Map<String, Object>}
* instead of source-specific classes like S3Configuration. This allows the SPI to remain generic
@@ -80,72 +75,57 @@ public void resolve(
return;
}
- // Run resolution asynchronously to avoid blocking
executor.execute(() -> {
try {
- // Use the StorageProviderRegistry from DataSourceModule
- // This registry is populated with all discovered storage providers
- StorageProviderRegistry registry = dataSourceModule.storageProviderRegistry();
- StorageManager storageManager = new StorageManager(registry, settings);
-
- try {
- Map<String, ExternalSourceResolution.ResolvedSource> resolved = new HashMap<>();
-
- for (String path : paths) {
- Map<String, String> params = pathParams.get(path);
-
- // Convert query parameters to generic config map
- Map<String, Object> config = paramsToConfigMap(params);
-
- try {
- ExternalSourceResolution.ResolvedSource resolvedSource = resolveSource(path, config, storageManager);
- resolved.put(path, resolvedSource);
- LOGGER.info("Successfully resolved external source: {}", path);
- } catch (Exception e) {
- LOGGER.error("Failed to resolve external source [{}]: {}", path, e.getMessage(), e);
- String exceptionMessage = e.getMessage();
- String errorDetail = exceptionMessage != null ? exceptionMessage : e.getClass().getSimpleName();
- String errorMessage = String.format(
- Locale.ROOT,
- "Failed to resolve external source [%s]: %s",
- path,
- errorDetail
- );
- listener.onFailure(new RuntimeException(errorMessage, e));
- return;
- }
+ Map<String, ExternalSourceResolution.ResolvedSource> resolved = new HashMap<>();
+
+ for (String path : paths) {
+ Map<String, String> params = pathParams.get(path);
+ Map<String, Object> config = paramsToConfigMap(params);
+
+ try {
+ ExternalSourceResolution.ResolvedSource resolvedSource = resolveSource(path, config);
+ resolved.put(path, resolvedSource);
+ LOGGER.info("Successfully resolved external source: {}", path);
+ } catch (Exception e) {
+ LOGGER.error("Failed to resolve external source [{}]: {}", path, e.getMessage(), e);
+ String exceptionMessage = e.getMessage();
+ String errorDetail = exceptionMessage != null ? exceptionMessage : e.getClass().getSimpleName();
+ String errorMessage = String.format(Locale.ROOT, "Failed to resolve external source [%s]: %s", path, errorDetail);
+ listener.onFailure(new RuntimeException(errorMessage, e));
+ return;
}
-
- listener.onResponse(new ExternalSourceResolution(resolved));
- } finally {
- storageManager.close();
}
+
+ listener.onResponse(new ExternalSourceResolution(resolved));
} catch (Exception e) {
listener.onFailure(e);
}
});
}
- private ExternalSourceResolution.ResolvedSource resolveSource(String path, Map<String, Object> config, StorageManager storageManager)
- throws Exception {
+ private ExternalSourceResolution.ResolvedSource resolveSource(String path, Map<String, Object> config) throws Exception {
LOGGER.info("Resolving external source: path=[{}]", path);
if (GlobExpander.isMultiFile(path)) {
- return resolveMultiFileSource(path, config, storageManager);
+ return resolveMultiFileSource(path, config);
}
- SourceMetadata metadata = resolveSingleSource(path, config, storageManager);
+ SourceMetadata metadata = resolveSingleSource(path, config);
ExternalSourceMetadata extMetadata = wrapAsExternalSourceMetadata(metadata, config);
return new ExternalSourceResolution.ResolvedSource(extMetadata, FileSet.UNRESOLVED);
}
- private ExternalSourceResolution.ResolvedSource resolveMultiFileSource(
- String path,
- Map<String, Object> config,
- StorageManager storageManager
- ) throws Exception {
+ private ExternalSourceResolution.ResolvedSource resolveMultiFileSource(String path, Map<String, Object> config) throws Exception {
StoragePath storagePath = StoragePath.of(path);
- StorageProvider provider = storageManager.provider(storagePath, config);
+ StorageProviderRegistry registry = dataSourceModule.storageProviderRegistry();
+
+ StorageProvider provider;
+ if (config != null && config.isEmpty() == false) {
+ provider = registry.createProvider(storagePath.scheme(), settings, config);
+ } else {
+ provider = registry.provider(storagePath);
+ }
FileSet fileSet;
if (path.indexOf(',') >= 0) {
@@ -158,106 +138,24 @@ private ExternalSourceResolution.ResolvedSource resolveMultiFileSource(
throw new IllegalArgumentException("Glob pattern matched no files: " + path);
}
- // Use the first file to infer format and read metadata
StoragePath firstFile = fileSet.files().get(0).path();
- FormatReaderRegistry formatRegistry = dataSourceModule.formatReaderRegistry();
- FormatReader reader = formatRegistry.byExtension(firstFile.objectName());
-
- StorageObject storageObject = storageManager.newStorageObject(firstFile.toString(), config);
- SourceMetadata metadata = reader.metadata(storageObject);
+ SourceMetadata metadata = resolveSingleSource(firstFile.toString(), config);
ExternalSourceMetadata extMetadata = wrapAsExternalSourceMetadata(metadata, config);
return new ExternalSourceResolution.ResolvedSource(extMetadata, fileSet);
}
- private SourceMetadata resolveSingleSource(String path, Map<String, Object> config, StorageManager storageManager) throws Exception {
- // Strategy 1: Try registered TableCatalogs
- SourceMetadata metadata = tryTableCatalogs(path, config);
- if (metadata != null) {
- LOGGER.debug("Resolved via TableCatalog: {}", metadata.sourceType());
- return metadata;
- }
-
- // Strategy 2: Try FormatReader based on file extension
- metadata = tryFormatReaders(path, config, storageManager);
- if (metadata != null) {
- LOGGER.debug("Resolved via FormatReader: {}", metadata.sourceType());
- return metadata;
- }
-
- // Strategy 3: Fall back to legacy adapters for backward compatibility
- return resolveLegacy(path, config, storageManager);
- }
-
- @Nullable
- private SourceMetadata tryTableCatalogs(String path, Map<String, Object> config) {
- // Check if any registered TableCatalog can handle this path
- // Currently, we check for "iceberg" catalog if the path looks like an Iceberg table
- SourceType detectedType = detectSourceType(path);
-
- if (detectedType == SourceType.ICEBERG && dataSourceModule.hasTableCatalog("iceberg")) {
- try (TableCatalog catalog = dataSourceModule.createTableCatalog("iceberg", settings)) {
- if (catalog.canHandle(path)) {
- return catalog.metadata(path, config);
- }
- } catch (IOException e) {
- LOGGER.debug("TableCatalog 'iceberg' failed for path [{}]: {}", path, e.getMessage());
+ private SourceMetadata resolveSingleSource(String path, Map<String, Object> config) {
+ for (ExternalSourceFactory factory : dataSourceModule.sourceFactories().values()) {
+ if (factory.canHandle(path)) {
+ return factory.resolveMetadata(path, config);
}
}
-
- // Try other registered catalogs
- // Future: iterate over all registered catalogs and check canHandle()
- return null;
- }
-
- @Nullable
- private SourceMetadata tryFormatReaders(String path, Map<String, Object> config, StorageManager storageManager) {
- FormatReaderRegistry formatRegistry = dataSourceModule.formatReaderRegistry();
-
- // Try to get a format reader by file extension
- try {
- FormatReader reader = formatRegistry.byExtension(path);
- if (reader != null) {
- // Get storage object for the path
- StorageObject storageObject = getStorageObject(path, config, storageManager);
- return reader.metadata(storageObject);
- }
- } catch (Exception e) {
- LOGGER.debug("FormatReader lookup failed for path [{}]: {}", path, e.getMessage());
- }
-
- return null;
- }
-
- private SourceMetadata resolveLegacy(String path, Map<String, Object> config, StorageManager storageManager) throws Exception {
- SourceType type = detectSourceType(path);
- LOGGER.info("Attempting legacy resolution for path=[{}], detected type=[{}]", path, type);
-
- // Legacy adapters have been moved to separate modules
throw new UnsupportedOperationException(
- "No handler found for source type ["
- + type
- + "] at path ["
- + path
- + "]. "
- + "Please ensure the appropriate data source plugin is installed."
+ "No handler found for source at path [" + path + "]. " + "Please ensure the appropriate data source plugin is installed."
);
}
- private StorageObject getStorageObject(String path, Map<String, Object> config, StorageManager storageManager) throws Exception {
- StoragePath storagePath = StoragePath.of(path);
- String scheme = storagePath.scheme().toLowerCase(Locale.ROOT);
-
- if ((scheme.equals("http") || scheme.equals("https")) && config.isEmpty()) {
- // For HTTP/HTTPS with no config, use registry-based approach
- return storageManager.newStorageObject(path);
- } else {
- // For S3 and file schemes, or HTTP with config, use config-based approach
- // StorageManager now accepts Map directly
- return storageManager.newStorageObject(path, config);
- }
- }
-
private Map<String, Object> paramsToConfigMap(@Nullable Map<String, String> params) {
if (params == null || params.isEmpty()) {
return Map.of();
@@ -281,17 +179,24 @@ private Map<String, Object> paramsToConfigMap(@Nullable Map<String, String>
private ExternalSourceMetadata wrapAsExternalSourceMetadata(SourceMetadata metadata, Map queryConfig) {
if (metadata instanceof ExternalSourceMetadata extMetadata) {
- // If the metadata already carries config (e.g. from a TableCatalog), preserve it.
- // Otherwise, overlay the query-level config (from WITH clause) so that connection
- // parameters (endpoint, credentials) reach the execution phase.
if (extMetadata.config() != null && extMetadata.config().isEmpty() == false) {
return extMetadata;
}
}
- // Create a wrapper that delegates to the SourceMetadata but uses the query-level
- // config. This is scheme-agnostic: S3, HTTP, LOCAL, or any future backend — the
- // config from the WITH clause is forwarded transparently to the execution phase.
+ // Merge the config from resolveMetadata (e.g. endpoint for Flight) with query-level params (WITH clause).
+ // Query-level params take precedence so users can override connector-resolved values.
+ Map<String, Object> mergedConfig;
+ Map<String, Object> metadataConfig = metadata.config();
+ if (metadataConfig != null && metadataConfig.isEmpty() == false) {
+ mergedConfig = new HashMap<>(metadataConfig);
+ if (queryConfig != null) {
+ mergedConfig.putAll(queryConfig);
+ }
+ } else {
+ mergedConfig = queryConfig;
+ }
+
return new ExternalSourceMetadata() {
@Override
public String location() {
@@ -315,30 +220,8 @@ public Map<String, Object> sourceMetadata() {
@Override
public Map<String, Object> config() {
- return queryConfig;
+ return mergedConfig;
}
};
}
-
- private SourceType detectSourceType(String path) {
- String lowerPath = path.toLowerCase(Locale.ROOT);
- boolean isParquet = lowerPath.endsWith(".parquet");
- LOGGER.debug("Detecting source type for path: [{}], ends with .parquet: [{}]", path, isParquet);
-
- if (isParquet) {
- LOGGER.debug("Detected as PARQUET file");
- return SourceType.PARQUET;
- }
-
- // Check if path looks like an Iceberg table path
- // Iceberg tables typically have metadata directories
- // Default to Iceberg if not explicitly Parquet
- LOGGER.debug("Detected as ICEBERG table");
- return SourceType.ICEBERG;
- }
-
- private enum SourceType {
- ICEBERG,
- PARQUET
- }
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/FileSourceFactory.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/FileSourceFactory.java
new file mode 100644
index 0000000000000..633106505a1eb
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/FileSourceFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.datasources;
+
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.xpack.esql.datasources.spi.ExternalSourceFactory;
+import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
+import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata;
+import org.elasticsearch.xpack.esql.datasources.spi.SourceOperatorFactoryProvider;
+import org.elasticsearch.xpack.esql.datasources.spi.StorageObject;
+import org.elasticsearch.xpack.esql.datasources.spi.StoragePath;
+import org.elasticsearch.xpack.esql.datasources.spi.StorageProvider;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Framework-internal factory that bridges the building-block registries
+ * ({@link StorageProviderRegistry} + {@link FormatReaderRegistry}) into the
+ * unified {@link ExternalSourceFactory} contract.
+ *
+ * This is NOT an SPI extension — it is never returned by any DataSourcePlugin.
+ * It is created by {@link DataSourceModule} itself and registered as a catch-all
+ * fallback entry (key {@code "file"}) in the sourceFactories map.
+ */
+final class FileSourceFactory implements ExternalSourceFactory {
+
+ private final StorageProviderRegistry storageRegistry;
+ private final FormatReaderRegistry formatRegistry;
+ private final Settings settings;
+
+ FileSourceFactory(StorageProviderRegistry storageRegistry, FormatReaderRegistry formatRegistry, Settings settings) {
+ if (storageRegistry == null) {
+ throw new IllegalArgumentException("storageRegistry cannot be null");
+ }
+ if (formatRegistry == null) {
+ throw new IllegalArgumentException("formatRegistry cannot be null");
+ }
+ this.storageRegistry = storageRegistry;
+ this.formatRegistry = formatRegistry;
+ this.settings = settings != null ? settings : Settings.EMPTY;
+ }
+
+ @Override
+ public String type() {
+ return "file";
+ }
+
+ @Override
+ public boolean canHandle(String location) {
+ if (location == null) {
+ return false;
+ }
+ try {
+ StoragePath path = StoragePath.of(location);
+ String scheme = path.scheme();
+ String objectName = path.objectName();
+ if (objectName == null || objectName.isEmpty()) {
+ return false;
+ }
+ int lastDot = objectName.lastIndexOf('.');
+ if (lastDot < 0 || lastDot == objectName.length() - 1) {
+ return false;
+ }
+ if (storageRegistry.hasProvider(scheme) == false) {
+ return false;
+ }
+ // byExtension triggers lazy init of format readers, so it correctly
+ // discovers extensions even on first call (unlike hasExtension).
+ formatRegistry.byExtension(objectName);
+ return true;
+ } catch (IllegalArgumentException e) {
+ return false;
+ }
+ }
+
+ @Override
+ public SourceMetadata resolveMetadata(String location, Map<String, Object> config) {
+ try {
+ StoragePath storagePath = StoragePath.of(location);
+ String scheme = storagePath.scheme();
+
+ StorageProvider provider;
+ if (config != null && config.isEmpty() == false) {
+ provider = storageRegistry.createProvider(scheme, settings, config);
+ } else {
+ provider = storageRegistry.provider(storagePath);
+ }
+
+ StorageObject storageObject = provider.newObject(storagePath);
+ FormatReader reader = formatRegistry.byExtension(storagePath.objectName());
+ return reader.metadata(storageObject);
+ } catch (IOException e) {
+ throw new IllegalArgumentException("Failed to resolve metadata for [" + location + "]", e);
+ }
+ }
+
+ @Override
+ public SourceOperatorFactoryProvider operatorFactory() {
+ return context -> {
+ StoragePath path = context.path();
+ Map<String, Object> config = context.config();
+
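+ // Note: per-query providers created via createProvider() are not closed when the operator
+ // finishes; factories have no close lifecycle. Acceptable for now since SPI-created
+ // providers typically wrap pooled or shared clients.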
+ StorageProvider storage;
+ if (config != null && config.isEmpty() == false) {
+ storage = storageRegistry.createProvider(path.scheme(), settings, config);
+ } else {
+ storage = storageRegistry.provider(path);
+ }
+
+ FormatReader format = formatRegistry.byExtension(path.objectName());
+
+ return new AsyncExternalSourceOperatorFactory(
+ storage,
+ format,
+ path,
+ context.attributes(),
+ context.batchSize(),
+ context.maxBufferSize(),
+ context.executor(),
+ context.fileSet()
+ );
+ };
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/OperatorFactoryRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/OperatorFactoryRegistry.java
index d24e30ce1d89e..593ee25ed589f 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/OperatorFactoryRegistry.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/OperatorFactoryRegistry.java
@@ -7,14 +7,17 @@
package org.elasticsearch.xpack.esql.datasources;
-import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.compute.operator.SourceOperator;
-import org.elasticsearch.xpack.esql.datasources.spi.FormatReader;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+import org.elasticsearch.xpack.esql.datasources.spi.Connector;
+import org.elasticsearch.xpack.esql.datasources.spi.ConnectorFactory;
+import org.elasticsearch.xpack.esql.datasources.spi.ExternalSourceFactory;
+import org.elasticsearch.xpack.esql.datasources.spi.QueryRequest;
import org.elasticsearch.xpack.esql.datasources.spi.SourceOperatorContext;
import org.elasticsearch.xpack.esql.datasources.spi.SourceOperatorFactoryProvider;
-import org.elasticsearch.xpack.esql.datasources.spi.StoragePath;
-import org.elasticsearch.xpack.esql.datasources.spi.StorageProvider;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -24,116 +27,78 @@
* This registry provides a single entry point for creating source operator factories.
* It supports two modes:
*
- * - Plugin factories: Custom factories registered by plugins for complex
- * datasources (Iceberg, Delta Lake) that need specialized logic.
- * - Generic factory: Falls back to {@link AsyncExternalSourceOperatorFactory}
- * for simple formats (CSV, JSON, Parquet) using the StorageProvider and FormatReader
- * abstractions.
+ * - ExternalSourceFactory: Unified factories for connectors, table catalogs,
+ * and file-based sources. ConnectorFactory instances are dispatched via the connector
+ * protocol; file-based sources go through {@code FileSourceFactory.operatorFactory()};
+ * other factories use their {@code operatorFactory()} capability.
+ * - Plugin factories: Backward-compat bridge for {@code DataSourcePlugin.operatorFactories()}.
*
- *
- * The lookup order is:
- *
- * - Check if a plugin has registered a custom factory for the source type
- * - If not, use the generic async factory with storage and format registries
- *
- *
- * Note: Method names follow the project convention of omitting the "get" prefix.
*/
public class OperatorFactoryRegistry {
+ private final Map<String, ExternalSourceFactory> sourceFactories;
+ // FIXME: pluginFactories is a backward-compat bridge for DataSourcePlugin.operatorFactories().
+ // Once plugins migrate to ExternalSourceFactory.operatorFactory(), remove this field.
private final Map<String, SourceOperatorFactoryProvider> pluginFactories;
- private final StorageProviderRegistry storageRegistry;
- private final FormatReaderRegistry formatRegistry;
private final Executor executor;
- private final Settings settings;
public OperatorFactoryRegistry(
+ Map<String, ExternalSourceFactory> sourceFactories,
Map<String, SourceOperatorFactoryProvider> pluginFactories,
- StorageProviderRegistry storageRegistry,
- FormatReaderRegistry formatRegistry,
- Executor executor,
- Settings settings
+ Executor executor
) {
- if (storageRegistry == null) {
- throw new IllegalArgumentException("storageRegistry cannot be null");
- }
- if (formatRegistry == null) {
- throw new IllegalArgumentException("formatRegistry cannot be null");
- }
if (executor == null) {
throw new IllegalArgumentException("executor cannot be null");
}
+ this.sourceFactories = sourceFactories != null ? Map.copyOf(sourceFactories) : Map.of();
this.pluginFactories = pluginFactories != null ? Map.copyOf(pluginFactories) : Map.of();
- this.storageRegistry = storageRegistry;
- this.formatRegistry = formatRegistry;
this.executor = executor;
- this.settings = settings != null ? settings : Settings.EMPTY;
}
public SourceOperator.SourceOperatorFactory factory(SourceOperatorContext context) {
String sourceType = context.sourceType();
- // 1. Plugin provides custom factory? Use it.
- if (sourceType != null && pluginFactories.containsKey(sourceType)) {
- return pluginFactories.get(sourceType).create(context);
- }
-
- // 2. Otherwise: generic async factory (handles CSV, JSON, Parquet, etc.)
- StoragePath path = context.path();
-
- // Resolve the storage provider.
- // When the context carries per-query config (e.g. endpoint, credentials from WITH clause),
- // create a fresh provider with that config. This is scheme-agnostic: S3, HTTP, LOCAL, or any
- // future backend — the config is forwarded to the SPI factory's create(settings, config).
- // Schemes that don't override that method simply ignore the config via the default delegation.
- // TODO: Per-query providers created here via createProvider() are not closed after the operator
- // finishes. Factories don't have a close lifecycle, and the provider must stay alive while
- // StorageObject streams are active. Full lifecycle tracking would require plumbing a Releasable
- // through the driver framework. For now this is acceptable because SPI-created providers
- // (e.g. S3StorageProvider) typically wrap pooled/shared clients.
- Map<String, Object> config = context.config();
- StorageProvider storage;
- if (config != null && config.isEmpty() == false) {
- storage = storageRegistry.createProvider(path.scheme(), settings, config);
- } else {
- storage = storageRegistry.provider(path);
+ ExternalSourceFactory sf = sourceType != null ? sourceFactories.get(sourceType) : null;
+ if (sf != null) {
+ if (sf instanceof ConnectorFactory cf) {
+ Connector connector = cf.open(context.config());
+ List<String> projectedColumns = new ArrayList<>(context.attributes().size());
+ for (Attribute attr : context.attributes()) {
+ projectedColumns.add(attr.name());
+ }
+ // Use the target from resolved config (e.g. path component for Flight URIs);
+ // fall back to the full path string for connectors that don't set it.
+ Object targetObj = context.config().get("target");
+ String target = targetObj != null ? targetObj.toString() : context.path().toString();
+ QueryRequest request = new QueryRequest(
+ target,
+ projectedColumns,
+ context.attributes(),
+ context.config(),
+ context.batchSize(),
+ null // the BlockFactory is bound later via QueryRequest#withBlockFactory
+ );
+ return new AsyncConnectorSourceOperatorFactory(connector, request, context.maxBufferSize(), executor);
+ }
+ SourceOperatorFactoryProvider opFactory = sf.operatorFactory();
+ if (opFactory != null) {
+ return opFactory.create(context);
+ }
}
- FormatReader format = formatRegistry.byExtension(path.objectName());
-
- if (storage == null) {
- throw new IllegalArgumentException("No storage provider registered for scheme: " + path.scheme());
- }
- if (format == null) {
- throw new IllegalArgumentException("No format reader registered for file: " + path.objectName());
+ // FIXME: backward-compat bridge for standalone pluginFactories
+ if (sourceType != null && pluginFactories.containsKey(sourceType)) {
+ return pluginFactories.get(sourceType).create(context);
}
- return new AsyncExternalSourceOperatorFactory(
- storage,
- format,
- path,
- context.attributes(),
- context.batchSize(),
- context.maxBufferSize(),
- executor,
- context.fileSet()
- );
+ throw new IllegalArgumentException("No operator factory for sourceType: " + sourceType);
}
public boolean hasPluginFactory(String sourceType) {
- return sourceType != null && pluginFactories.containsKey(sourceType);
- }
-
- public StorageProviderRegistry storageRegistry() {
- return storageRegistry;
- }
-
- public FormatReaderRegistry formatRegistry() {
- return formatRegistry;
+ return sourceType != null && (sourceFactories.containsKey(sourceType) || pluginFactories.containsKey(sourceType));
}
public Executor executor() {
return executor;
}
-
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/Connector.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/Connector.java
new file mode 100644
index 0000000000000..86ce9c4e74179
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/Connector.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.datasources.spi;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * A live connection to an external data source.
+ * Opened by {@link ConnectorFactory#open} and closed after query execution.
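+ *
+ * A minimal usage sketch (assuming a {@code ConnectorFactory factory}, a resolved
+ * {@code config} map, and a prepared {@code QueryRequest request}; error handling omitted):
+ * <pre>{@code
+ * try (Connector connector = factory.open(config)) {
+ *     for (Split split : connector.discoverSplits(request)) {
+ *         try (ResultCursor cursor = connector.execute(request, split)) {
+ *             while (cursor.hasNext()) {
+ *                 Page page = cursor.next();
+ *                 // hand the page to the driver's buffer
+ *             }
+ *         }
+ *     }
+ * }
+ * }</pre>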
+ */
+public interface Connector extends Closeable {
+
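+ /**
+ * Returns the units of work for the given request. The default is a single
+ * sequential read; connectors that support parallel reads return multiple splits.
+ */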
+ default List<Split> discoverSplits(QueryRequest request) {
+ return List.of(Split.SINGLE);
+ }
+
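+ /** Executes the request over the given split, returning a streaming cursor of result pages. */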
+ ResultCursor execute(QueryRequest request, Split split);
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ConnectorFactory.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ConnectorFactory.java
new file mode 100644
index 0000000000000..5cd56d24fb287
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ConnectorFactory.java
@@ -0,0 +1,26 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.datasources.spi;
+
+import java.util.Map;
+
+/**
+ * Factory for creating connectors to external data sources.
+ * Each factory handles a specific protocol or source type (e.g. "flight", "jdbc").
+ * Schema resolution lives here to avoid creating throwaway {@link Connector} instances.
+ */
+public interface ConnectorFactory extends ExternalSourceFactory {
+
+ String type();
+
+ boolean canHandle(String location);
+
+ SourceMetadata resolveMetadata(String location, Map<String, Object> config);
+
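+ /**
+ * Opens a live connection using the resolved per-query config.
+ * The caller is responsible for closing the returned {@link Connector}.
+ */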
+ Connector open(Map<String, Object> config);
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/DataSourcePlugin.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/DataSourcePlugin.java
index 41ff6923ca98c..591f40bed1aa4 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/DataSourcePlugin.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/DataSourcePlugin.java
@@ -48,6 +48,14 @@ default Map formatReaders(Settings settings) {
return Map.of();
}
+ // Complete external source factories (connectors, table catalogs), keyed by source type.
+ default Map<String, ExternalSourceFactory> sourceFactories(Settings settings) {
+ return Map.of();
+ }
+
+ // FIXME: the methods below are superseded by sourceFactories() and ExternalSourceFactory capabilities.
+ // Migrate plugins from connectors()/tableCatalogs() to sourceFactories(),
+ // and from operatorFactories()/filterPushdownSupport() to ExternalSourceFactory methods.
default Map<String, TableCatalogFactory> tableCatalogs(Settings settings) {
return Map.of();
}
@@ -56,6 +64,10 @@ default Map<String, SourceOperatorFactoryProvider> operatorFactories(Settings settings) {
return Map.of();
}
+ default Map<String, ConnectorFactory> connectors(Settings settings) {
+ return Map.of();
+ }
+
default Map<String, FilterPushdownSupport> filterPushdownSupport(Settings settings) {
return Map.of();
}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ExternalSourceFactory.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ExternalSourceFactory.java
new file mode 100644
index 0000000000000..3282ad81e7c0e
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ExternalSourceFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.datasources.spi;
+
+import java.util.Map;
+
+/**
+ * Common interface for complete external data source factories.
+ * Both API-based connectors (Flight, JDBC) and table-based catalogs (Iceberg)
+ * implement this interface, enabling unified resolution and dispatch.
+ *
+ * Building-block factories (StorageProviderFactory, FormatReaderFactory) are NOT
+ * part of this hierarchy — they are composed by the framework for file-based sources.
+ */
+public interface ExternalSourceFactory {
+
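+ /** The unique source type key used for registration, e.g. {@code "flight"} or {@code "iceberg"}. */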
+ String type();
+
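+ /** Returns true if this factory recognizes the given location (a URI or table path). */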
+ boolean canHandle(String location);
+
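+ /** Resolves schema and other source metadata for the location, using optional per-query config. */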
+ SourceMetadata resolveMetadata(String location, Map<String, Object> config);
+
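+ /** Optional capability: filter pushdown support, or {@code null} (the default) when unsupported. */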
+ default FilterPushdownSupport filterPushdownSupport() {
+ return null;
+ }
+
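+ /**
+ * Optional capability: a source operator factory, or {@code null} (the default) when the
+ * factory is dispatched another way (e.g. {@link ConnectorFactory} via the connector protocol).
+ */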
+ default SourceOperatorFactoryProvider operatorFactory() {
+ return null;
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/QueryRequest.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/QueryRequest.java
new file mode 100644
index 0000000000000..f7080286c431d
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/QueryRequest.java
@@ -0,0 +1,32 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.datasources.spi;
+
+import org.elasticsearch.compute.data.BlockFactory;
+import org.elasticsearch.xpack.esql.core.expression.Attribute;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes a query to execute against a connector.
+ * Immutable; use {@link #withBlockFactory} to create a copy bound to a specific driver context.
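+ * The {@code blockFactory} component may be {@code null} until that binding happens.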
+ */
+public record QueryRequest(
+ String target,
+ List<String> projectedColumns,
+ List<Attribute> attributes,
+ Map<String, Object> config,
+ int batchSize,
+ BlockFactory blockFactory
+) {
+
+ public QueryRequest withBlockFactory(BlockFactory blockFactory) {
+ return new QueryRequest(target, projectedColumns, attributes, config, batchSize, blockFactory);
+ }
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ResultCursor.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ResultCursor.java
new file mode 100644
index 0000000000000..25234825593c9
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/ResultCursor.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.datasources.spi;
+
+import org.elasticsearch.compute.data.Page;
+import org.elasticsearch.xpack.esql.datasources.CloseableIterator;
+
+/**
+ * Streaming cursor over query results from a connector.
+ * Each call to {@link #next()} returns a {@link Page} of columnar data.
+ */
+public interface ResultCursor extends CloseableIterator<Page> {
+
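+ /** Best-effort cancellation of the in-flight query; the default is a no-op. */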
+ default void cancel() {}
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/Split.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/Split.java
new file mode 100644
index 0000000000000..d5151fb278c53
--- /dev/null
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/Split.java
@@ -0,0 +1,22 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+package org.elasticsearch.xpack.esql.datasources.spi;
+
+/**
+ * Represents a unit of work for a connector to execute.
+ * Connectors that support parallel reads return multiple splits from
+ * {@link Connector#discoverSplits}; simple connectors use {@link #SINGLE}.
+ */
+public interface Split {
+ Split SINGLE = new Split() {
+ @Override
+ public String toString() {
+ return "Split[SINGLE]";
+ }
+ };
+}
diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/TableCatalog.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/TableCatalog.java
index 7b06a171dae1e..626e75990fba6 100644
--- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/TableCatalog.java
+++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/datasources/spi/TableCatalog.java
@@ -29,7 +29,7 @@
* Implementations typically reuse a FormatReader (e.g., ParquetFormatReader)
* for actual data reading after planning which files to read.
*/
-public interface TableCatalog extends Closeable {
+public interface TableCatalog extends ExternalSourceFactory, Closeable {
String catalogType();
@@ -39,10 +39,26 @@ public interface TableCatalog extends Closeable {
List planScan(String tablePath, Map config, List