diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index 41e46fb86..95fdb3324 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -334,3 +334,15 @@ Still not supported in Python API.
Please also take Oracle connection pooling into account.
Please refer to the `JdbcXaSinkFunction` documentation for more details.
+
+## License of JDBC driver for Elasticsearch
+
+Flink's JDBC connector defines a Maven dependency on the "JDBC driver for Elasticsearch", which is licensed under
+the Elastic License 2.0.
+
+Flink itself neither reuses source code from the "JDBC driver for Elasticsearch"
+nor packages binaries from the "JDBC driver for Elasticsearch".
+
+Users who create and publish derivative works based on Flink's JDBC connector (thereby redistributing
+the "JDBC driver for Elasticsearch") must be aware that doing so may be subject to conditions declared
+in the Elastic License 2.0.
diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index 056f2ab59..534b29978 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -45,18 +45,18 @@ See how to link with it for cluster execution [here]({{< ref "docs/dev/configura
A driver dependency is also required to connect to a specified database. Here are drivers currently supported:
-| Driver | Group Id | Artifact Id | JAR |
-|:-----------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------------|
-| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
-| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
-| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) |
-| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
-| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
-| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
-| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
-| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
-| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
-
+| Driver | Group Id | Artifact Id | JAR |
+|:--------------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------------|
+| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) |
+| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
+| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
+| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
+| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
+| Elasticsearch | `org.elasticsearch.plugin` | `x-pack-sql-jdbc` | [Download](https://www.elastic.co/downloads/jdbc-client) |
JDBC connector and drivers are not part of Flink's binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
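The Elasticsearch driver can be smoke-tested outside of Flink with plain JDBC. The sketch below is illustrative only and not part of the connector; the host, port, and credentials are assumptions:

```java
// Minimal sketch: verify the Elasticsearch JDBC driver and URL format.
// Host, port, and credentials are placeholder assumptions.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class EsJdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Driver class name as used by the Elasticsearch dialect in this PR.
        Class.forName("org.elasticsearch.xpack.sql.jdbc.EsDriver");
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:elasticsearch://localhost:9200", "elastic", "password");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1")) {
            while (rs.next()) {
                System.out.println(rs.getInt(1));
            }
        }
    }
}
```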
@@ -656,7 +656,7 @@ SELECT * FROM `custom_schema.test_table2`;
Data Type Mapping
----------------
-Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
+Flink supports connecting to several databases through dedicated dialects, such as MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2, OceanBase, and Elasticsearch. The Derby dialect is usually used for testing purposes. The mappings from relational database data types to Flink SQL data types are listed in the following table; they make it easier to define JDBC tables in Flink.
@@ -670,6 +670,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <th class="text-left">Trino type</th>
       <th class="text-left">OceanBase MySQL mode type</th>
       <th class="text-left">OceanBase Oracle mode type</th>
+      <th class="text-left"><a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html">Elastic SQL type</a></th>
       <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
@@ -684,6 +685,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>TINYINT</code></td>
       <td><code>TINYINT</code></td>
       <td></td>
+      <td><code>BYTE</code></td>
       <td><code>TINYINT</code></td>
@@ -706,6 +708,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>SMALLINT</code><br>
        <code>TINYINT UNSIGNED</code>
       </td>
+      <td><code>SHORT</code></td>
       <td><code>SMALLINT</code></td>
@@ -728,6 +731,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>MEDIUMINT</code><br>
        <code>SMALLINT UNSIGNED</code>
       </td>
+      <td><code>INTEGER</code></td>
       <td><code>INT</code></td>
@@ -748,6 +752,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>BIGINT</code><br>
        <code>INT UNSIGNED</code>
       </td>
+      <td>
+       <code>LONG</code><br>
+       <code>UNSIGNED_LONG</code></td>
       <td><code>BIGINT</code></td>
@@ -760,6 +767,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td></td>
       <td><code>BIGINT UNSIGNED</code></td>
       <td></td>
+      <td></td>
       <td><code>DECIMAL(20, 0)</code></td>
@@ -778,6 +786,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>FLOAT</code></td>
       <td><code>BINARY_FLOAT</code></td>
+      <td>
+       <code>FLOAT</code><br>
+       <code>HALF_FLOAT</code></td>
       <td><code>FLOAT</code></td>
@@ -796,6 +807,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>DOUBLE</code></td>
       <td><code>DOUBLE</code></td>
       <td><code>BINARY_DOUBLE</code></td>
+      <td>
+       <code>DOUBLE</code><br>
+       <code>SCALED_FLOAT</code></td>
       <td><code>DOUBLE</code></td>
@@ -824,6 +838,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>FLOAT(s)</code><br>
        <code>NUMBER(p, s)</code></td>
+      <td></td>
       <td><code>DECIMAL(p, s)</code></td>
@@ -841,6 +856,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>TINYINT(1)</code>
       </td>
       <td><code>BOOLEAN</code></td>
+      <td><code>BOOLEAN</code></td>
       <td><code>DATE</code></td>
@@ -852,6 +868,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
       <td><code>DATE</code></td>
+      <td></td>
       <td><code>DATE</code></td>
@@ -864,6 +881,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>TIME_WITHOUT_TIME_ZONE</code></td>
       <td><code>TIME [(p)]</code></td>
       <td><code>DATE</code></td>
+      <td></td>
       <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
@@ -879,6 +897,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>TIMESTAMP_WITHOUT_TIME_ZONE</code></td>
       <td><code>DATETIME [(p)]</code></td>
       <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
+      <td><code>DATETIME</code></td>
       <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
@@ -927,6 +946,11 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>NCHAR(n)</code><br>
        <code>VARCHAR2(n)</code><br>
        <code>CLOB</code></td>
+      <td>
+       <code>KEYWORD</code><br>
+       <code>IP</code><br>
+       <code>TEXT</code><br>
+       <code>VERSION</code></td>
       <td><code>STRING</code></td>
@@ -952,6 +976,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>RAW(s)</code><br>
        <code>BLOB</code></td>
+      <td><code>BINARY</code></td>
       <td><code>BYTES</code></td>
@@ -964,9 +989,22 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>ARRAY</code></td>
       <td></td>
       <td></td>
+      <td></td>
       <td><code>ARRAY</code></td>
+## License of JDBC driver for Elasticsearch
+
+Flink's JDBC connector defines a Maven dependency on the "JDBC driver for Elasticsearch", which is licensed under
+the Elastic License 2.0.
+
+Flink itself neither reuses source code from the "JDBC driver for Elasticsearch"
+nor packages binaries from the "JDBC driver for Elasticsearch".
+
+Users who create and publish derivative works based on Flink's JDBC connector (thereby redistributing
+the "JDBC driver for Elasticsearch") must be aware that doing so may be subject to conditions declared
+in the Elastic License 2.0.
+
{{< top >}}
diff --git a/flink-connector-jdbc/pom.xml b/flink-connector-jdbc/pom.xml
index f85861fe5..d90f2ce7a 100644
--- a/flink-connector-jdbc/pom.xml
+++ b/flink-connector-jdbc/pom.xml
@@ -42,6 +42,7 @@ under the License.
21.8.0.0
418
1.12.10
+		<elasticsearch.version>8.13.1</elasticsearch.version>
--add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.lang=ALL-UNNAMED
@@ -114,6 +115,14 @@ under the License.
 			<scope>provided</scope>
+
+		<dependency>
+			<groupId>org.elasticsearch.plugin</groupId>
+			<artifactId>x-pack-sql-jdbc</artifactId>
+			<version>${elasticsearch.version}</version>
+			<scope>provided</scope>
+		</dependency>
+
@@ -250,6 +259,31 @@ under the License.
 			<scope>test</scope>
+
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>elasticsearch</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.elasticsearch.client</groupId>
+			<artifactId>elasticsearch-rest-client</artifactId>
+			<version>${elasticsearch.version}</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+			<version>2.13.4.2</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.datatype</groupId>
+			<artifactId>jackson-datatype-jsr310</artifactId>
+			<version>2.13.4</version>
+			<scope>test</scope>
+		</dependency>
+
 			<groupId>org.apache.flink</groupId>
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java
new file mode 100644
index 000000000..fca18ca43
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** JDBC dialect for Elasticsearch. */
+@Internal
+public class ElasticsearchDialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of TIMESTAMP type according to Elastic docs:
+ // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+ private static final int MAX_TIMESTAMP_PRECISION = 9;
+
+ @Override
+ public String dialectName() {
+ return "Elasticsearch";
+ }
+
+ @Override
+    public Optional<String> defaultDriverName() {
+ return Optional.of("org.elasticsearch.xpack.sql.jdbc.EsDriver");
+ }
+
+ @Override
+    public Set<LogicalTypeRoot> supportedTypes() {
+ // The list of types supported by Elastic SQL.
+ // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
+ return EnumSet.of(
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.VARBINARY,
+ LogicalTypeRoot.VARCHAR);
+ }
+
+ @Override
+    public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new ElasticsearchRowConverter(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return '"' + identifier + '"';
+ }
+
+ @Override
+    public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ throw new UnsupportedOperationException("Upsert is not supported.");
+ }
+
+ @Override
+ public String getInsertIntoStatement(String tableName, String[] fieldNames) {
+ throw new UnsupportedOperationException("Insert into is not supported.");
+ }
+
+ @Override
+ public String getUpdateStatement(
+ String tableName, String[] fieldNames, String[] conditionFields) {
+ throw new UnsupportedOperationException("Update is not supported.");
+ }
+
+ @Override
+ public String getDeleteStatement(String tableName, String[] conditionFields) {
+ throw new UnsupportedOperationException("Delete is not supported.");
+ }
+}
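To make the dialect's contract concrete, here is a small illustrative sketch of its observable behavior, derived directly from the methods defined above:

```java
// Illustrative sketch of the ElasticsearchDialect behavior defined above.
import org.apache.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialect;

public class ElasticsearchDialectDemo {
    public static void main(String[] args) {
        ElasticsearchDialect dialect = new ElasticsearchDialect();

        // Identifiers are quoted with double quotes, matching Elastic SQL.
        System.out.println(dialect.quoteIdentifier("double_col")); // "double_col"

        // Scans are capped with a plain LIMIT clause.
        System.out.println(dialect.getLimitClause(10)); // LIMIT 10

        // The dialect is read-only: insert/update/delete/upsert all throw.
        try {
            dialect.getInsertIntoStatement("tbl", new String[] {"id"});
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // Insert into is not supported.
        }
    }
}
```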
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectFactory.java
new file mode 100644
index 000000000..4b1c223c1
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link ElasticsearchDialect}. */
+public class ElasticsearchDialectFactory implements JdbcDialectFactory {
+
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:elasticsearch:") || url.startsWith("jdbc:es:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new ElasticsearchDialect();
+ }
+}
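The factory is registered via SPI (see the `META-INF/services` change below) and selected purely by URL prefix. A sketch of how a dialect is resolved, using the same `JdbcDialectLoader` API that the tests in this PR call:

```java
// Sketch: dialect resolution by URL prefix, via the loader API also used in
// the tests below. The factory itself is discovered through SPI.
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;

public class DialectResolutionDemo {
    public static void main(String[] args) {
        ClassLoader cl = DialectResolutionDemo.class.getClassLoader();

        // Both accepted prefixes resolve to the Elasticsearch dialect.
        JdbcDialect longForm =
                JdbcDialectLoader.load("jdbc:elasticsearch://localhost:9200", cl);
        JdbcDialect shortForm = JdbcDialectLoader.load("jdbc:es://localhost:9200", cl);

        System.out.println(longForm.dialectName());  // Elasticsearch
        System.out.println(shortForm.dialectName()); // Elasticsearch
    }
}
```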
diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchRowConverter.java
new file mode 100644
index 000000000..14b58b6c3
--- /dev/null
+++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchRowConverter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;
+
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.sql.Timestamp;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and Flink internal
+ * objects for Elasticsearch.
+ */
+public class ElasticsearchRowConverter extends AbstractJdbcRowConverter {
+ private static final long serialVersionUID = 1L;
+
+ public ElasticsearchRowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public String converterName() {
+ return "Elasticsearch";
+ }
+
+ @Override
+ protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case TINYINT:
+ case DOUBLE:
+ case FLOAT:
+ return val -> val;
+ case DATE:
+ return val ->
+ (int) (((Timestamp) val).toLocalDateTime().toLocalDate().toEpochDay());
+ default:
+ return super.createInternalConverter(type);
+ }
+ }
+}
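The `DATE` branch above exists because the Elasticsearch driver returns `DATE` columns as `java.sql.Timestamp`, while Flink internally represents a `DATE` as the number of days since the epoch. A self-contained sketch of that conversion:

```java
// Sketch of the DATE branch in createInternalConverter: collapse the driver's
// Timestamp to a local date and take its epoch day, which is Flink's internal
// representation of a DATE.
import java.sql.Timestamp;

public class DateConversionDemo {
    public static void main(String[] args) {
        Timestamp val = Timestamp.valueOf("2020-01-01 15:35:00");
        // Same expression as in the converter's DATE case.
        int epochDay = (int) val.toLocalDateTime().toLocalDate().toEpochDay();
        System.out.println(epochDay); // 18262 days between 1970-01-01 and 2020-01-01
    }
}
```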
diff --git a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
index 9a3cf26cb..a4cf80318 100644
--- a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
+++ b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
@@ -22,3 +22,4 @@ org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFact
org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory
org.apache.flink.connector.jdbc.databases.db2.dialect.Db2DialectFactory
org.apache.flink.connector.jdbc.databases.trino.dialect.TrinoDialectFactory
+org.apache.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialectFactory
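For reference, a sketch of what this registration enables: plain `java.util.ServiceLoader` discovery over `JdbcDialectFactory`, filtered by `acceptsURL`. This mirrors, but is not, the connector's internal `JdbcDialectLoader` logic:

```java
// Sketch of SPI-based discovery over the services file above. Assumes the
// connector jar (with its META-INF/services entry) is on the classpath.
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

import java.util.ServiceLoader;

public class ServiceDiscoveryDemo {
    public static void main(String[] args) {
        String url = "jdbc:es://localhost:9200";
        for (JdbcDialectFactory factory : ServiceLoader.load(JdbcDialectFactory.class)) {
            if (factory.acceptsURL(url)) {
                JdbcDialect dialect = factory.create();
                System.out.println(dialect.dialectName()); // Elasticsearch
                return;
            }
        }
        System.out.println("No dialect found for " + url);
    }
}
```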
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/ElasticsearchTestBase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/ElasticsearchTestBase.java
new file mode 100644
index 000000000..26eb54db9
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/ElasticsearchTestBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.elasticsearch;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+import org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchDatabase;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Base class for Elasticsearch testing. */
+@ExtendWith(ElasticsearchDatabase.class)
+public interface ElasticsearchTestBase extends DatabaseTest {
+
+ @Override
+ default DatabaseMetadata getMetadata() {
+ return ElasticsearchDatabase.getMetadata();
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectTypeTest.java
new file mode 100644
index 000000000..cfab90fba
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectTypeTest.java
@@ -0,0 +1,44 @@
+package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** The Elasticsearch params for {@link JdbcDialectTypeTest}. */
+public class ElasticsearchDialectTypeTest extends JdbcDialectTypeTest {
+
+ @Override
+ protected String testDialect() {
+ return "elasticsearch";
+ }
+
+ @Override
+    protected List<TestItem> testData() {
+ return Arrays.asList(
+ createTestItem("VARCHAR"),
+ createTestItem("BOOLEAN"),
+ createTestItem("TINYINT"),
+ createTestItem("SMALLINT"),
+ createTestItem("INTEGER"),
+ createTestItem("BIGINT"),
+ createTestItem("FLOAT"),
+ createTestItem("DOUBLE"),
+ createTestItem("DATE"),
+ createTestItem("TIMESTAMP(3)"),
+ createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+ createTestItem("VARBINARY"),
+
+ // Not valid data
+ createTestItem("CHAR", "The Elasticsearch dialect doesn't support type: CHAR(1)."),
+ createTestItem(
+ "BINARY", "The Elasticsearch dialect doesn't support type: BINARY(1)."),
+ createTestItem("TIME", "The Elasticsearch dialect doesn't support type: TIME(0)."),
+ createTestItem(
+ "VARBINARY(10)",
+ "The Elasticsearch dialect doesn't support type: VARBINARY(10)."),
+ createTestItem(
+ "DECIMAL(10, 4)",
+ "The Elasticsearch dialect doesn't support type: DECIMAL(10, 4)."));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchPreparedStatementTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchPreparedStatementTest.java
new file mode 100644
index 000000000..b618334dd
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchPreparedStatementTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the SQL statements generated by {@link ElasticsearchDialect}. */
+public class ElasticsearchPreparedStatementTest {
+
+ private final JdbcDialect dialect =
+ JdbcDialectLoader.load(
+ "jdbc:elasticsearch://localhost:9200/test", getClass().getClassLoader());
+
+ private final String[] fieldNames =
+ new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"};
+ private final String[] keyFields = new String[] {"id", "__field_3__"};
+ private final String tableName = "tbl";
+
+ @Test
+ void testRowExistsStatement() {
+ String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
+ assertThat(rowExistStmt)
+ .isEqualTo(
+ "SELECT 1 FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
+ }
+
+ @Test
+ void testSelectStatement() {
+ String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
+ assertThat(selectStmt)
+ .isEqualTo(
+ "SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" "
+ + "FROM \"tbl\" "
+ + "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java
new file mode 100644
index 000000000..091f8c5e7
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.databases.elasticsearch.table;
+
+import org.apache.flink.connector.jdbc.databases.elasticsearch.ElasticsearchTestBase;
+import org.apache.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialect;
+import org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata;
+import org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchRestClient;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
+import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchBulkBuilder.createBulkContent;
+import static org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchIndexSchemaBuilder.buildIndexSchema;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The Table Source ITCase for {@link ElasticsearchDialect}. */
+public class ElasticsearchDynamicTableSourceITCase extends AbstractTestBase
+ implements ElasticsearchTestBase {
+
+ private ElasticsearchRestClient client;
+ private TableEnvironment tEnv;
+ private final TableRow inputTable = createInputTable();
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ client = new ElasticsearchRestClient((ElasticsearchMetadata) getMetadata());
+
+ client.createIndex(inputTable.getTableName(), buildIndexSchema(inputTable));
+ client.addDataBulk(inputTable.getTableName(), createBulkContent(inputTable, getTestData()));
+
+ tEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
+ }
+
+ @AfterEach
+ void afterEach() throws Exception {
+ if (client != null) {
+ client.deleteIndex(inputTable.getTableName());
+ }
+ }
+
+ @Test
+ void testJdbcSource() {
+ String testTable = "testTable";
+ tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));
+
+ List collected = executeQuery("SELECT * FROM " + testTable);
+
+ assertThat(collected).containsExactlyInAnyOrderElementsOf(getTestData());
+ }
+
+ @Test
+ void testProject() {
+ String testTable = "testTable";
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(
+ getMetadata(),
+ testTable,
+ Arrays.asList(
+ "'scan.partition.column'='id'",
+ "'scan.partition.num'='2'",
+ "'scan.partition.lower-bound'='0'",
+ "'scan.partition.upper-bound'='100'")));
+
+ String fields = String.join(",", Arrays.copyOfRange(inputTable.getTableFields(), 0, 3));
+        List<Row> collected = executeQuery(String.format("SELECT %s FROM %s", fields, testTable));
+
+        List<Row> expected =
+ getTestData().stream()
+ .map(row -> Row.of(row.getField(0), row.getField(1), row.getField(2)))
+ .collect(Collectors.toList());
+
+ assertThat(collected).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @Test
+ public void testLimit() {
+ String testTable = "testTable";
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(
+ getMetadata(),
+ testTable,
+ Arrays.asList(
+ "'scan.partition.column'='id'",
+ "'scan.partition.num'='2'",
+ "'scan.partition.lower-bound'='1'",
+ "'scan.partition.upper-bound'='2'")));
+
+ List collected = executeQuery("SELECT * FROM " + testTable + " LIMIT 1");
+
+ assertThat(collected).hasSize(1);
+ assertThat(getTestData())
+ .as("The actual output is not a subset of the expected set.")
+ .containsAll(collected);
+ }
+
+ @Test
+ public void testFilter() {
+ String testTable = "testTable";
+ tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));
+
+ // create a partitioned table to ensure no regression
+ String partitionedTable = "PARTITIONED_TABLE";
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(
+ getMetadata(),
+ partitionedTable,
+ Arrays.asList(
+ "'scan.partition.column'='id'",
+ "'scan.partition.num'='1'",
+ "'scan.partition.lower-bound'='1'",
+ "'scan.partition.upper-bound'='1'")));
+
+        // we create a VIEW here to test column remapping, i.e. whether filter push down would
+        // still work if we create a view that depends on our source table
+ tEnv.executeSql(
+ String.format(
+ "CREATE VIEW FAKE_TABLE (idx, %s) as (SELECT * from %s )",
+ Arrays.stream(inputTable.getTableFields())
+ .filter(f -> !f.equals("id"))
+ .collect(Collectors.joining(",")),
+ testTable));
+
+ Row onlyRow1 =
+ getTestData().stream()
+ .filter(row -> row.getFieldAs(0).equals(1L))
+ .findAny()
+ .orElseThrow(NullPointerException::new);
+
+ Row onlyRow2 =
+ getTestData().stream()
+ .filter(row -> row.getFieldAs(0).equals(2L))
+ .findAny()
+ .orElseThrow(NullPointerException::new);
+
+        List<Row> twoRows = getTestData();
+
+ // test simple filter
+ assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE idx = 1"))
+ .containsExactly(onlyRow1);
+
+ // test TIMESTAMP filter
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE timestamp6_col = TIMESTAMP '2020-01-01 15:35:00.123456'"))
+ .containsExactly(onlyRow1);
+
+ // test the IN operator
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE 1 = idx AND double_col IN (100.1234, 101.1234)"))
+ .containsExactly(onlyRow1);
+
+ // test mixing AND and OR operator
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE idx = 1 AND double_col = 100.1234 OR double_col = 101.1234"))
+ .containsExactlyInAnyOrderElementsOf(twoRows);
+
+        // test mixing AND/OR with parentheses, and swapping the operands of the equal expression
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE (2 = idx AND double_col = 100.1234) OR double_col = 101.1234"))
+ .containsExactly(onlyRow2);
+
+        // test greater-than, just to make sure we didn't break anything that we cannot push down
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE idx = 2 AND double_col > 100 OR double_col = 101.123"))
+ .containsExactly(onlyRow2);
+
+        // One more test of parentheses
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE 2 = idx AND (double_col = 100.1234 OR double_col = 102.1234)"))
+ .isEmpty();
+
+ assertThat(
+ executeQuery(
+ "SELECT * FROM "
+ + partitionedTable
+ + " WHERE id = 2 AND double_col > 100 OR double_col = 101.123"))
+ .isEmpty();
+
+ assertThat(
+ executeQuery(
+ "SELECT * FROM "
+ + partitionedTable
+ + " WHERE 1 = id AND double_col IN (100.1234, 101.1234)"))
+ .containsExactly(onlyRow1);
+ }
+
+ @ParameterizedTest
+ @EnumSource(Caching.class)
+ void testLookupJoin(Caching caching) {
+ // Create JDBC lookup table
+        List<String> cachingOptions = Collections.emptyList();
+ if (caching.equals(Caching.ENABLE_CACHE)) {
+ cachingOptions =
+ Arrays.asList(
+ "'lookup.cache.max-rows' = '100'", "'lookup.cache.ttl' = '10min'");
+ }
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(getMetadata(), "jdbc_lookup", cachingOptions));
+
+ // Create and prepare a value source
+ String dataId =
+ TestValuesTableFactory.registerData(
+ Arrays.asList(
+ Row.of(1L, "Alice"),
+ Row.of(1L, "Alice"),
+ Row.of(2L, "Bob"),
+ Row.of(3L, "Charlie")));
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE value_source ( "
+ + " `id` BIGINT, "
+ + " `name` STRING, "
+ + " `proctime` AS PROCTIME()"
+ + ") WITH ("
+ + " 'connector' = 'values', "
+ + " 'data-id' = '%s'"
+ + ")",
+ dataId));
+
+ if (caching == Caching.ENABLE_CACHE) {
+ LookupCacheManager.keepCacheOnRelease(true);
+ }
+
+ // Execute lookup join
+ try {
+            List<Row> collected =
+ executeQuery(
+ "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
+ + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id");
+
+ assertThat(collected).hasSize(3);
+
+            List<Row> expected =
+ Arrays.asList(
+ Row.of(
+ 1L,
+ "Alice",
+ 1L,
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ 100.1234d),
+ Row.of(
+ 1L,
+ "Alice",
+ 1L,
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ 100.1234d),
+ Row.of(
+ 2L,
+ "Bob",
+ 2L,
+ LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+ 101.1234d));
+
+ assertThat(collected)
+ .as("The actual output is not a subset of the expected set")
+ .containsAll(expected);
+
+ if (caching == Caching.ENABLE_CACHE) {
+ validateCachedValues();
+ }
+ } finally {
+ if (caching == Caching.ENABLE_CACHE) {
+ LookupCacheManager.getInstance().checkAllReleased();
+ LookupCacheManager.getInstance().clear();
+ LookupCacheManager.keepCacheOnRelease(false);
+ }
+ }
+ }
+
+ private void validateCachedValues() {
+ // Validate cache
+        Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+ LookupCacheManager.getInstance().getManagedCaches();
+ assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1);
+ LookupCache cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+        // JDBC supports projection push down, so the cached row has already been projected
+ RowData key1 = GenericRowData.of(1L);
+ RowData value1 =
+ GenericRowData.of(
+ 1L,
+ 100.1234d,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2020-01-01T15:35:00.123456")));
+
+ RowData key2 = GenericRowData.of(2L);
+ RowData value2 =
+ GenericRowData.of(
+ 2L,
+ 101.1234d,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2020-01-01T15:36:01.123456")));
+
+ RowData key3 = GenericRowData.of(3L);
+
+        Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
+ expectedEntries.put(key1, Collections.singletonList(value1));
+ expectedEntries.put(key2, Collections.singletonList(value2));
+ expectedEntries.put(key3, Collections.emptyList());
+
+ LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries);
+ }
+
+ private enum Caching {
+ ENABLE_CACHE,
+ DISABLE_CACHE
+ }
+
+    private List<Row> executeQuery(String query) {
+ return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect());
+ }
+
+ private TableRow createInputTable() {
+ return tableRow(
+ "jdbc_dynamic_table_source",
+ field("id", DataTypes.BIGINT().notNull()),
+ field("double_col", DataTypes.DOUBLE()),
+ field("timestamp6_col", DataTypes.TIMESTAMP(6)),
+ field("boolean_col", DataTypes.BOOLEAN()),
+ field("tinyint_col", DataTypes.TINYINT()),
+ field("smallint_col", DataTypes.SMALLINT()),
+ field("integer_col", DataTypes.INT()),
+ field("bigint_col", DataTypes.BIGINT()),
+ field("float_col", DataTypes.FLOAT()),
+ field("varchar15_col", DataTypes.VARCHAR(15)),
+ field("date_col", DataTypes.DATE()));
+ }
+
+    protected List<Row> getTestData() {
+ return Arrays.asList(
+ Row.of(
+ 1L,
+ 100.1234d,
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ true,
+ (byte) -12,
+ (short) 31_987,
+ 1_147_483_647,
+ 8_223_372_036_854_775_807L,
+ 70.5f,
+ "some-text",
+ LocalDate.parse("2020-01-01")),
+ Row.of(
+ 2L,
+ 101.1234d,
+ LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+ false,
+ (byte) 23,
+ (short) -29_987,
+ -2_047_483_647,
+ -1_223_372_036_854_775_807L,
+ -123.5f,
+ "other-text",
+ LocalDate.parse("2020-01-02")));
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchBulkBuilder.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchBulkBuilder.java
new file mode 100644
index 000000000..68476cbe3
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchBulkBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.types.Row;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+
+/** Creates content for Elastic Bulk API call. */
+public class ElasticsearchBulkBuilder {
+
+ private static final ObjectMapper OBJECT_MAPPER =
+ new ObjectMapper()
+ .registerModule(new JavaTimeModule())
+ .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+
+    public static String createBulkContent(TableRow schema, List<Row> data)
+ throws JsonProcessingException {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < data.size(); ++i) {
+ builder.append(
+ format(
+ "{\"create\":{\"_index\":\"%s\",\"_id\":\"%d\"}}\n",
+ schema.getTableName(), i + 1));
+ builder.append(rowToJson(schema, data.get(i))).append('\n');
+ }
+ return builder.toString();
+ }
+
+ private static String rowToJson(TableRow schema, Row data) throws JsonProcessingException {
+ int fieldCount = schema.getTableDataFields().length;
+        Map<String, Object> fieldMap = new HashMap<>(fieldCount);
+ for (int i = 0; i < fieldCount; ++i) {
+ fieldMap.put(schema.getTableFields()[i], adjustValueIfNeeded(data.getField(i)));
+ }
+ return OBJECT_MAPPER.writeValueAsString(fieldMap);
+ }
+
+ private static Object adjustValueIfNeeded(Object object) {
+ if (object instanceof LocalDateTime) {
+ return ((LocalDateTime) object)
+ .atZone(ZoneId.systemDefault())
+ .withZoneSameInstant(ZoneId.of("UTC"))
+ .toLocalDateTime();
+ } else {
+ return object;
+ }
+ }
+}
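A sketch of how the builder is used together with the test-table helpers from this PR; the index name and fields are made up for illustration, and field order within each document may vary because the builder serializes a `HashMap`:

```java
// Sketch: building a Bulk API payload with this PR's test-table helpers.
// Index name and fields are illustrative assumptions.
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;

import org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchBulkBuilder;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.types.Row;

import java.util.Arrays;

public class BulkContentDemo {
    public static void main(String[] args) throws Exception {
        TableRow table =
                tableRow(
                        "demo_index",
                        field("id", DataTypes.BIGINT().notNull()),
                        field("name", DataTypes.VARCHAR(15)));

        // Documents get ids 1..n in list order; each action line is followed
        // by its JSON source, newline-delimited as the Bulk API expects.
        String bulk =
                ElasticsearchBulkBuilder.createBulkContent(
                        table, Arrays.asList(Row.of(1L, "Alice"), Row.of(2L, "Bob")));
        System.out.print(bulk);
        // {"create":{"_index":"demo_index","_id":"1"}}
        // {"id":1,"name":"Alice"}
        // {"create":{"_index":"demo_index","_id":"2"}}
        // {"id":2,"name":"Bob"}
        // (key order within each source document may differ)
    }
}
```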
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java
new file mode 100644
index 000000000..d2924eb73
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchDatabase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.time.Duration;
+
+import static org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchMetadata.PASSWORD;
+
+/** Elasticsearch database for testing. */
+public class ElasticsearchDatabase extends DatabaseExtension implements ElasticsearchImages {
+
+ private static final ElasticsearchContainer CONTAINER =
+ new ElasticsearchContainer(ELASTICSEARCH_8)
+ .waitingFor(
+ Wait.forLogMessage(
+ ".*Node .* is selected as the current health node.*", 1)
+ .withStartupTimeout(Duration.ofMinutes(5)));
+
+ private static ElasticsearchMetadata metadata;
+ private static ElasticsearchRestClient client;
+
+ public static ElasticsearchMetadata getMetadata() {
+ if (!CONTAINER.isRunning()) {
+ throw new FlinkRuntimeException("Container is stopped.");
+ }
+ if (metadata == null) {
+ metadata = new ElasticsearchMetadata(CONTAINER);
+ }
+ return metadata;
+ }
+
+ private static ElasticsearchRestClient getClient() {
+ if (!CONTAINER.isRunning()) {
+ throw new FlinkRuntimeException("Container is stopped.");
+ }
+ if (client == null) {
+ client = new ElasticsearchRestClient(getMetadata());
+ }
+ return client;
+ }
+
+ @Override
+ protected DatabaseMetadata startDatabase() throws Exception {
+ CONTAINER.withEnv("xpack.security.enabled", "true");
+ CONTAINER.withEnv("ELASTIC_PASSWORD", PASSWORD);
+ CONTAINER.withEnv("ES_JAVA_OPTS", "-Xms1g -Xmx1g");
+ CONTAINER.start();
+
+ // JDBC plugin is available only in Platinum and Enterprise licenses or in trial.
+ if (!getClient().trialEnabled()) {
+ getClient().enableTrial();
+ }
+
+ return getMetadata();
+ }
+
+ @Override
+ protected void stopDatabase() throws Exception {
+ CONTAINER.stop();
+ metadata = null;
+ client = null;
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchImages.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchImages.java
new file mode 100644
index 000000000..59d0c8f09
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchImages.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+/** Elasticsearch docker images. */
+public interface ElasticsearchImages {
+
+ String ELASTICSEARCH_8 = "docker.elastic.co/elasticsearch/elasticsearch:8.13.1";
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchIndexSchemaBuilder.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchIndexSchemaBuilder.java
new file mode 100644
index 000000000..54234580b
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchIndexSchemaBuilder.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataTypeVisitor;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.stream.Collectors.joining;
+
+/** Creates content for Elastic Index API call. */
+public class ElasticsearchIndexSchemaBuilder {
+
+ private static final ElasticsearchDataTypeMapper MAPPER = new ElasticsearchDataTypeMapper();
+
+ public static String buildIndexSchema(TableRow tableRow) {
+ String fields =
+ stream(tableRow.getTableDataFields())
+ .map(
+ field ->
+ format(
+ "\"%s\": %s",
+ field.getName(),
+ field.getDataType().accept(MAPPER)))
+ .collect(joining(", "));
+ return "{\"settings\": {\"number_of_shards\": 1}, \"mappings\": {\"properties\": {"
+ + fields
+ + "}}}";
+ }
+
+ /** Maps Flink types to Elasticsearch types. */
+ private static class ElasticsearchDataTypeMapper
+            implements DataTypeVisitor<String>, LogicalTypeVisitor<String> {
+
+ @Override
+ public String visit(AtomicDataType atomicDataType) {
+ return atomicDataType.getLogicalType().accept(this);
+ }
+
+ @Override
+ public String visit(CollectionDataType collectionDataType) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public String visit(FieldsDataType fieldsDataType) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public String visit(KeyValueDataType keyValueDataType) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public String visit(CharType charType) {
+ throw new IllegalArgumentException("CharType is not supported.");
+ }
+
+ @Override
+ public String visit(VarCharType varCharType) {
+ return "{\"type\": \"text\"}";
+ }
+
+ @Override
+ public String visit(BooleanType booleanType) {
+ return "{\"type\": \"boolean\"}";
+ }
+
+ @Override
+ public String visit(BinaryType binaryType) {
+ return "{\"type\": \"binary\"}";
+ }
+
+ @Override
+ public String visit(VarBinaryType varBinaryType) {
+ return "{\"type\": \"binary\"}";
+ }
+
+ @Override
+ public String visit(DecimalType decimalType) {
+ throw new IllegalArgumentException("DecimalType is not supported.");
+ }
+
+ @Override
+ public String visit(TinyIntType tinyIntType) {
+ return "{\"type\": \"byte\"}";
+ }
+
+ @Override
+ public String visit(SmallIntType smallIntType) {
+ return "{\"type\": \"short\"}";
+ }
+
+ @Override
+ public String visit(IntType intType) {
+ return "{\"type\": \"integer\"}";
+ }
+
+ @Override
+ public String visit(BigIntType bigIntType) {
+ return "{\"type\": \"long\"}";
+ }
+
+ @Override
+ public String visit(FloatType floatType) {
+ return "{\"type\": \"float\"}";
+ }
+
+ @Override
+ public String visit(DoubleType doubleType) {
+ return "{\"type\": \"double\"}";
+ }
+
+ @Override
+ public String visit(DateType dateType) {
+ return "{\"type\": \"date\", \"format\": \"strict_date||basic_date\"}";
+ }
+
+ @Override
+ public String visit(TimeType timeType) {
+ throw new IllegalArgumentException("TimeType is not supported.");
+ }
+
+ @Override
+ public String visit(TimestampType timestampType) {
+ return timestampType.getPrecision() <= 3
+ ? "{\"type\": \"date\"}"
+ : "{\"type\": \"date_nanos\"}";
+ }
+
+ @Override
+ public String visit(ZonedTimestampType zonedTimestampType) {
+ throw new IllegalArgumentException("ZonedTimestampType is not supported.");
+ }
+
+ @Override
+ public String visit(LocalZonedTimestampType localZonedTimestampType) {
+ throw new IllegalArgumentException("LocalZonedTimestampType is not supported.");
+ }
+
+ @Override
+ public String visit(YearMonthIntervalType yearMonthIntervalType) {
+ throw new IllegalArgumentException("YearMonthIntervalType is not supported.");
+ }
+
+ @Override
+ public String visit(DayTimeIntervalType dayTimeIntervalType) {
+ throw new IllegalArgumentException("DayTimeIntervalType is not supported.");
+ }
+
+ @Override
+ public String visit(ArrayType arrayType) {
+ throw new IllegalArgumentException("ArrayType is not supported.");
+ }
+
+ @Override
+ public String visit(MultisetType multisetType) {
+ throw new IllegalArgumentException("MultisetType is not supported.");
+ }
+
+ @Override
+ public String visit(MapType mapType) {
+ throw new IllegalArgumentException("MapType is not supported.");
+ }
+
+ @Override
+ public String visit(RowType rowType) {
+ throw new IllegalArgumentException("RowType is not supported.");
+ }
+
+ @Override
+ public String visit(DistinctType distinctType) {
+ throw new IllegalArgumentException("DistinctType is not supported.");
+ }
+
+ @Override
+ public String visit(StructuredType structuredType) {
+ throw new IllegalArgumentException("StructuredType is not supported.");
+ }
+
+ @Override
+ public String visit(NullType nullType) {
+ throw new IllegalArgumentException("NullType is not supported.");
+ }
+
+ @Override
+        public String visit(RawType<?> rawType) {
+ throw new IllegalArgumentException("RawType is not supported.");
+ }
+
+ @Override
+        public String visit(SymbolType<?> symbolType) {
+ throw new IllegalArgumentException("SymbolType is not supported.");
+ }
+
+ @Override
+ public String visit(LogicalType other) {
+ return other.accept(this);
+ }
+ }
+}
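A sketch of the mapping JSON the builder emits for a small table, again using the PR's test helpers; the field names are illustrative:

```java
// Sketch: index mapping JSON produced by the schema builder above.
// Field names are illustrative assumptions.
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;

import org.apache.flink.connector.jdbc.testutils.databases.elasticsearch.ElasticsearchIndexSchemaBuilder;
import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.table.api.DataTypes;

public class IndexSchemaDemo {
    public static void main(String[] args) {
        TableRow table =
                tableRow(
                        "demo_index",
                        field("id", DataTypes.BIGINT().notNull()),
                        field("ts", DataTypes.TIMESTAMP(6)));

        System.out.println(ElasticsearchIndexSchemaBuilder.buildIndexSchema(table));
        // {"settings": {"number_of_shards": 1}, "mappings": {"properties":
        //   {"id": {"type": "long"}, "ts": {"type": "date_nanos"}}}}
        // BIGINT maps to "long"; TIMESTAMP with precision > 3 maps to "date_nanos".
    }
}
```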
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchMetadata.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchMetadata.java
new file mode 100644
index 000000000..3fdffe1fd
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import javax.sql.XADataSource;
+
+/** Elasticsearch metadata. */
+public class ElasticsearchMetadata implements DatabaseMetadata {
+
+ static final int ELASTIC_PORT = 9200;
+ static final String USERNAME = "elastic";
+ static final String PASSWORD = "password";
+
+ private final String username;
+ private final String password;
+ private final String jdbcUrl;
+ private final String driver;
+ private final String version;
+ private final String containerHost;
+ private final int containerPort;
+
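+ // Reads the container's dynamically mapped port so the JDBC URL targets
+ // the host-side binding rather than the fixed in-container port 9200.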
+ public ElasticsearchMetadata(ElasticsearchContainer container) {
+ this.containerHost = container.getHost();
+ this.containerPort = container.getMappedPort(ELASTIC_PORT);
+ this.username = USERNAME;
+ this.password = PASSWORD;
+ this.jdbcUrl = "jdbc:elasticsearch://" + containerHost + ":" + containerPort;
+ this.driver = "org.elasticsearch.xpack.sql.jdbc.EsDriver";
+ this.version = container.getDockerImageName();
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return this.jdbcUrl;
+ }
+
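+ // Credentials are appended as URL query parameters, which the Elasticsearch
+ // JDBC driver accepts as connection properties.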
+ @Override
+ public String getJdbcUrlWithCredentials() {
+ return String.format("%s&user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword());
+ }
+
+ @Override
+ public String getUsername() {
+ return this.username;
+ }
+
+ @Override
+ public String getPassword() {
+ return this.password;
+ }
+
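+ // The Elasticsearch JDBC driver has no XA/distributed-transaction support,
+ // hence no XADataSource can be built.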
+ @Override
+ public XADataSource buildXaDataSource() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getDriverClass() {
+ return this.driver;
+ }
+
+ @Override
+ public String getVersion() {
+ return this.version;
+ }
+
+ public String getContainerHost() {
+ return containerHost;
+ }
+
+ public int getContainerPort() {
+ return containerPort;
+ }
+}
diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java
new file mode 100644
index 000000000..5519cd614
--- /dev/null
+++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/testutils/databases/elasticsearch/ElasticsearchRestClient.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.testutils.databases.elasticsearch;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+
+import static java.lang.String.format;
+
+/** Elasticsearch REST API client. */
+public class ElasticsearchRestClient {
+
+ private static final ObjectMapper OBJECT_MAPPER =
+ new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ private final RestClient restClient;
+
+ public ElasticsearchRestClient(ElasticsearchMetadata metadata) {
+ this(
+ metadata.getContainerHost(),
+ metadata.getContainerPort(),
+ metadata.getUsername(),
+ metadata.getPassword());
+ }
+
+ public ElasticsearchRestClient(String host, int port, String username, String password) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ this.restClient =
+ RestClient.builder(new HttpHost(host, port, "http"))
+ .setHttpClientConfigCallback(
+ builder ->
+ builder.setDefaultCredentialsProvider(credentialsProvider))
+ .build();
+ }
+
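+ // JDBC access to Elasticsearch SQL is a licensed (Platinum-tier) feature;
+ // the tests activate a trial license through the REST API before connecting.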
+ public boolean trialEnabled() throws Exception {
+ Request request = new Request("GET", "/_license");
+ ElasticLicenseResponse response = executeRequest(request, ElasticLicenseResponse.class);
+ return response != null
+ && response.license.status.equals("active")
+ && response.license.type.equals("trial");
+ }
+
+ public void enableTrial() throws Exception {
+ executeRequest(new Request("POST", "/_license/start_trial?acknowledge=true"));
+ }
+
+ public void createIndex(String indexName, String indexDefinition) throws Exception {
+ Request request = new Request("PUT", format("/%s/", indexName));
+ request.setJsonEntity(indexDefinition);
+ executeRequest(request);
+ }
+
+ public void deleteIndex(String indexName) throws Exception {
+ executeRequest(new Request("DELETE", format("/%s/", indexName)));
+ }
+
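+ // Indexes documents through the _bulk API; refresh=true makes them visible
+ // to searches immediately, which keeps the tests deterministic.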
+ public void addDataBulk(String indexName, String content) throws Exception {
+ Request request = new Request("PUT", format("/%s/_bulk?refresh=true", indexName));
+ request.setJsonEntity(content);
+ executeRequest(request);
+ }
+
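+ // Both helpers assert an HTTP 200 status so a failed call surfaces as a
+ // test assertion error instead of a downstream parsing failure.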
+ private <T> T executeRequest(Request request, Class<T> outputClass) throws IOException {
+ org.elasticsearch.client.Response response = restClient.performRequest(request);
+ Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
+ return OBJECT_MAPPER.readValue(EntityUtils.toString(response.getEntity()), outputClass);
+ }
+
+ private void executeRequest(Request request) throws IOException {
+ org.elasticsearch.client.Response response = restClient.performRequest(request);
+ Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
+ }
+
+ private static class ElasticLicenseResponse {
+ private static class License {
+ private String status;
+ private String type;
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+ }
+
+ private License license;
+
+ public License getLicense() {
+ return license;
+ }
+
+ public void setLicense(License license) {
+ this.license = license;
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 9b755483b..ff4437649 100644
--- a/pom.xml
+++ b/pom.xml
@@ -378,6 +378,37 @@ under the License.
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>ban-elastic-dependencies</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<configuration>
+							<rules>
+								<bannedDependencies>
+									<excludes>
+										<exclude>org.elasticsearch.*:*</exclude>
+									</excludes>
+									<includes>
+										<include>org.elasticsearch.plugin:x-pack-sql-jdbc:*:*:provided</include>
+										<include>org.elasticsearch.client:elasticsearch-rest-client:*:*:test</include>
+									</includes>
+									<message>Elastic x-pack dependencies are not allowed to be bundled due to licensing issues.</message>
+								</bannedDependencies>
+							</rules>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
@@ -412,11 +443,6 @@ under the License.
 				<artifactId>maven-surefire-plugin</artifactId>
 			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-enforcer-plugin</artifactId>
-			</plugin>
-
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-shade-plugin</artifactId>