
Commit 6032746

[FLINK-30702] Add Elasticsearch dialect
1 parent 7025642 commit 6032746

16 files changed, +1490 -13 lines changed

docs/content/docs/connectors/table/jdbc.md

Lines changed: 39 additions & 13 deletions
@@ -45,18 +45,18 @@ See how to link with it for cluster execution [here]({{< ref "docs/dev/configura
 
 A driver dependency is also required to connect to a specified database. Here are drivers currently supported:
 
-| Driver | Group Id | Artifact Id | JAR |
-|:-----------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------------|
-| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
-| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
-| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) |
-| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
-| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
-| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
-| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
-| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
-| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
-
+| Driver | Group Id | Artifact Id | JAR |
+|:--------------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------------|
+| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) |
+| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
+| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
+| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
+| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
+| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) |
+| Elasticsearch | `org.elasticsearch.plugin` | `x-pack-sql-jdbc` | [Download](https://www.elastic.co/downloads/jdbc-client) |
 
 JDBC connector and drivers are not part of Flink's binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).

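As an aside, not part of this commit: the new driver row can be exercised with plain JDBC before wiring it into Flink. A minimal sketch follows; the driver class is the dialect's defaultDriverName() shown further down this page, the jdbc:es: URL prefix is one of the two accepted by the new ElasticsearchDialectFactory, and the host, index and column names are placeholders only.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public final class EsJdbcSmokeTest {
    public static void main(String[] args) throws Exception {
        // Driver class shipped in the x-pack-sql-jdbc artifact listed in the table above.
        Class.forName("org.elasticsearch.xpack.sql.jdbc.EsDriver");

        // Placeholder endpoint; "jdbc:es:" is one of the prefixes the new dialect factory accepts.
        try (Connection conn = DriverManager.getConnection("jdbc:es://http://localhost:9200");
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT host, bytes FROM \"web-logs\" LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString(1) + " -> " + rs.getLong(2));
            }
        }
    }
}
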
@@ -656,7 +656,7 @@ SELECT * FROM `custom_schema.test_table2`;
 
 Data Type Mapping
 ----------------
-Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and OceanBase. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
+Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2, OceanBase, Elasticsearch. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
 
 <table class="table table-bordered">
 <thead>
@@ -670,6 +670,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <th class="text-left"><a href="https://trino.io/docs/current/language/types.html">Trino type</a></th>
 <th class="text-left"><a href="https://en.oceanbase.com/docs/common-oceanbase-database-10000000001106898">OceanBase MySQL mode type</a></th>
 <th class="text-left"><a href="https://en.oceanbase.com/docs/common-oceanbase-database-10000000001107076">OceanBase Oracle mode type</a></th>
+<th class="text-left"><a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html">Elastic SQL type</a></th>
 <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
 </tr>
 </thead>
@@ -684,6 +685,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td><code>TINYINT</code></td>
 <td><code>TINYINT</code></td>
 <td></td>
+<td><code>BYTE</code></td>
 <td><code>TINYINT</code></td>
 </tr>
 <tr>
@@ -706,6 +708,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <code>SMALLINT</code><br>
 <code>TINYINT UNSIGNED</code></td>
 <td></td>
+<td><code>SHORT</code></td>
 <td><code>SMALLINT</code></td>
 </tr>
 <tr>
@@ -728,6 +731,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <code>MEDIUMINT</code><br>
 <code>SMALLINT UNSIGNED</code></td>
 <td></td>
+<td><code>INTEGER</code></td>
 <td><code>INT</code></td>
 </tr>
 <tr>
@@ -748,6 +752,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <code>BIGINT</code><br>
 <code>INT UNSIGNED</code></td>
 <td></td>
+<td>
+<code>LONG</code><br>
+<code>UNSIGNED_LONG</code></td>
 <td><code>BIGINT</code></td>
 </tr>
 <tr>
@@ -760,6 +767,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td></td>
 <td><code>BIGINT UNSIGNED</code></td>
 <td></td>
+<td></td>
 <td><code>DECIMAL(20, 0)</code></td>
 </tr>
 <tr>
@@ -778,6 +786,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td><code>FLOAT</code></td>
 <td>
 <code>BINARY_FLOAT</code></td>
+<td>
+<code>FLOAT</code><br>
+<code>HALF_FLOAT</code></td>
 <td><code>FLOAT</code></td>
 </tr>
 <tr>
@@ -796,6 +807,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td><code>DOUBLE</code></td>
 <td><code>DOUBLE</code></td>
 <td><code>BINARY_DOUBLE</code></td>
+<td>
+<code>DOUBLE</code><br>
+<code>SCALED_FLOAT</code></td>
 <td><code>DOUBLE</code></td>
 </tr>
 <tr>
@@ -824,6 +838,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td>
 <code>FLOAT(s)</code><br>
 <code>NUMBER(p, s)</code></td>
+<td></td>
 <td><code>DECIMAL(p, s)</code></td>
 </tr>
 <tr>
@@ -841,6 +856,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <code>TINYINT(1)</code></td>
 <td></td>
 <td><code>BOOLEAN</code></td>
+<td><code>BOOLEAN</code></td>
 </tr>
 <tr>
 <td><code>DATE</code></td>
@@ -852,6 +868,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td><code>DATE</code></td>
 <td><code>DATE</code></td>
 <td><code>DATE</code></td>
+<td></td>
 <td><code>DATE</code></td>
 </tr>
 <tr>
@@ -864,6 +881,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td><code>TIME_WITHOUT_TIME_ZONE</code></td>
 <td><code>TIME [(p)]</code></td>
 <td><code>DATE</code></td>
+<td></td>
 <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
 </tr>
 <tr>
@@ -879,6 +897,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td><code>TIMESTAMP_WITHOUT_TIME_ZONE</code></td>
 <td><code>DATETIME [(p)]</code></td>
 <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
+<td><code>DATETIME</code></td>
 <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
 </tr>
 <tr>
@@ -927,6 +946,11 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <code>NCHAR(n)</code><br>
 <code>VARCHAR2(n)</code><br>
 <code>CLOB</code></td>
+<td>
+<code>KEYWORD</code><br>
+<code>IP</code><br>
+<code>TEXT</code><br>
+<code>VERSION</code></td>
 <td><code>STRING</code></td>
 </tr>
 <tr>
@@ -952,6 +976,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td>
 <code>RAW(s)</code><br>
 <code>BLOB</code></td>
+<td><code>BINARY</code></td>
 <td><code>BYTES</code></td>
 </tr>
 <tr>
@@ -964,6 +989,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
 <td><code>ARRAY</code></td>
 <td></td>
 <td></td>
+<td></td>
 <td><code>ARRAY</code></td>
 </tr>
 </tbody>

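To illustrate the new "Elastic SQL type" column (this example is not part of the commit): a Flink table backed by the Elasticsearch JDBC driver declares its columns using the Flink types on the right of the mapping table, e.g. KEYWORD/TEXT as STRING, LONG as BIGINT, and DATETIME as TIMESTAMP. The endpoint, index and field names below are invented, and the WITH options are the regular JDBC connector options documented earlier in jdbc.md.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class EsJdbcTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Column types follow the mapping table above.
        tEnv.executeSql(
                "CREATE TABLE web_logs ("
                        + "  host STRING,"         // KEYWORD
                        + "  bytes BIGINT,"        // LONG
                        + "  duration_ms DOUBLE,"  // DOUBLE / SCALED_FLOAT
                        + "  ts TIMESTAMP(3)"      // DATETIME
                        + ") WITH ("
                        + "  'connector' = 'jdbc',"
                        + "  'url' = 'jdbc:es://http://localhost:9200',"
                        + "  'table-name' = 'web-logs'"
                        + ")");

        // The new dialect is read-only (see ElasticsearchDialect below), so only scans are meaningful.
        tEnv.executeSql("SELECT host, COUNT(*) AS hits FROM web_logs GROUP BY host").print();
    }
}
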
flink-connector-jdbc/pom.xml

Lines changed: 34 additions & 0 deletions
@@ -42,6 +42,7 @@ under the License.
 <oracle.version>21.8.0.0</oracle.version>
 <trino.version>418</trino.version>
 <byte-buddy.version>1.12.10</byte-buddy.version>
+<elasticsearch.version>8.11.1</elasticsearch.version>
 <surefire.module.config> <!-- required by
 Db2ExactlyOnceSinkE2eTest --> --add-opens=java.base/java.util=ALL-UNNAMED <!--
 SimpleJdbcConnectionProviderDriverClassConcurrentLoadingITCase--> --add-opens=java.base/java.lang=ALL-UNNAMED
@@ -114,6 +115,14 @@ under the License.
 <scope>provided</scope>
 </dependency>
 
+<!-- Elasticsearch -->
+<dependency>
+<groupId>org.elasticsearch.plugin</groupId>
+<artifactId>x-pack-sql-jdbc</artifactId>
+<version>${elasticsearch.version}</version>
+<scope>provided</scope>
+</dependency>
+
 <!-- Tests -->
 
 <dependency>
@@ -250,6 +259,31 @@ under the License.
 <scope>test</scope>
 </dependency>
 
+<!-- Elastic tests -->
+<dependency>
+<groupId>org.testcontainers</groupId>
+<artifactId>elasticsearch</artifactId>
+<scope>test</scope>
+</dependency>
+<dependency>
+<groupId>org.elasticsearch.client</groupId>
+<artifactId>elasticsearch-rest-client</artifactId>
+<version>${elasticsearch.version}</version>
+<scope>test</scope>
+</dependency>
+<dependency>
+<groupId>com.fasterxml.jackson.core</groupId>
+<artifactId>jackson-databind</artifactId>
+<version>2.13.4.2</version>
+<scope>test</scope>
+</dependency>
+<dependency>
+<groupId>com.fasterxml.jackson.datatype</groupId>
+<artifactId>jackson-datatype-jsr310</artifactId>
+<version>2.13.4</version>
+<scope>test</scope>
+</dependency>
+
 <!-- ArchUit test dependencies -->
 <dependency>
 <groupId>org.apache.flink</groupId>
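
The test-scope additions above (Testcontainers' elasticsearch module plus the low-level REST client and Jackson) point at integration tests that start a real Elasticsearch and seed it over HTTP. Those tests are presumably among the 16 changed files but are not shown on this page, so the following is only a sketch of how these dependencies are commonly wired together; the image tag mirrors the ${elasticsearch.version} property, while the security setting and index mapping are assumptions.

import org.apache.http.HttpHost;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RestClient;
import org.testcontainers.elasticsearch.ElasticsearchContainer;

public final class ElasticContainerSketch {
    public static void main(String[] args) throws Exception {
        // Start a single-node Elasticsearch matching the version pinned in the pom.
        try (ElasticsearchContainer es =
                new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:8.11.1")
                        .withEnv("xpack.security.enabled", "false")) {
            es.start();

            // Use the low-level REST client (test scope above) to create an index
            // that could later be queried through the JDBC dialect.
            try (RestClient client =
                    RestClient.builder(HttpHost.create(es.getHttpHostAddress())).build()) {
                Request createIndex = new Request("PUT", "/web-logs");
                createIndex.setJsonEntity(
                        "{\"mappings\": {\"properties\": {\"host\": {\"type\": \"keyword\"}}}}");
                client.performRequest(createIndex);
            }
        }
    }
}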

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialect.java

Lines changed: 111 additions & 0 deletions
@@ -0,0 +1,111 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

/** JDBC dialect for Elastic. */
@Internal
public class ElasticsearchDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // Define MAX/MIN precision of TIMESTAMP type according to Elastic docs:
    // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
    private static final int MIN_TIMESTAMP_PRECISION = 0;
    private static final int MAX_TIMESTAMP_PRECISION = 9;

    @Override
    public String dialectName() {
        return "Elasticsearch";
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.elasticsearch.xpack.sql.jdbc.EsDriver");
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        // The list of types supported by Elastic SQL.
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
        return EnumSet.of(
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.DATE,
                LogicalTypeRoot.DOUBLE,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.FLOAT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.VARBINARY,
                LogicalTypeRoot.VARCHAR);
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ElasticsearchRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return '"' + identifier + '"';
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        throw new UnsupportedOperationException("Upsert is not supported.");
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        throw new UnsupportedOperationException("Insert into is not supported.");
    }

    @Override
    public String getUpdateStatement(
            String tableName, String[] fieldNames, String[] conditionFields) {
        throw new UnsupportedOperationException("Update is not supported.");
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        throw new UnsupportedOperationException("Delete is not supported.");
    }
}
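
Since Elasticsearch SQL is read-only, the dialect above only ever has to produce the scan side of a statement; all write paths throw UnsupportedOperationException. A small usage sketch (not from this commit) of the quoting and LIMIT behaviour it defines:

import org.apache.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialect;

public final class ElasticsearchDialectSketch {
    public static void main(String[] args) {
        ElasticsearchDialect dialect = new ElasticsearchDialect();

        // Identifiers are wrapped in double quotes, matching Elasticsearch SQL syntax.
        String table = dialect.quoteIdentifier("web-logs");    // "web-logs"
        String field = dialect.quoteIdentifier("@timestamp");  // "@timestamp"

        // Limit pushdown renders as a plain LIMIT clause.
        String query = "SELECT " + field + " FROM " + table + " " + dialect.getLimitClause(100);
        System.out.println(query);  // SELECT "@timestamp" FROM "web-logs" LIMIT 100

        // Any write-side statement is rejected, e.g.:
        // dialect.getInsertIntoStatement("web-logs", new String[] {"host"});  // UnsupportedOperationException
    }
}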

flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/elasticsearch/dialect/ElasticsearchDialectFactory.java

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

/** Factory for {@link ElasticsearchDialect}. */
public class ElasticsearchDialectFactory implements JdbcDialectFactory {

    @Override
    public boolean acceptsURL(String url) {
        return url.startsWith("jdbc:elasticsearch:") || url.startsWith("jdbc:es:");
    }

    @Override
    public JdbcDialect create() {
        return new ElasticsearchDialect();
    }
}
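
Dialect factories in flink-connector-jdbc are discovered by JDBC URL, so this class is what routes jdbc:es: and jdbc:elasticsearch: URLs to the new dialect. The factory is typically registered through Java's ServiceLoader (a META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory entry); that registration file is presumably among the 16 changed files but is not shown on this page, so treat the exact path as an assumption. A short sketch of the factory's behaviour:

import org.apache.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialectFactory;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;

public final class ElasticsearchDialectFactorySketch {
    public static void main(String[] args) {
        ElasticsearchDialectFactory factory = new ElasticsearchDialectFactory();

        // Both URL prefixes from acceptsURL() are recognised; other URLs are left to other factories.
        System.out.println(factory.acceptsURL("jdbc:es://http://localhost:9200"));             // true
        System.out.println(factory.acceptsURL("jdbc:elasticsearch://http://localhost:9200"));  // true
        System.out.println(factory.acceptsURL("jdbc:mysql://localhost:3306/test"));            // false

        JdbcDialect dialect = factory.create();
        System.out.println(dialect.dialectName());  // Elasticsearch
    }
}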
