diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md
index fdb932656..50ef6a10d 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -47,17 +47,18 @@ JDBC 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref "
在连接到具体数据库时,也需要对应的驱动依赖,目前支持的驱动如下:
-| Driver | Group Id | Artifact Id | JAR |
-|:-----------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------|
-| MySQL | `mysql` | `mysql-connector-java` | [下载](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
-| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [下载](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
-| PostgreSQL | `org.postgresql` | `postgresql` | [下载](https://jdbc.postgresql.org/download/) |
-| Derby | `org.apache.derby` | `derby` | [下载](http://db.apache.org/derby/derby_downloads.html) |
-| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
-| CrateDB | `io.crate` | `crate-jdbc` | [下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
-| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [下载](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
-| Trino | `io.trino` | `trino-jdbc` | [下载](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
-| OceanBase | `com.oceanbase` | `oceanbase-client` | [下载](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
+| Driver | Group Id | Artifact Id | JAR |
+|:--------------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------|
+| MySQL | `mysql` | `mysql-connector-java` | [下载](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [下载](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+| PostgreSQL | `org.postgresql` | `postgresql` | [下载](https://jdbc.postgresql.org/download/) |
+| Derby | `org.apache.derby` | `derby` | [下载](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| CrateDB | `io.crate` | `crate-jdbc` | [下载](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
+| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [下载](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
+| Trino | `io.trino` | `trino-jdbc` | [下载](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
+| OceanBase | `com.oceanbase` | `oceanbase-client` | [下载](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
+| Elasticsearch | `org.elasticsearch.plugin` | `x-pack-sql-jdbc` | [下载](https://www.elastic.co/downloads/jdbc-client) |
当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅[这里]({{< ref "docs/dev/configuration/overview" >}})了解在集群上执行时如何连接它们。
@@ -723,6 +724,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
         <th class="text-left">Trino type</th>
         <th class="text-left">OceanBase MySQL mode type</th>
         <th class="text-left">OceanBase Oracle mode type</th>
+        <th class="text-left"><a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html">Elastic SQL type</a></th>
         <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
@@ -737,6 +739,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>TINYINT</td>
       <td>TINYINT</td>
       <td></td>
+      <td>BYTE</td>
       <td>TINYINT</td>
@@ -759,6 +762,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>SMALLINT<br>
         TINYINT UNSIGNED
       </td>
+      <td>SHORT</td>
       <td>SMALLINT</td>
@@ -781,6 +785,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>MEDIUMINT<br>
         SMALLINT UNSIGNED
       </td>
+      <td>INTEGER</td>
       <td>INT</td>
@@ -801,6 +806,9 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>BIGINT<br>
         INT UNSIGNED
       </td>
+      <td>
+        LONG<br>
+        UNSIGNED_LONG</td>
       <td>BIGINT</td>
@@ -813,6 +821,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td></td>
       <td>BIGINT UNSIGNED</td>
       <td></td>
+      <td></td>
       <td>DECIMAL(20, 0)</td>
@@ -831,6 +840,9 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>FLOAT</td>
       <td>BINARY_FLOAT</td>
+      <td>
+        FLOAT<br>
+        HALF_FLOAT</td>
       <td>FLOAT</td>
@@ -849,6 +861,9 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td>BINARY_DOUBLE</td>
+      <td>
+        DOUBLE<br>
+        SCALED_FLOAT</td>
       <td>DOUBLE</td>
@@ -877,6 +892,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
         FLOAT(s)<br>
         NUMBER(p, s)</td>
+      <td></td>
       <td>DECIMAL(p, s)</td>
@@ -894,6 +910,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
         TINYINT(1)
       </td>
       <td>BOOLEAN</td>
+      <td>BOOLEAN</td>
       <td>DATE</td>
@@ -905,6 +922,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>DATE</td>
       <td>DATE</td>
       <td>DATE</td>
+      <td></td>
       <td>DATE</td>
@@ -917,6 +935,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>TIME_WITHOUT_TIME_ZONE</td>
       <td>TIME [(p)]</td>
       <td>DATE</td>
+      <td></td>
       <td>TIME [(p)] [WITHOUT TIMEZONE]</td>
@@ -932,6 +951,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>TIMESTAMP_WITHOUT_TIME_ZONE</td>
       <td>DATETIME [(p)]</td>
       <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
+      <td>DATETIME</td>
       <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
@@ -980,6 +1000,11 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
         NCHAR(n)<br>
         VARCHAR2(n)<br>
         CLOB
       </td>
+      <td>
+        KEYWORD<br>
+        IP<br>
+        TEXT<br>
+        VERSION</td>
       <td>STRING</td>
@@ -1005,6 +1030,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
         RAW(s)<br>
         BLOB</td>
+      <td>BINARY</td>
       <td>BYTES</td>
@@ -1017,6 +1043,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
       <td>ARRAY</td>
       <td></td>
       <td></td>
+      <td></td>
       <td>ARRAY</td>
diff --git a/docs/content/docs/connectors/datastream/jdbc.md b/docs/content/docs/connectors/datastream/jdbc.md
index b4349071b..73ba49398 100644
--- a/docs/content/docs/connectors/datastream/jdbc.md
+++ b/docs/content/docs/connectors/datastream/jdbc.md
@@ -529,4 +529,14 @@ Please also take Oracle connection pooling into account.
Please refer to the `JdbcXaSinkFunction` documentation for more details.
+## License of JDBC driver for Elasticsearch
+
+Flink's JDBC connector defines a Maven dependency on the "JDBC driver for Elasticsearch", which is licensed under
+the Elastic License 2.0.
+Flink itself neither reuses source code from the "JDBC driver for Elasticsearch"
+nor packages binaries from the "JDBC driver for Elasticsearch".
+Users who create and publish derivative works based on Flink's JDBC connector (thereby re-distributing
+the "JDBC driver for Elasticsearch") must be aware that doing so may be subject to conditions declared in
+the Elastic License 2.0.
+
{{< top >}}
diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index b0ef84eb1..7e65cd6e6 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -45,18 +45,18 @@ See how to link with it for cluster execution [here]({{< ref "docs/dev/configura
A driver dependency is also required to connect to a specified database. Here are drivers currently supported:
-| Driver | Group Id | Artifact Id | JAR |
-|:-----------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------------|
-| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
-| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
-| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) |
-| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
-| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
-| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
-| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
-| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
-| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
-
+| Driver | Group Id | Artifact Id | JAR |
+|:--------------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------------|
+| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) |
+| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
+| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
+| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
+| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
+| Elasticsearch | `org.elasticsearch.plugin` | `x-pack-sql-jdbc` | [Download](https://www.elastic.co/downloads/jdbc-client) |
JDBC connector and drivers are not part of Flink's binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
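
As a quick way to verify the driver setup outside of Flink, the sketch below opens a plain JDBC connection to Elasticsearch. It is a minimal sketch, assuming a cluster at `localhost:9200` with security enabled and `x-pack-sql-jdbc` on the classpath; the class name, host and credentials are illustrative placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Properties;

public class EsJdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.setProperty("user", "elastic");       // illustrative credentials
        props.setProperty("password", "changeme");
        // The dialect in this change accepts both "jdbc:elasticsearch:" and "jdbc:es:" URLs.
        try (Connection conn =
                        DriverManager.getConnection("jdbc:elasticsearch://localhost:9200", props);
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1")) {
            while (rs.next()) {
                System.out.println(rs.getInt(1));
            }
        }
    }
}
```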
@@ -702,7 +702,7 @@ SELECT * FROM given_database.test_table2;
Data Type Mapping
----------------
-Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
+Flink supports connecting to several databases through dialects, such as MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2, OceanBase and Elasticsearch. The Derby dialect is usually used for testing purposes. The field data type mappings from relational database data types to Flink SQL data types are listed in the following table; the mapping table can help define JDBC tables in Flink easily.
@@ -716,6 +716,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
         <th class="text-left">Trino type</th>
         <th class="text-left">OceanBase MySQL mode type</th>
         <th class="text-left">OceanBase Oracle mode type</th>
+        <th class="text-left"><a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html">Elastic SQL type</a></th>
         <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
@@ -730,6 +731,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>TINYINT</td>
       <td>TINYINT</td>
       <td></td>
+      <td>BYTE</td>
       <td>TINYINT</td>
@@ -752,6 +754,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>SMALLINT<br>
         TINYINT UNSIGNED
       </td>
+      <td>SHORT</td>
       <td>SMALLINT</td>
@@ -774,6 +777,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>MEDIUMINT<br>
         SMALLINT UNSIGNED
       </td>
+      <td>INTEGER</td>
       <td>INT</td>
@@ -794,6 +798,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>BIGINT<br>
         INT UNSIGNED
       </td>
+      <td>
+        LONG<br>
+        UNSIGNED_LONG</td>
       <td>BIGINT</td>
@@ -806,6 +813,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td></td>
       <td>BIGINT UNSIGNED</td>
       <td></td>
+      <td></td>
       <td>DECIMAL(20, 0)</td>
@@ -824,6 +832,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>FLOAT</td>
       <td>BINARY_FLOAT</td>
+      <td>
+        FLOAT<br>
+        HALF_FLOAT</td>
       <td>FLOAT</td>
@@ -842,6 +853,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>DOUBLE</td>
       <td>DOUBLE</td>
       <td>BINARY_DOUBLE</td>
+      <td>
+        DOUBLE<br>
+        SCALED_FLOAT</td>
       <td>DOUBLE</td>
@@ -870,6 +884,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
         FLOAT(s)<br>
         NUMBER(p, s)</td>
+      <td></td>
       <td>DECIMAL(p, s)</td>
@@ -887,6 +902,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
         TINYINT(1)
       </td>
       <td>BOOLEAN</td>
+      <td>BOOLEAN</td>
       <td>DATE</td>
@@ -898,6 +914,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>DATE</td>
       <td>DATE</td>
       <td>DATE</td>
+      <td></td>
       <td>DATE</td>
@@ -910,6 +927,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>TIME_WITHOUT_TIME_ZONE</td>
       <td>TIME [(p)]</td>
       <td>DATE</td>
+      <td></td>
       <td>TIME [(p)] [WITHOUT TIMEZONE]</td>
@@ -925,6 +943,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>TIMESTAMP_WITHOUT_TIME_ZONE</td>
       <td>DATETIME [(p)]</td>
       <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
+      <td>DATETIME</td>
       <td>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</td>
@@ -973,6 +992,11 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
         NCHAR(n)<br>
         VARCHAR2(n)<br>
         CLOB
       </td>
+      <td>
+        KEYWORD<br>
+        IP<br>
+        TEXT<br>
+        VERSION</td>
       <td>STRING</td>
@@ -998,6 +1022,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
         RAW(s)<br>
         BLOB</td>
+      <td>BINARY</td>
       <td>BYTES</td>
@@ -1010,9 +1035,20 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td>ARRAY</td>
       <td></td>
       <td></td>
+      <td></td>
       <td>ARRAY</td>
+## License of JDBC driver for Elasticsearch
+
+Flink's JDBC connector defines a Maven dependency on the "JDBC driver for Elasticsearch", which is licensed under
+the Elastic License 2.0.
+Flink itself neither reuses source code from the "JDBC driver for Elasticsearch"
+nor packages binaries from the "JDBC driver for Elasticsearch".
+Users who create and publish derivative works based on Flink's JDBC connector (thereby re-distributing
+the "JDBC driver for Elasticsearch") must be aware that doing so may be subject to conditions declared in
+the Elastic License 2.0.
+
{{< top >}}
diff --git a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java
index 8b721b12c..8bbd19ea5 100644
--- a/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java
+++ b/flink-connector-jdbc-core/src/test/java/org/apache/flink/connector/jdbc/testutils/resources/DockerResource.java
@@ -7,7 +7,6 @@
import org.slf4j.LoggerFactory;
import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.JdbcDatabaseContainer;
import java.util.Arrays;
@@ -16,9 +15,9 @@ public class DockerResource implements DatabaseResource {
protected static final Logger LOG = LoggerFactory.getLogger(DockerResource.class);
- private final JdbcDatabaseContainer<?> container;
+ private final GenericContainer<?> container;
- public DockerResource(JdbcDatabaseContainer<?> container) {
+ public DockerResource(GenericContainer<?> container) {
this.container = container;
}
diff --git a/flink-connector-jdbc-elasticsearch/pom.xml b/flink-connector-jdbc-elasticsearch/pom.xml
new file mode 100644
index 000000000..626d2dff6
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/pom.xml
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.flink</groupId>
+        <artifactId>flink-connector-jdbc-parent</artifactId>
+        <version>3.3-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>flink-connector-jdbc-elasticsearch</artifactId>
+    <name>Flink : Connectors : JDBC : Elasticsearch</name>
+
+    <packaging>jar</packaging>
+
+    <properties>
+        <elasticsearch.version>8.14.3</elasticsearch.version>
+        <jackson-databind.version>2.13.4.2</jackson-databind.version>
+        <jackson-datatype-jsr310.version>2.13.4</jackson-datatype-jsr310.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc-core</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-jdbc-core</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-api-java-bridge</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>x-pack-sql-jdbc</artifactId>
+            <version>${elasticsearch.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <version>${assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>jdbc</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>elasticsearch-rest-client</artifactId>
+            <version>${elasticsearch.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${jackson-databind.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>${jackson-datatype-jsr310.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-architecture-tests-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+</project>
diff --git a/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/ElasticsearchFactory.java b/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/ElasticsearchFactory.java
new file mode 100644
index 000000000..5ac6ae771
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/ElasticsearchFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.database;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.core.database.JdbcFactory;
+import org.apache.flink.connector.jdbc.core.database.catalog.JdbcCatalog;
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.elasticsearch.database.dialect.ElasticsearchDialect;
+
+/** Factory for {@link ElasticsearchDialect}. */
+@Internal
+public class ElasticsearchFactory implements JdbcFactory {
+
+ @Override
+ public boolean acceptsURL(String url) {
+ return url.startsWith("jdbc:elasticsearch:") || url.startsWith("jdbc:es:");
+ }
+
+ @Override
+ public JdbcDialect createDialect() {
+ return new ElasticsearchDialect();
+ }
+
+ @Override
+ public JdbcCatalog createCatalog(
+ ClassLoader classLoader,
+ String catalogName,
+ String defaultDatabase,
+ String username,
+ String pwd,
+ String baseUrl) {
+ throw new UnsupportedOperationException("Catalog for Elasticsearch is not supported yet.");
+ }
+}
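
A minimal sketch of how the URL check above behaves; the class name `FactoryUrlCheck` is hypothetical, and only the two prefixes hard-coded in `acceptsURL` are claimed:

```java
import org.apache.flink.connector.jdbc.elasticsearch.database.ElasticsearchFactory;

public class FactoryUrlCheck {
    public static void main(String[] args) {
        ElasticsearchFactory factory = new ElasticsearchFactory();
        // Both Elastic JDBC URL flavors are claimed by this factory.
        System.out.println(factory.acceptsURL("jdbc:elasticsearch://localhost:9200")); // true
        System.out.println(factory.acceptsURL("jdbc:es:http://localhost:9200"));       // true
        // URLs of other databases are left to the remaining JdbcFactory implementations.
        System.out.println(factory.acceptsURL("jdbc:mysql://localhost:3306/db"));      // false
    }
}
```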
diff --git a/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialect.java b/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialect.java
new file mode 100644
index 000000000..3109361f8
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialect.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.database.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Set;
+
+/** JDBC dialect for Elasticsearch. */
+@Internal
+public class ElasticsearchDialect extends AbstractDialect {
+
+ private static final long serialVersionUID = 1L;
+
+ // Define MAX/MIN precision of TIMESTAMP type according to Elasticsearch docs:
+ // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
+ private static final int MIN_TIMESTAMP_PRECISION = 0;
+ private static final int MAX_TIMESTAMP_PRECISION = 9;
+
+ @Override
+ public String dialectName() {
+ return "Elasticsearch";
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("org.elasticsearch.xpack.sql.jdbc.EsDriver");
+ }
+
+ @Override
+ public JdbcDialectConverter getRowConverter(RowType rowType) {
+ return new ElasticsearchDialectConverter(rowType);
+ }
+
+ @Override
+ public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
+ return '"' + identifier + '"';
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
+ return "LIMIT " + limit;
+ }
+
+ @Override
+ public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ throw new UnsupportedOperationException("Upsert is not supported.");
+ }
+
+ @Override
+ public String getInsertIntoStatement(String tableName, String[] fieldNames) {
+ throw new UnsupportedOperationException("Insert into is not supported.");
+ }
+
+ @Override
+ public String getUpdateStatement(
+ String tableName, String[] fieldNames, String[] conditionFields) {
+ throw new UnsupportedOperationException("Update is not supported.");
+ }
+
+ @Override
+ public String getDeleteStatement(String tableName, String[] conditionFields) {
+ throw new UnsupportedOperationException("Delete is not supported.");
+ }
+
+ @Override
+ public Set<LogicalTypeRoot> supportedTypes() {
+ // The list of types supported by Elastic SQL.
+ // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
+ return EnumSet.of(
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.VARBINARY,
+ LogicalTypeRoot.VARCHAR);
+ }
+}
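
A short sketch of what this source-only dialect generates; the expected SELECT string mirrors `ElasticsearchPreparedStatementTest` later in this change, and `DialectDemo` is a hypothetical driver class:

```java
import org.apache.flink.connector.jdbc.elasticsearch.database.dialect.ElasticsearchDialect;

public class DialectDemo {
    public static void main(String[] args) {
        ElasticsearchDialect dialect = new ElasticsearchDialect();

        // Identifiers are quoted with double quotes, matching Elastic SQL.
        System.out.println(dialect.quoteIdentifier("ts")); // "ts"
        System.out.println(dialect.getLimitClause(10));    // LIMIT 10

        // Scan queries are supported ...
        System.out.println(
                dialect.getSelectFromStatement("tbl", new String[] {"id"}, new String[] {"id"}));
        // SELECT "id" FROM "tbl" WHERE "id" = :id

        // ... while every write path throws, since Elastic SQL is read-only.
        try {
            dialect.getInsertIntoStatement("tbl", new String[] {"id"});
        } catch (UnsupportedOperationException expected) {
            System.out.println(expected.getMessage()); // Insert into is not supported.
        }
    }
}
```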
diff --git a/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialectConverter.java b/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialectConverter.java
new file mode 100644
index 000000000..c7fe39c69
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/main/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialectConverter.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.database.dialect;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.sql.Timestamp;
+
+/**
+ * Runtime converter responsible for converting between JDBC objects and Flink internal objects for
+ * Elasticsearch.
+ */
+@Internal
+public class ElasticsearchDialectConverter extends AbstractDialectConverter {
+ private static final long serialVersionUID = 1L;
+
+ public ElasticsearchDialectConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ public String converterName() {
+ return "Elasticsearch";
+ }
+
+ @Override
+ protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case TINYINT:
+ case DOUBLE:
+ case FLOAT:
+ return val -> val;
+ case DATE:
+ return val ->
+ (int) (((Timestamp) val).toLocalDateTime().toLocalDate().toEpochDay());
+ default:
+ return super.createInternalConverter(type);
+ }
+ }
+}
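
The `DATE` branch above converts the `java.sql.Timestamp` returned by the Elasticsearch driver into the epoch-day `int` that Flink uses internally for `DATE`; a standalone sketch of that arithmetic (`DateConversionSketch` is a hypothetical name):

```java
import java.sql.Timestamp;
import java.time.LocalDate;

public class DateConversionSketch {
    public static void main(String[] args) {
        // The Elasticsearch driver surfaces DATE columns as java.sql.Timestamp.
        Timestamp ts = Timestamp.valueOf("2020-01-01 15:35:00");

        // Same expression as in createInternalConverter: drop the time part,
        // then count days since 1970-01-01.
        int epochDay = (int) ts.toLocalDateTime().toLocalDate().toEpochDay();

        System.out.println(epochDay);                       // 18262
        System.out.println(LocalDate.ofEpochDay(epochDay)); // 2020-01-01
    }
}
```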
diff --git a/flink-connector-jdbc-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory b/flink-connector-jdbc-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory
new file mode 100644
index 000000000..f44be48c4
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.core.database.JdbcFactory
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.flink.connector.jdbc.elasticsearch.database.ElasticsearchFactory
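
This service file makes the factory discoverable through Java's SPI. The connector's `JdbcFactoryLoader` resolves a dialect with the equivalent of the lookup below (a simplified sketch, not the loader's actual code):

```java
import org.apache.flink.connector.jdbc.core.database.JdbcFactory;

import java.util.ServiceLoader;

public class DialectDiscoverySketch {
    public static void main(String[] args) {
        String url = "jdbc:es:http://localhost:9200";
        // ServiceLoader scans META-INF/services/...JdbcFactory entries on the classpath.
        for (JdbcFactory factory : ServiceLoader.load(JdbcFactory.class)) {
            if (factory.acceptsURL(url)) {
                System.out.println(factory.createDialect().dialectName()); // Elasticsearch
            }
        }
    }
}
```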
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/ElasticsearchTestBase.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/ElasticsearchTestBase.java
new file mode 100644
index 000000000..e885b10f7
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/ElasticsearchTestBase.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch;
+
+import org.apache.flink.connector.jdbc.elasticsearch.testutils.ElasticsearchDatabase;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseTest;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/** Base class for Elasticsearch testing. */
+@ExtendWith(ElasticsearchDatabase.class)
+public interface ElasticsearchTestBase extends DatabaseTest {
+
+ @Override
+ default DatabaseMetadata getMetadata() {
+ return ElasticsearchDatabase.getMetadata();
+ }
+}
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialectTest.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialectTest.java
new file mode 100644
index 000000000..0eacf666f
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchDialectTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.database.dialect;
+
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialectTest;
+import org.apache.flink.connector.jdbc.elasticsearch.ElasticsearchTestBase;
+
+import java.util.Arrays;
+import java.util.List;
+
+/** The Elasticsearch params for {@link JdbcDialectTest}. */
+class ElasticsearchDialectTest extends JdbcDialectTest implements ElasticsearchTestBase {
+
+ @Override
+ protected List<TestItem> testData() {
+ return Arrays.asList(
+ createTestItem("VARCHAR"),
+ createTestItem("BOOLEAN"),
+ createTestItem("TINYINT"),
+ createTestItem("SMALLINT"),
+ createTestItem("INTEGER"),
+ createTestItem("BIGINT"),
+ createTestItem("FLOAT"),
+ createTestItem("DOUBLE"),
+ createTestItem("DATE"),
+ createTestItem("TIMESTAMP(3)"),
+ createTestItem("TIMESTAMP WITHOUT TIME ZONE"),
+ createTestItem("VARBINARY"),
+
+ // Not valid data
+ createTestItem("CHAR", "The Elasticsearch dialect doesn't support type: CHAR(1)."),
+ createTestItem(
+ "BINARY", "The Elasticsearch dialect doesn't support type: BINARY(1)."),
+ createTestItem("TIME", "The Elasticsearch dialect doesn't support type: TIME(0)."),
+ createTestItem(
+ "VARBINARY(10)",
+ "The Elasticsearch dialect doesn't support type: VARBINARY(10)."),
+ createTestItem(
+ "DECIMAL(10, 4)",
+ "The Elasticsearch dialect doesn't support type: DECIMAL(10, 4)."));
+ }
+}
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchPreparedStatementTest.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchPreparedStatementTest.java
new file mode 100644
index 000000000..c4f1e4ef2
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/database/dialect/ElasticsearchPreparedStatementTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.database.dialect;
+
+import org.apache.flink.connector.jdbc.core.database.JdbcFactoryLoader;
+import org.apache.flink.connector.jdbc.core.database.dialect.JdbcDialect;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the SQL statements generated by the Elasticsearch dialect. */
+public class ElasticsearchPreparedStatementTest {
+
+ private final JdbcDialect dialect =
+ JdbcFactoryLoader.loadDialect(
+ "jdbc:elasticsearch://localhost:9200/test", getClass().getClassLoader());
+
+ private final String[] fieldNames =
+ new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"};
+ private final String[] keyFields = new String[] {"id", "__field_3__"};
+ private final String tableName = "tbl";
+
+ @Test
+ void testRowExistsStatement() {
+ String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
+ assertThat(rowExistStmt)
+ .isEqualTo(
+ "SELECT 1 FROM \"tbl\" WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
+ }
+
+ @Test
+ void testSelectStatement() {
+ String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
+ assertThat(selectStmt)
+ .isEqualTo(
+ "SELECT \"id\", \"name\", \"email\", \"ts\", \"field1\", \"field_2\", \"__field_3__\" "
+ + "FROM \"tbl\" "
+ + "WHERE \"id\" = :id AND \"__field_3__\" = :__field_3__");
+ }
+}
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java
new file mode 100644
index 000000000..f3b56ff10
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/table/ElasticsearchDynamicTableSourceITCase.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.table;
+
+import org.apache.flink.connector.jdbc.elasticsearch.ElasticsearchTestBase;
+import org.apache.flink.connector.jdbc.elasticsearch.database.dialect.ElasticsearchDialect;
+import org.apache.flink.connector.jdbc.elasticsearch.testutils.ElasticsearchMetadata;
+import org.apache.flink.connector.jdbc.elasticsearch.testutils.ElasticsearchRestClient;
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.connector.source.lookup.cache.LookupCache;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.runtime.functions.table.lookup.LookupCacheManager;
+import org.apache.flink.table.test.lookup.cache.LookupCacheAssert;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.connector.jdbc.elasticsearch.testutils.ElasticsearchBulkBuilder.createBulkContent;
+import static org.apache.flink.connector.jdbc.elasticsearch.testutils.ElasticsearchIndexSchemaBuilder.buildIndexSchema;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
+import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The Table Source ITCase for {@link ElasticsearchDialect}. */
+public class ElasticsearchDynamicTableSourceITCase extends AbstractTestBase
+ implements ElasticsearchTestBase {
+
+ private ElasticsearchRestClient client;
+ private TableEnvironment tEnv;
+ private final TableRow inputTable = createInputTable();
+
+ @BeforeEach
+ void beforeEach() throws Exception {
+ client = new ElasticsearchRestClient((ElasticsearchMetadata) getMetadata());
+
+ client.createIndex(inputTable.getTableName(), buildIndexSchema(inputTable));
+ client.addDataBulk(inputTable.getTableName(), createBulkContent(inputTable, getTestData()));
+
+ tEnv = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
+ }
+
+ @AfterEach
+ void afterEach() throws Exception {
+ if (client != null) {
+ client.deleteIndex(inputTable.getTableName());
+ }
+ }
+
+ @Test
+ void testJdbcSource() {
+ String testTable = "testTable";
+ tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));
+
+ List<Row> collected = executeQuery("SELECT * FROM " + testTable);
+
+ assertThat(collected).containsExactlyInAnyOrderElementsOf(getTestData());
+ }
+
+ @Test
+ void testProject() {
+ String testTable = "testTable";
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(
+ getMetadata(),
+ testTable,
+ Arrays.asList(
+ "'scan.partition.column'='id'",
+ "'scan.partition.num'='2'",
+ "'scan.partition.lower-bound'='0'",
+ "'scan.partition.upper-bound'='100'")));
+
+ String fields = String.join(",", Arrays.copyOfRange(inputTable.getTableFields(), 0, 3));
+ List<Row> collected = executeQuery(String.format("SELECT %s FROM %s", fields, testTable));
+
+ List<Row> expected =
+ getTestData().stream()
+ .map(row -> Row.of(row.getField(0), row.getField(1), row.getField(2)))
+ .collect(Collectors.toList());
+
+ assertThat(collected).containsExactlyInAnyOrderElementsOf(expected);
+ }
+
+ @Test
+ public void testLimit() {
+ String testTable = "testTable";
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(
+ getMetadata(),
+ testTable,
+ Arrays.asList(
+ "'scan.partition.column'='id'",
+ "'scan.partition.num'='2'",
+ "'scan.partition.lower-bound'='1'",
+ "'scan.partition.upper-bound'='2'")));
+
+ List<Row> collected = executeQuery("SELECT * FROM " + testTable + " LIMIT 1");
+
+ assertThat(collected).hasSize(1);
+ assertThat(getTestData())
+ .as("The actual output is not a subset of the expected set.")
+ .containsAll(collected);
+ }
+
+ @Test
+ public void testFilter() {
+ String testTable = "testTable";
+ tEnv.executeSql(inputTable.getCreateQueryForFlink(getMetadata(), testTable));
+
+ // create a partitioned table to ensure no regression
+ String partitionedTable = "PARTITIONED_TABLE";
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(
+ getMetadata(),
+ partitionedTable,
+ Arrays.asList(
+ "'scan.partition.column'='id'",
+ "'scan.partition.num'='1'",
+ "'scan.partition.lower-bound'='1'",
+ "'scan.partition.upper-bound'='1'")));
+
+ // we create a VIEW here to test column remapping, i.e., would filter push down work if we
+ // create a view that depends on our source table
+ tEnv.executeSql(
+ String.format(
+ "CREATE VIEW FAKE_TABLE (idx, %s) as (SELECT * from %s )",
+ Arrays.stream(inputTable.getTableFields())
+ .filter(f -> !f.equals("id"))
+ .collect(Collectors.joining(",")),
+ testTable));
+
+ Row onlyRow1 =
+ getTestData().stream()
+ .filter(row -> row.getFieldAs(0).equals(1L))
+ .findAny()
+ .orElseThrow(NullPointerException::new);
+
+ Row onlyRow2 =
+ getTestData().stream()
+ .filter(row -> row.getFieldAs(0).equals(2L))
+ .findAny()
+ .orElseThrow(NullPointerException::new);
+
+ List<Row> twoRows = getTestData();
+
+ // test simple filter
+ assertThat(executeQuery("SELECT * FROM FAKE_TABLE WHERE idx = 1"))
+ .containsExactly(onlyRow1);
+
+ // test TIMESTAMP filter
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE timestamp6_col = TIMESTAMP '2020-01-01 15:35:00.123456'"))
+ .containsExactly(onlyRow1);
+
+ // test the IN operator
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE 1 = idx AND double_col IN (100.1234, 101.1234)"))
+ .containsExactly(onlyRow1);
+
+ // test mixing AND and OR operator
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE idx = 1 AND double_col = 100.1234 OR double_col = 101.1234"))
+ .containsExactlyInAnyOrderElementsOf(twoRows);
+
+ // test mixing AND/OR with parentheses, and swapping the operands of the equality expression
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE (2 = idx AND double_col = 100.1234) OR double_col = 101.1234"))
+ .containsExactly(onlyRow2);
+
+ // test greater-than, just to make sure we didn't break anything that we cannot push down
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE idx = 2 AND double_col > 100 OR double_col = 101.123"))
+ .containsExactly(onlyRow2);
+
+ // One more test with parentheses
+ assertThat(
+ executeQuery(
+ "SELECT * FROM FAKE_TABLE WHERE 2 = idx AND (double_col = 100.1234 OR double_col = 102.1234)"))
+ .isEmpty();
+
+ assertThat(
+ executeQuery(
+ "SELECT * FROM "
+ + partitionedTable
+ + " WHERE id = 2 AND double_col > 100 OR double_col = 101.123"))
+ .isEmpty();
+
+ assertThat(
+ executeQuery(
+ "SELECT * FROM "
+ + partitionedTable
+ + " WHERE 1 = id AND double_col IN (100.1234, 101.1234)"))
+ .containsExactly(onlyRow1);
+ }
+
+ @ParameterizedTest
+ @EnumSource(Caching.class)
+ void testLookupJoin(Caching caching) {
+ // Create JDBC lookup table
+ List<String> cachingOptions = Collections.emptyList();
+ if (caching.equals(Caching.ENABLE_CACHE)) {
+ cachingOptions =
+ Arrays.asList(
+ "'lookup.cache.max-rows' = '100'", "'lookup.cache.ttl' = '10min'");
+ }
+ tEnv.executeSql(
+ inputTable.getCreateQueryForFlink(getMetadata(), "jdbc_lookup", cachingOptions));
+
+ // Create and prepare a value source
+ String dataId =
+ TestValuesTableFactory.registerData(
+ Arrays.asList(
+ Row.of(1L, "Alice"),
+ Row.of(1L, "Alice"),
+ Row.of(2L, "Bob"),
+ Row.of(3L, "Charlie")));
+ tEnv.executeSql(
+ String.format(
+ "CREATE TABLE value_source ( "
+ + " `id` BIGINT, "
+ + " `name` STRING, "
+ + " `proctime` AS PROCTIME()"
+ + ") WITH ("
+ + " 'connector' = 'values', "
+ + " 'data-id' = '%s'"
+ + ")",
+ dataId));
+
+ if (caching == Caching.ENABLE_CACHE) {
+ LookupCacheManager.keepCacheOnRelease(true);
+ }
+
+ // Execute lookup join
+ try {
+ List<Row> collected =
+ executeQuery(
+ "SELECT S.id, S.name, D.id, D.timestamp6_col, D.double_col FROM value_source"
+ + " AS S JOIN jdbc_lookup for system_time as of S.proctime AS D ON S.id = D.id");
+
+ assertThat(collected).hasSize(3);
+
+ List<Row> expected =
+ Arrays.asList(
+ Row.of(
+ 1L,
+ "Alice",
+ 1L,
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ 100.1234d),
+ Row.of(
+ 1L,
+ "Alice",
+ 1L,
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ 100.1234d),
+ Row.of(
+ 2L,
+ "Bob",
+ 2L,
+ LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+ 101.1234d));
+
+ assertThat(collected)
+ .as("The actual output is not a subset of the expected set")
+ .containsAll(expected);
+
+ if (caching == Caching.ENABLE_CACHE) {
+ validateCachedValues();
+ }
+ } finally {
+ if (caching == Caching.ENABLE_CACHE) {
+ LookupCacheManager.getInstance().checkAllReleased();
+ LookupCacheManager.getInstance().clear();
+ LookupCacheManager.keepCacheOnRelease(false);
+ }
+ }
+ }
+
+ private void validateCachedValues() {
+ // Validate cache
+ Map<String, LookupCacheManager.RefCountedCache> managedCaches =
+ LookupCacheManager.getInstance().getManagedCaches();
+ assertThat(managedCaches).as("There should be only 1 shared cache registered").hasSize(1);
+ LookupCache cache = managedCaches.get(managedCaches.keySet().iterator().next()).getCache();
+ // JDBC supports projection push down, so the cached rows have been projected
+ RowData key1 = GenericRowData.of(1L);
+ RowData value1 =
+ GenericRowData.of(
+ 1L,
+ 100.1234d,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2020-01-01T15:35:00.123456")));
+
+ RowData key2 = GenericRowData.of(2L);
+ RowData value2 =
+ GenericRowData.of(
+ 2L,
+ 101.1234d,
+ TimestampData.fromLocalDateTime(
+ LocalDateTime.parse("2020-01-01T15:36:01.123456")));
+
+ RowData key3 = GenericRowData.of(3L);
+
+ Map<RowData, Collection<RowData>> expectedEntries = new HashMap<>();
+ expectedEntries.put(key1, Collections.singletonList(value1));
+ expectedEntries.put(key2, Collections.singletonList(value2));
+ expectedEntries.put(key3, Collections.emptyList());
+
+ LookupCacheAssert.assertThat(cache).containsExactlyEntriesOf(expectedEntries);
+ }
+
+ private enum Caching {
+ ENABLE_CACHE,
+ DISABLE_CACHE
+ }
+
+ private List<Row> executeQuery(String query) {
+ return CollectionUtil.iteratorToList(tEnv.executeSql(query).collect());
+ }
+
+ private TableRow createInputTable() {
+ return tableRow(
+ "jdbc_dynamic_table_source",
+ field("id", DataTypes.BIGINT().notNull()),
+ field("double_col", DataTypes.DOUBLE()),
+ field("timestamp6_col", DataTypes.TIMESTAMP(6)),
+ field("boolean_col", DataTypes.BOOLEAN()),
+ field("tinyint_col", DataTypes.TINYINT()),
+ field("smallint_col", DataTypes.SMALLINT()),
+ field("integer_col", DataTypes.INT()),
+ field("bigint_col", DataTypes.BIGINT()),
+ field("float_col", DataTypes.FLOAT()),
+ field("varchar15_col", DataTypes.VARCHAR(15)),
+ field("date_col", DataTypes.DATE()));
+ }
+
+ protected List<Row> getTestData() {
+ return Arrays.asList(
+ Row.of(
+ 1L,
+ 100.1234d,
+ LocalDateTime.parse("2020-01-01T15:35:00.123456"),
+ true,
+ (byte) -12,
+ (short) 31_987,
+ 1_147_483_647,
+ 8_223_372_036_854_775_807L,
+ 70.5f,
+ "some-text",
+ LocalDate.parse("2020-01-01")),
+ Row.of(
+ 2L,
+ 101.1234d,
+ LocalDateTime.parse("2020-01-01T15:36:01.123456"),
+ false,
+ (byte) 23,
+ (short) -29_987,
+ -2_047_483_647,
+ -1_223_372_036_854_775_807L,
+ -123.5f,
+ "other-text",
+ LocalDate.parse("2020-01-02")));
+ }
+}
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchBulkBuilder.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchBulkBuilder.java
new file mode 100644
index 000000000..9808111a9
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchBulkBuilder.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.testutils;
+
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.types.Row;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+
+/** Creates content for Elastic Bulk API call. */
+public class ElasticsearchBulkBuilder {
+
+ private static final ObjectMapper OBJECT_MAPPER =
+ new ObjectMapper()
+ .registerModule(new JavaTimeModule())
+ .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false);
+
+ public static String createBulkContent(TableRow schema, List<Row> data)
+ throws JsonProcessingException {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0; i < data.size(); ++i) {
+ builder.append(
+ format(
+ "{\"create\":{\"_index\":\"%s\",\"_id\":\"%d\"}}\n",
+ schema.getTableName(), i + 1));
+ builder.append(rowToJson(schema, data.get(i))).append('\n');
+ }
+ return builder.toString();
+ }
+
+ private static String rowToJson(TableRow schema, Row data) throws JsonProcessingException {
+ int fieldCount = schema.getTableDataFields().length;
+ Map<String, Object> fieldMap = new HashMap<>(fieldCount);
+ for (int i = 0; i < fieldCount; ++i) {
+ fieldMap.put(schema.getTableFields()[i], adjustValueIfNeeded(data.getField(i)));
+ }
+ return OBJECT_MAPPER.writeValueAsString(fieldMap);
+ }
+
+ private static Object adjustValueIfNeeded(Object object) {
+ if (object instanceof LocalDateTime) {
+ return ((LocalDateTime) object)
+ .atZone(ZoneId.systemDefault())
+ .withZoneSameInstant(ZoneId.of("UTC"))
+ .toLocalDateTime();
+ } else {
+ return object;
+ }
+ }
+}
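
For reference, a sketch of the action line that precedes every serialized row in the Bulk API payload built above; `BulkFormatSketch` and `my_index` are hypothetical names, and the document line that follows each action line is the row rendered by Jackson:

```java
public class BulkFormatSketch {
    public static void main(String[] args) {
        // The action line written before each document, as in createBulkContent.
        String actionLine =
                String.format("{\"create\":{\"_index\":\"%s\",\"_id\":\"%d\"}}", "my_index", 1);
        System.out.println(actionLine);
        // {"create":{"_index":"my_index","_id":"1"}}
        // The full payload alternates action and document lines, each newline-terminated,
        // which is the body expected by Elasticsearch's _bulk endpoint.
    }
}
```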
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchDatabase.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchDatabase.java
new file mode 100644
index 000000000..1c48a3075
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchDatabase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.testutils;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseExtension;
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+import org.apache.flink.connector.jdbc.testutils.DatabaseResource;
+import org.apache.flink.connector.jdbc.testutils.resources.DockerResource;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.testcontainers.containers.wait.strategy.Wait;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import java.time.Duration;
+
+import static org.apache.flink.connector.jdbc.elasticsearch.testutils.ElasticsearchMetadata.PASSWORD;
+
+/** An Elasticsearch database for testing. */
+public class ElasticsearchDatabase extends DatabaseExtension implements ElasticsearchImages {
+
+ private static final ElasticsearchContainer CONTAINER =
+ new TestElasticsearchContainer(ELASTICSEARCH_8)
+ .waitingFor(
+ Wait.forLogMessage(
+ ".*Node .* is selected as the current health node.*", 1)
+ .withStartupTimeout(Duration.ofMinutes(5)));
+
+ private static ElasticsearchMetadata metadata;
+ private static ElasticsearchRestClient client;
+
+ public static ElasticsearchMetadata getMetadata() {
+ if (!CONTAINER.isRunning()) {
+ throw new FlinkRuntimeException("Container is stopped.");
+ }
+ if (metadata == null) {
+ metadata = new ElasticsearchMetadata(CONTAINER);
+ }
+ return metadata;
+ }
+
+ @Override
+ protected DatabaseMetadata getMetadataDB() {
+ return getMetadata();
+ }
+
+ @Override
+ protected DatabaseResource getResource() {
+ return new DockerResource(CONTAINER);
+ }
+
+ private static ElasticsearchRestClient getClient() {
+ if (!CONTAINER.isRunning()) {
+ throw new FlinkRuntimeException("Container is stopped.");
+ }
+ if (client == null) {
+ client = new ElasticsearchRestClient(getMetadata());
+ }
+ return client;
+ }
+
+ /** Custom wrapper for {@link ElasticsearchContainer}. */
+ public static class TestElasticsearchContainer extends ElasticsearchContainer {
+
+ public TestElasticsearchContainer(String dockerImageName) {
+ super(dockerImageName);
+ }
+
+ @Override
+ public void start() {
+ CONTAINER.withEnv("xpack.security.enabled", "true");
+ CONTAINER.withEnv("ELASTIC_PASSWORD", PASSWORD);
+ CONTAINER.withEnv("ES_JAVA_OPTS", "-Xms1g -Xmx1g");
+
+ super.start();
+
+ // The JDBC plugin is available only with Platinum and Enterprise licenses, or with a trial.
+ try {
+ if (!getClient().trialEnabled()) {
+ getClient().enableTrial();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void stop() {
+ super.stop();
+ client = null;
+ }
+ }
+}
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchImages.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchImages.java
new file mode 100644
index 000000000..a62577f38
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchImages.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.testutils;
+
+/** Elasticsearch docker images. */
+public interface ElasticsearchImages {
+ String ELASTICSEARCH_8 = "docker.elastic.co/elasticsearch/elasticsearch:8.14.3";
+}
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchIndexSchemaBuilder.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchIndexSchemaBuilder.java
new file mode 100644
index 000000000..88ec85424
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchIndexSchemaBuilder.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.testutils;
+
+import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
+import org.apache.flink.table.types.AtomicDataType;
+import org.apache.flink.table.types.CollectionDataType;
+import org.apache.flink.table.types.DataTypeVisitor;
+import org.apache.flink.table.types.FieldsDataType;
+import org.apache.flink.table.types.KeyValueDataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.BinaryType;
+import org.apache.flink.table.types.logical.BooleanType;
+import org.apache.flink.table.types.logical.CharType;
+import org.apache.flink.table.types.logical.DateType;
+import org.apache.flink.table.types.logical.DayTimeIntervalType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.DistinctType;
+import org.apache.flink.table.types.logical.DoubleType;
+import org.apache.flink.table.types.logical.FloatType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LocalZonedTimestampType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeVisitor;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.NullType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.SmallIntType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.SymbolType;
+import org.apache.flink.table.types.logical.TimeType;
+import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.table.types.logical.TinyIntType;
+import org.apache.flink.table.types.logical.VarBinaryType;
+import org.apache.flink.table.types.logical.VarCharType;
+import org.apache.flink.table.types.logical.YearMonthIntervalType;
+import org.apache.flink.table.types.logical.ZonedTimestampType;
+
+import static java.lang.String.format;
+import static java.util.Arrays.stream;
+import static java.util.stream.Collectors.joining;
+
+/** Builds the request body for the Elasticsearch Create Index API call. */
+public class ElasticsearchIndexSchemaBuilder {
+
+ private static final ElasticsearchDataTypeMapper MAPPER = new ElasticsearchDataTypeMapper();
+
+ public static String buildIndexSchema(TableRow tableRow) {
+ String fields =
+ stream(tableRow.getTableDataFields())
+ .map(
+ field ->
+ format(
+ "\"%s\": %s",
+ field.getName(),
+ field.getDataType().accept(MAPPER)))
+ .collect(joining(", "));
+ return "{\"settings\": {\"number_of_shards\": 1}, \"mappings\": {\"properties\": {"
+ + fields
+ + "}}}";
+ }
+
+ /** Maps Flink types to Elasticsearch types. */
+ private static class ElasticsearchDataTypeMapper
+ implements DataTypeVisitor<String>, LogicalTypeVisitor<String> {
+
+ @Override
+ public String visit(AtomicDataType atomicDataType) {
+ return atomicDataType.getLogicalType().accept(this);
+ }
+
+ @Override
+ public String visit(CollectionDataType collectionDataType) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public String visit(FieldsDataType fieldsDataType) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public String visit(KeyValueDataType keyValueDataType) {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
+ @Override
+ public String visit(CharType charType) {
+ throw new IllegalArgumentException("CharType is not supported.");
+ }
+
+ @Override
+ public String visit(VarCharType varCharType) {
+ return "{\"type\": \"text\"}";
+ }
+
+ @Override
+ public String visit(BooleanType booleanType) {
+ return "{\"type\": \"boolean\"}";
+ }
+
+ @Override
+ public String visit(BinaryType binaryType) {
+ return "{\"type\": \"binary\"}";
+ }
+
+ @Override
+ public String visit(VarBinaryType varBinaryType) {
+ return "{\"type\": \"binary\"}";
+ }
+
+ @Override
+ public String visit(DecimalType decimalType) {
+ throw new IllegalArgumentException("DecimalType is not supported.");
+ }
+
+ @Override
+ public String visit(TinyIntType tinyIntType) {
+ return "{\"type\": \"byte\"}";
+ }
+
+ @Override
+ public String visit(SmallIntType smallIntType) {
+ return "{\"type\": \"short\"}";
+ }
+
+ @Override
+ public String visit(IntType intType) {
+ return "{\"type\": \"integer\"}";
+ }
+
+ @Override
+ public String visit(BigIntType bigIntType) {
+ return "{\"type\": \"long\"}";
+ }
+
+ @Override
+ public String visit(FloatType floatType) {
+ return "{\"type\": \"float\"}";
+ }
+
+ @Override
+ public String visit(DoubleType doubleType) {
+ return "{\"type\": \"double\"}";
+ }
+
+ @Override
+ public String visit(DateType dateType) {
+ return "{\"type\": \"date\", \"format\": \"strict_date||basic_date\"}";
+ }
+
+ @Override
+ public String visit(TimeType timeType) {
+ throw new IllegalArgumentException("TimeType is not supported.");
+ }
+
+ @Override
+ public String visit(TimestampType timestampType) {
+ return timestampType.getPrecision() <= 3
+ ? "{\"type\": \"date\"}"
+ : "{\"type\": \"date_nanos\"}";
+ }
+
+ @Override
+ public String visit(ZonedTimestampType zonedTimestampType) {
+ throw new IllegalArgumentException("ZonedTimestampType is not supported.");
+ }
+
+ @Override
+ public String visit(LocalZonedTimestampType localZonedTimestampType) {
+ throw new IllegalArgumentException("LocalZonedTimestampType is not supported.");
+ }
+
+ @Override
+ public String visit(YearMonthIntervalType yearMonthIntervalType) {
+ throw new IllegalArgumentException("YearMonthIntervalType is not supported.");
+ }
+
+ @Override
+ public String visit(DayTimeIntervalType dayTimeIntervalType) {
+ throw new IllegalArgumentException("DayTimeIntervalType is not supported.");
+ }
+
+ @Override
+ public String visit(ArrayType arrayType) {
+ throw new IllegalArgumentException("ArrayType is not supported.");
+ }
+
+ @Override
+ public String visit(MultisetType multisetType) {
+ throw new IllegalArgumentException("MultisetType is not supported.");
+ }
+
+ @Override
+ public String visit(MapType mapType) {
+ throw new IllegalArgumentException("MapType is not supported.");
+ }
+
+ @Override
+ public String visit(RowType rowType) {
+ throw new IllegalArgumentException("RowType is not supported.");
+ }
+
+ @Override
+ public String visit(DistinctType distinctType) {
+ throw new IllegalArgumentException("DistinctType is not supported.");
+ }
+
+ @Override
+ public String visit(StructuredType structuredType) {
+ throw new IllegalArgumentException("StructuredType is not supported.");
+ }
+
+ @Override
+ public String visit(NullType nullType) {
+ throw new IllegalArgumentException("NullType is not supported.");
+ }
+
+ @Override
+ public String visit(RawType<?> rawType) {
+ throw new IllegalArgumentException("RawType is not supported.");
+ }
+
+ @Override
+ public String visit(SymbolType<?> symbolType) {
+ throw new IllegalArgumentException("SymbolType is not supported.");
+ }
+
+ @Override
+ public String visit(LogicalType other) {
+ return other.accept(this);
+ }
+ }
+}
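
As a sanity check of the mapping rules above, a two-column table produces the body shown in the comment below. This is a hedged sketch: `tableRow`/`field` are assumed to be the existing helpers in `org.apache.flink.connector.jdbc.testutils.tables.TableBuilder`, which are not part of this diff:

```java
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.field;
import static org.apache.flink.connector.jdbc.testutils.tables.TableBuilder.tableRow;

import org.apache.flink.connector.jdbc.testutils.tables.TableRow;
import org.apache.flink.table.api.DataTypes;

class IndexSchemaExample {
    static String booksSchema() {
        TableRow table =
                tableRow(
                        "books",
                        field("id", DataTypes.BIGINT()),
                        field("title", DataTypes.STRING()));
        // BIGINT maps to "long", STRING (a VARCHAR) maps to "text", so this yields:
        // {"settings": {"number_of_shards": 1}, "mappings": {"properties":
        //     {"id": {"type": "long"}, "title": {"type": "text"}}}}
        return ElasticsearchIndexSchemaBuilder.buildIndexSchema(table);
    }
}
```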
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchMetadata.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchMetadata.java
new file mode 100644
index 000000000..89a8cfff2
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchMetadata.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.testutils;
+
+import org.apache.flink.connector.jdbc.testutils.DatabaseMetadata;
+
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+
+import javax.sql.XADataSource;
+
+/** Elasticsearch Metadata. */
+public class ElasticsearchMetadata implements DatabaseMetadata {
+
+ static final int ELASTIC_PORT = 9200;
+ static final String USERNAME = "elastic";
+ static final String PASSWORD = "password";
+
+ private final String username;
+ private final String password;
+ private final String jdbcUrl;
+ private final String driver;
+ private final String version;
+ private final String containerHost;
+ private final int containerPort;
+
+ public ElasticsearchMetadata(ElasticsearchContainer container) {
+ this.containerHost = container.getHost();
+ this.containerPort = container.getMappedPort(ELASTIC_PORT);
+ this.username = USERNAME;
+ this.password = PASSWORD;
+ this.jdbcUrl = "jdbc:elasticsearch://" + containerHost + ":" + containerPort;
+ this.driver = "org.elasticsearch.xpack.sql.jdbc.EsDriver";
+ this.version = container.getDockerImageName();
+ }
+
+ @Override
+ public String getJdbcUrl() {
+ return this.jdbcUrl;
+ }
+
+ @Override
+ public String getJdbcUrlWithCredentials() {
+ return String.format("%s?user=%s&password=%s", getJdbcUrl(), getUsername(), getPassword());
+ }
+
+ @Override
+ public String getUsername() {
+ return this.username;
+ }
+
+ @Override
+ public String getPassword() {
+ return this.password;
+ }
+
+ @Override
+ public XADataSource buildXaDataSource() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getDriverClass() {
+ return this.driver;
+ }
+
+ @Override
+ public String getVersion() {
+ return this.version;
+ }
+
+ public String getContainerHost() {
+ return containerHost;
+ }
+
+ public int getContainerPort() {
+ return containerPort;
+ }
+}
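
For orientation, a container whose port 9200 is mapped to, say, localhost:32768 (the mapped port is illustrative) yields:

```java
import org.testcontainers.elasticsearch.ElasticsearchContainer;

class MetadataExample {
    static void show(ElasticsearchContainer container) {
        ElasticsearchMetadata metadata = new ElasticsearchMetadata(container);
        // e.g. jdbc:elasticsearch://localhost:32768
        System.out.println(metadata.getJdbcUrl());
        // e.g. jdbc:elasticsearch://localhost:32768?user=elastic&password=password
        System.out.println(metadata.getJdbcUrlWithCredentials());
    }
}
```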
diff --git a/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchRestClient.java b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchRestClient.java
new file mode 100644
index 000000000..e4437cf23
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/java/org/apache/flink/connector/jdbc/elasticsearch/testutils/ElasticsearchRestClient.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.elasticsearch.testutils;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.Assertions;
+
+import java.io.IOException;
+
+import static java.lang.String.format;
+
+/** Elasticsearch REST API client. */
+public class ElasticsearchRestClient {
+
+ private static final ObjectMapper OBJECT_MAPPER =
+ new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+
+ private final RestClient restClient;
+
+ public ElasticsearchRestClient(ElasticsearchMetadata metadata) {
+ this(
+ metadata.getContainerHost(),
+ metadata.getContainerPort(),
+ metadata.getUsername(),
+ metadata.getPassword());
+ }
+
+ public ElasticsearchRestClient(String host, int port, String username, String password) {
+ final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(
+ AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+ this.restClient =
+ RestClient.builder(new HttpHost(host, port, "http"))
+ .setHttpClientConfigCallback(
+ builder ->
+ builder.setDefaultCredentialsProvider(credentialsProvider))
+ .build();
+ }
+
+ public boolean trialEnabled() throws Exception {
+ Request request = new Request("GET", "/_license");
+ ElasticsearchLicenseResponse response =
+ executeRequest(request, ElasticsearchLicenseResponse.class);
+ return response != null
+ && response.license.status.equals("active")
+ && response.license.type.equals("trial");
+ }
+
+ public void enableTrial() throws Exception {
+ executeRequest(new Request("POST", "/_license/start_trial?acknowledge=true"));
+ }
+
+ public void createIndex(String indexName, String indexDefinition) throws Exception {
+ Request request = new Request("PUT", format("/%s/", indexName));
+ request.setJsonEntity(indexDefinition);
+ executeRequest(request);
+ }
+
+ public void deleteIndex(String indexName) throws Exception {
+ executeRequest(new Request("DELETE", format("/%s/", indexName)));
+ }
+
+ public void addDataBulk(String indexName, String content) throws Exception {
+ Request request = new Request("PUT", format("/%s/_bulk?refresh=true", indexName));
+ request.setJsonEntity(content);
+ executeRequest(request);
+ }
+
+ private <T> T executeRequest(Request request, Class<T> outputClass) throws IOException {
+ org.elasticsearch.client.Response response = restClient.performRequest(request);
+ Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
+ return OBJECT_MAPPER.readValue(EntityUtils.toString(response.getEntity()), outputClass);
+ }
+
+ private void executeRequest(Request request) throws IOException {
+ org.elasticsearch.client.Response response = restClient.performRequest(request);
+ Assertions.assertEquals(200, response.getStatusLine().getStatusCode());
+ }
+
+ private static class ElasticsearchLicenseResponse {
+ private static class License {
+ private String status;
+ private String type;
+
+ public String getStatus() {
+ return status;
+ }
+
+ public void setStatus(String status) {
+ this.status = status;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ public void setType(String type) {
+ this.type = type;
+ }
+ }
+
+ private License license;
+
+ public License getLicense() {
+ return license;
+ }
+
+ public void setLicense(License license) {
+ this.license = license;
+ }
+ }
+}
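
A sketch of driving this client directly; host and port are illustrative (in the tests they come from `ElasticsearchMetadata`). Note that the `_bulk` body is newline-delimited JSON, one action line per document, terminated by a trailing newline:

```java
class RestClientExample {
    static void roundTrip() throws Exception {
        ElasticsearchRestClient client =
                new ElasticsearchRestClient("localhost", 32768, "elastic", "password");
        // Create an index with a single long field, then bulk-insert one document.
        client.createIndex(
                "books", "{\"mappings\": {\"properties\": {\"id\": {\"type\": \"long\"}}}}");
        client.addDataBulk("books", "{\"index\": {\"_id\": \"1\"}}\n{\"id\": 1}\n");
        client.deleteIndex("books");
    }
}
```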
diff --git a/flink-connector-jdbc-elasticsearch/src/test/resources/log4j2-test.properties b/flink-connector-jdbc-elasticsearch/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..835c2ec9a
--- /dev/null
+++ b/flink-connector-jdbc-elasticsearch/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/pom.xml b/pom.xml
index fa8d48376..34548223a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,6 +47,7 @@ under the License.
 		<module>flink-connector-jdbc-core</module>
 		<module>flink-connector-jdbc-cratedb</module>
 		<module>flink-connector-jdbc-db2</module>
+		<module>flink-connector-jdbc-elasticsearch</module>
 		<module>flink-connector-jdbc-mysql</module>
 		<module>flink-connector-jdbc-oceanbase</module>
 		<module>flink-connector-jdbc-oracle</module>
@@ -412,6 +413,37 @@ under the License.
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-enforcer-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>ban-elastic-dependencies</id>
+						<goals>
+							<goal>enforce</goal>
+						</goals>
+						<configuration>
+							<rules>
+								<bannedDependencies>
+									<excludes>
+										<exclude>org.elasticsearch.*:*</exclude>
+									</excludes>
+									<includes>
+										<include>org.elasticsearch.plugin:x-pack-sql-jdbc:*:*:provided</include>
+										<include>org.elasticsearch.client:elasticsearch-rest-client:*:*:test</include>
+									</includes>
+									<message>Elastic x-pack dependencies are not allowed to be bundled due to licensing issues.</message>
+								</bannedDependencies>
+							</rules>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-jar-plugin</artifactId>
@@ -446,11 +478,6 @@ under the License.
 				<artifactId>maven-surefire-plugin</artifactId>
 			</plugin>
-			<plugin>
-				<groupId>org.apache.maven.plugins</groupId>
-				<artifactId>maven-enforcer-plugin</artifactId>
-			</plugin>
-
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-shade-plugin</artifactId>