Skip to content

Commit 6679e41

Browse files
authored
ESQL: Make datasources plugins lazy (elastic#142815)
Defer heavy plugin factory methods (storageProviders, formatReaders, connectors, tableCatalogs) until first query access so that AWS SDK, Parquet, and Arrow classes are not loaded at node startup.
- DataSourcePlugin: added supportedSchemes/Formats/Extensions/Catalogs and supportedConnectorSchemes for cheap capability declarations.
- DataSourceCapabilities: new record aggregating all plugin capabilities at startup without triggering classloading.
- DataSourceModule: lazy delegating factories per-plugin via LazyPluginState, LazyConnectorFactory, LazyTableCatalogWrapper.
- FormatReaderRegistry: added registerExtension for pre-registration without lazy init; removed fallback init-all-formats path.
- ExternalSourceResolver: early scheme validation via capabilities; introduced UnsupportedSchemeException replacing string matching.
- FileSourceFactory: canHandle uses hasExtension instead of byExtension.
- GrpcDataSourcePlugin: migrated to supportedConnectorSchemes.
- DataSourceModuleLazyLoadingTests: spy-based tests verifying per-plugin isolation and lazy loading guarantees.
Developed using AI-assisted tooling.
1 parent cf28432 commit 6679e41

File tree

20 files changed

+1044
-96
lines changed

20 files changed

+1044
-96
lines changed

docs/changelog/142815.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: ES|QL
2+
issues: []
3+
pr: 142815
4+
summary: Make datasources plugins lazy
5+
type: enhancement

x-pack/plugin/esql-datasource-csv/src/main/java/org/elasticsearch/xpack/esql/datasource/csv/CsvDataSourcePlugin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.datasources.spi.FormatReaderFactory;
1414

1515
import java.util.Map;
16+
import java.util.Set;
1617

1718
/**
1819
* Data source plugin that provides CSV format support for ESQL external data sources.
@@ -36,6 +37,16 @@
3637
*/
3738
public class CsvDataSourcePlugin extends Plugin implements DataSourcePlugin {
3839

40+
@Override
41+
public Set<String> supportedFormats() {
42+
return Set.of("csv");
43+
}
44+
45+
@Override
46+
public Set<String> supportedExtensions() {
47+
return Set.of(".csv", ".tsv");
48+
}
49+
3950
@Override
4051
public Map<String, FormatReaderFactory> formatReaders(Settings settings) {
4152
return Map.of("csv", (s, blockFactory) -> new CsvFormatReader(blockFactory));

x-pack/plugin/esql-datasource-gcs/src/main/java/org/elasticsearch/xpack/esql/datasource/gcs/GcsDataSourcePlugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import java.util.Map;
1717
import java.util.Objects;
18+
import java.util.Set;
1819

1920
/**
2021
* Data source plugin providing Google Cloud Storage support for ESQL.
@@ -28,6 +29,11 @@
2829
*/
2930
public class GcsDataSourcePlugin extends Plugin implements DataSourcePlugin {
3031

32+
@Override
33+
public Set<String> supportedSchemes() {
34+
return Set.of("gs");
35+
}
36+
3137
@Override
3238
public Map<String, StorageProviderFactory> storageProviders(Settings settings) {
3339
StorageProviderFactory gcsFactory = new StorageProviderFactory() {

x-pack/plugin/esql-datasource-grpc/src/main/java/org/elasticsearch/xpack/esql/datasource/grpc/GrpcDataSourcePlugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,19 @@
1313
import org.elasticsearch.xpack.esql.datasources.spi.DataSourcePlugin;
1414

1515
import java.util.Map;
16+
import java.util.Set;
1617

1718
/**
1819
* Registers the Arrow Flight connector for ESQL.
1920
* Handles {@code flight://} and {@code grpc://} URIs for columnar data streaming via gRPC.
2021
*/
2122
public class GrpcDataSourcePlugin extends Plugin implements DataSourcePlugin {
2223

24+
@Override
25+
public Set<String> supportedConnectorSchemes() {
26+
return Set.of("flight", "grpc");
27+
}
28+
2329
@Override
2430
public Map<String, ConnectorFactory> connectors(Settings settings) {
2531
return Map.of("flight", new FlightConnectorFactory());

x-pack/plugin/esql-datasource-http/src/main/java/org/elasticsearch/xpack/esql/datasource/http/HttpDataSourcePlugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import org.elasticsearch.xpack.esql.datasources.spi.StorageProviderFactory;
1515

1616
import java.util.Map;
17+
import java.util.Set;
1718
import java.util.concurrent.ExecutorService;
1819

1920
/**
@@ -35,6 +36,11 @@
3536
*/
3637
public class HttpDataSourcePlugin extends Plugin implements DataSourcePlugin {
3738

39+
@Override
40+
public Set<String> supportedSchemes() {
41+
return Set.of("http", "https", "file");
42+
}
43+
3844
@Override
3945
public Map<String, StorageProviderFactory> storageProviders(Settings settings, ExecutorService executor) {
4046
return Map.of(

x-pack/plugin/esql-datasource-iceberg/src/main/java/org/elasticsearch/xpack/esql/datasource/iceberg/IcebergDataSourcePlugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.datasources.spi.TableCatalogFactory;
1414

1515
import java.util.Map;
16+
import java.util.Set;
1617

1718
/**
1819
* Data source plugin that provides Iceberg table catalog support for ESQL external data sources.
@@ -37,6 +38,11 @@
3738
*/
3839
public class IcebergDataSourcePlugin extends Plugin implements DataSourcePlugin {
3940

41+
@Override
42+
public Set<String> supportedCatalogs() {
43+
return Set.of("iceberg");
44+
}
45+
4046
@Override
4147
public Map<String, TableCatalogFactory> tableCatalogs(Settings settings) {
4248
return Map.of("iceberg", s -> new IcebergTableCatalog());

x-pack/plugin/esql-datasource-parquet/src/main/java/org/elasticsearch/xpack/esql/datasource/parquet/ParquetDataSourcePlugin.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.elasticsearch.xpack.esql.datasources.spi.FormatReaderFactory;
1414

1515
import java.util.Map;
16+
import java.util.Set;
1617

1718
/**
1819
* Data source plugin that provides Parquet format support for ESQL external data sources.
@@ -36,6 +37,16 @@
3637
*/
3738
public class ParquetDataSourcePlugin extends Plugin implements DataSourcePlugin {
3839

40+
@Override
41+
public Set<String> supportedFormats() {
42+
return Set.of("parquet");
43+
}
44+
45+
@Override
46+
public Set<String> supportedExtensions() {
47+
return Set.of(".parquet");
48+
}
49+
3950
@Override
4051
public Map<String, FormatReaderFactory> formatReaders(Settings settings) {
4152
return Map.of("parquet", (s, blockFactory) -> new ParquetFormatReader(blockFactory));

x-pack/plugin/esql-datasource-s3/src/main/java/org/elasticsearch/xpack/esql/datasource/s3/S3DataSourcePlugin.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,19 @@
1414
import org.elasticsearch.xpack.esql.datasources.spi.StorageProviderFactory;
1515

1616
import java.util.Map;
17+
import java.util.Set;
1718

1819
/**
1920
* Data source plugin providing S3 storage support for ESQL.
2021
* Supports s3://, s3a://, and s3n:// URI schemes.
2122
*/
2223
public class S3DataSourcePlugin extends Plugin implements DataSourcePlugin {
2324

25+
@Override
26+
public Set<String> supportedSchemes() {
27+
return Set.of("s3", "s3a", "s3n");
28+
}
29+
2430
@Override
2531
public Map<String, StorageProviderFactory> storageProviders(Settings settings) {
2632
StorageProviderFactory s3Factory = new StorageProviderFactory() {
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.esql.datasources;
9+
10+
import org.elasticsearch.xpack.esql.datasources.spi.DataSourcePlugin;
11+
12+
import java.util.Collections;
13+
import java.util.LinkedHashSet;
14+
import java.util.List;
15+
import java.util.Locale;
16+
import java.util.Set;
17+
18+
/**
19+
* Aggregated capability declarations from all registered {@link DataSourcePlugin} instances.
20+
* Built once at startup from cheap SPI calls; does not store plugin references.
21+
*/
22+
public record DataSourceCapabilities(Set<String> schemes, Set<String> formats, Set<String> extensions, Set<String> catalogs) {
23+
24+
public boolean supportsScheme(String s) {
25+
return s != null && schemes.contains(s.toLowerCase(Locale.ROOT));
26+
}
27+
28+
public boolean supportsFormat(String f) {
29+
return f != null && formats.contains(f.toLowerCase(Locale.ROOT));
30+
}
31+
32+
public boolean supportsExtension(String ext) {
33+
if (ext == null) {
34+
return false;
35+
}
36+
String normalized = ext.toLowerCase(Locale.ROOT);
37+
if (normalized.startsWith(".") == false) {
38+
normalized = "." + normalized;
39+
}
40+
return extensions.contains(normalized);
41+
}
42+
43+
public boolean supportsCatalog(String c) {
44+
return c != null && catalogs.contains(c.toLowerCase(Locale.ROOT));
45+
}
46+
47+
public String supportedSchemesString() {
48+
return String.join(", ", schemes);
49+
}
50+
51+
public static DataSourceCapabilities build(List<DataSourcePlugin> plugins) {
52+
Set<String> allSchemes = new LinkedHashSet<>();
53+
Set<String> allFormats = new LinkedHashSet<>();
54+
Set<String> allExtensions = new LinkedHashSet<>();
55+
Set<String> allCatalogs = new LinkedHashSet<>();
56+
57+
for (DataSourcePlugin plugin : plugins) {
58+
for (String scheme : plugin.supportedSchemes()) {
59+
allSchemes.add(scheme.toLowerCase(Locale.ROOT));
60+
}
61+
for (String scheme : plugin.supportedConnectorSchemes()) {
62+
allSchemes.add(scheme.toLowerCase(Locale.ROOT));
63+
}
64+
for (String format : plugin.supportedFormats()) {
65+
String lower = format.toLowerCase(Locale.ROOT);
66+
if (allFormats.contains(lower)) {
67+
throw new IllegalArgumentException("Format reader for [" + format + "] is already registered");
68+
}
69+
allFormats.add(lower);
70+
}
71+
for (String ext : plugin.supportedExtensions()) {
72+
String normalized = ext.toLowerCase(Locale.ROOT);
73+
if (normalized.startsWith(".") == false) {
74+
normalized = "." + normalized;
75+
}
76+
allExtensions.add(normalized);
77+
}
78+
for (String catalog : plugin.supportedCatalogs()) {
79+
String lower = catalog.toLowerCase(Locale.ROOT);
80+
if (allCatalogs.contains(lower)) {
81+
throw new IllegalArgumentException("Source factory for type [" + catalog + "] is already registered");
82+
}
83+
allCatalogs.add(lower);
84+
}
85+
}
86+
87+
return new DataSourceCapabilities(
88+
Collections.unmodifiableSet(allSchemes),
89+
Collections.unmodifiableSet(allFormats),
90+
Collections.unmodifiableSet(allExtensions),
91+
Collections.unmodifiableSet(allCatalogs)
92+
);
93+
}
94+
}

0 commit comments

Comments
 (0)