Commit 80be218

[FLINK-30702] Add Elasticsearch dialect

1 parent 8e0496a commit 80be218

16 files changed (+1504, -2 lines)
docs/content/docs/connectors/table/jdbc.md

Lines changed: 28 additions & 2 deletions
@@ -55,7 +55,7 @@ A driver dependency is also required to connect to a specified database. Here ar
 | CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) |
 | Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) |
 | Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) |
-
+| Elasticsearch | `org.elasticsearch.plugin` | `x-pack-sql-jdbc` | [Download](https://www.elastic.co/downloads/jdbc-client) |
 
 JDBC connector and drivers are not part of Flink's binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
@@ -647,7 +647,7 @@ SELECT * FROM `custom_schema.test_table2`;
 
 Data Type Mapping
 ----------------
-Flink supports connect to several databases which uses dialect like MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2. The Derby dialect usually used for testing purpose. The field data type mappings from relational databases data types to Flink SQL data types are listed in the following table, the mapping table can help define JDBC table in Flink easily.
+Flink supports connecting to several databases through dialects such as MySQL, Oracle, PostgreSQL, CrateDB, Derby, SQL Server, Db2 and Elasticsearch. The Derby dialect is usually used for testing purposes. The table below lists the mappings from relational database data types to Flink SQL data types; it can help you define JDBC tables in Flink.
 
 <table class="table table-bordered">
     <thead>
@@ -659,6 +659,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <th class="text-left"><a href="https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16">SQL Server type</a></th>
       <th class="text-left"><a href="https://www.ibm.com/docs/en/db2-for-zos/12?topic=columns-data-types">Db2</a></th>
       <th class="text-left"><a href="https://trino.io/docs/current/language/types.html">Trino type</a></th>
+      <th class="text-left"><a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html">Elastic SQL type</a></th>
       <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
     </tr>
   </thead>
@@ -671,6 +672,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
       <td><code>TINYINT</code></td>
       <td></td>
       <td><code>TINYINT</code></td>
+      <td><code>BYTE</code></td>
       <td><code>TINYINT</code></td>
     </tr>
     <tr>
@@ -689,6 +691,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td><code>SMALLINT</code></td>
      <td><code>SMALLINT</code></td>
      <td><code>SMALLINT</code></td>
+     <td><code>SHORT</code></td>
      <td><code>SMALLINT</code></td>
     </tr>
     <tr>
@@ -706,6 +709,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td><code>INT</code></td>
      <td><code>INTEGER</code></td>
      <td><code>INTEGER</code></td>
+     <td><code>INTEGER</code></td>
      <td><code>INT</code></td>
     </tr>
     <tr>
@@ -722,6 +726,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td><code>BIGINT</code></td>
      <td></td>
      <td><code>BIGINT</code></td>
+     <td>
+       <code>LONG</code><br>
+       <code>UNSIGNED_LONG</code></td>
      <td><code>BIGINT</code></td>
     </tr>
     <tr>
@@ -731,6 +738,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td></td>
      <td></td>
      <td></td>
+     <td></td>
      <td></td>
      <td><code>DECIMAL(20, 0)</code></td>
     </tr>
@@ -747,6 +755,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td><code>REAL</code></td>
      <td><code>REAL</code></td>
      <td><code>FLOAT</code></td>
+     <td>
+       <code>FLOAT</code><br>
+       <code>HALF_FLOAT</code></td>
      <td><code>FLOAT</code></td>
     </tr>
     <tr>
@@ -762,6 +773,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
        <code>DOUBLE PRECISION</code></td>
      <td><code>FLOAT</code></td>
      <td><code>DOUBLE</code></td>
+     <td>
+       <code>DOUBLE</code><br>
+       <code>SCALED_FLOAT</code></td>
      <td><code>DOUBLE</code></td>
     </tr>
     <tr>
@@ -784,6 +798,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
        <code>DECIMAL(p, s)</code>
      </td>
      <td><code>DECIMAL(p, s)</code></td>
+     <td></td>
      <td><code>DECIMAL(p, s)</code></td>
     </tr>
     <tr>
@@ -797,6 +812,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td><code>BOOLEAN</code></td>
      <td></td>
      <td><code>BOOLEAN</code></td>
+     <td><code>BOOLEAN</code></td>
     </tr>
     <tr>
      <td><code>DATE</code></td>
@@ -806,6 +822,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td><code>DATE</code></td>
      <td><code>DATE</code></td>
      <td><code>DATE</code></td>
+     <td></td>
      <td><code>DATE</code></td>
     </tr>
     <tr>
@@ -816,6 +833,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td><code>TIME(0)</code></td>
      <td><code>TIME</code></td>
      <td><code>TIME_WITHOUT_TIME_ZONE</code></td>
+     <td></td>
      <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
     </tr>
     <tr>
@@ -829,6 +847,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      </td>
      <td><code>TIMESTAMP [(p)]</code></td>
      <td><code>TIMESTAMP_WITHOUT_TIME_ZONE</code></td>
+     <td><code>DATETIME</code></td>
      <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
     </tr>
     <tr>
@@ -868,6 +887,11 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
        <code>CHAR</code>
        <code>VARCHAR</code>
      </td>
+     <td>
+       <code>KEYWORD</code><br>
+       <code>IP</code><br>
+       <code>TEXT</code><br>
+       <code>VERSION</code></td>
      <td><code>STRING</code></td>
     </tr>
     <tr>
@@ -886,6 +910,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      </td>
      <td></td>
      <td><code>VARBINARY</code></td>
+     <td><code>BINARY</code></td>
      <td><code>BYTES</code></td>
     </tr>
     <tr>
@@ -895,6 +920,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
      <td><code>ARRAY</code></td>
      <td></td>
      <td></td>
+     <td></td>
      <td><code>ARRAY</code></td>
      <td><code>ARRAY</code></td>
     </tr>
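With the dialect in place, an Elasticsearch index can be declared as a regular JDBC source table. The sketch below is illustrative only: the index name, columns, and cluster address are hypothetical, the JDBC URL format is the one understood by the x-pack driver, and the x-pack-sql-jdbc jar from the driver table above has to be on the classpath.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ElasticsearchJdbcTableExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Hypothetical index ("products") and local cluster address. The dialect maps
        // Elastic KEYWORD/TEXT -> STRING, DOUBLE -> DOUBLE, DATETIME -> TIMESTAMP as in the table above.
        tEnv.executeSql(
                "CREATE TABLE products (\n"
                        + "  name STRING,\n"
                        + "  price DOUBLE,\n"
                        + "  created TIMESTAMP(3)\n"
                        + ") WITH (\n"
                        + "  'connector' = 'jdbc',\n"
                        + "  'url' = 'jdbc:es://http://localhost:9200',\n"
                        + "  'table-name' = 'products'\n"
                        + ")");

        // Reading works; INSERT INTO products ... would fail because the dialect rejects writes.
        tEnv.executeSql("SELECT name, price FROM products LIMIT 10").print();
    }
}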

flink-connector-jdbc/pom.xml

Lines changed: 31 additions & 0 deletions
@@ -238,6 +238,37 @@ under the License.
             <scope>test</scope>
         </dependency>
 
+        <!-- Elastic tests -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch.plugin</groupId>
+            <artifactId>x-pack-sql-jdbc</artifactId>
+            <version>8.8.1</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>4.11.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.15.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.datatype</groupId>
+            <artifactId>jackson-datatype-jsr310</artifactId>
+            <version>2.15.2</version>
+            <scope>test</scope>
+        </dependency>
+
         <!-- ArchUit test dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
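The okhttp and Jackson test dependencies suggest that the integration tests seed data through Elasticsearch's REST API (the dialect itself rejects INSERT/UPDATE/DELETE) and then read it back over JDBC. A rough sketch of such a test helper; the class and method names are hypothetical and not part of this commit.

import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;

import java.util.Map;

/** Hypothetical test helper: writes documents over HTTP so read-only JDBC tests have data to query. */
public class ElasticsearchTestDataClient {

    private static final MediaType JSON = MediaType.parse("application/json");

    private final OkHttpClient client = new OkHttpClient();
    private final ObjectMapper mapper = new ObjectMapper();

    /** Indexes one document into the given index and refreshes it so it is immediately searchable. */
    public void indexDocument(String baseUrl, String index, Map<String, Object> document) throws Exception {
        String body = mapper.writeValueAsString(document);
        Request request =
                new Request.Builder()
                        .url(baseUrl + "/" + index + "/_doc?refresh=true")
                        .post(RequestBody.create(JSON, body))
                        .build();
        try (Response response = client.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IllegalStateException("Indexing failed with HTTP " + response.code());
            }
        }
    }
}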
ElasticsearchDialect.java (new file)

Lines changed: 111 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

/** JDBC dialect for Elastic. */
@Internal
public class ElasticsearchDialect extends AbstractDialect {

    private static final long serialVersionUID = 1L;

    // Define MAX/MIN precision of TIMESTAMP type according to Elastic docs:
    // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
    private static final int MIN_TIMESTAMP_PRECISION = 0;
    private static final int MAX_TIMESTAMP_PRECISION = 9;

    @Override
    public String dialectName() {
        return "Elasticsearch";
    }

    @Override
    public Optional<String> defaultDriverName() {
        return Optional.of("org.elasticsearch.xpack.sql.jdbc.EsDriver");
    }

    @Override
    public Set<LogicalTypeRoot> supportedTypes() {
        // The list of types supported by Elastic SQL.
        // https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
        return EnumSet.of(
                LogicalTypeRoot.BIGINT,
                LogicalTypeRoot.BOOLEAN,
                LogicalTypeRoot.DATE,
                LogicalTypeRoot.DOUBLE,
                LogicalTypeRoot.INTEGER,
                LogicalTypeRoot.FLOAT,
                LogicalTypeRoot.SMALLINT,
                LogicalTypeRoot.TINYINT,
                LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
                LogicalTypeRoot.VARBINARY,
                LogicalTypeRoot.VARCHAR);
    }

    @Override
    public Optional<Range> timestampPrecisionRange() {
        return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
    }

    @Override
    public JdbcRowConverter getRowConverter(RowType rowType) {
        return new ElasticsearchRowConverter(rowType);
    }

    @Override
    public String getLimitClause(long limit) {
        return "LIMIT " + limit;
    }

    @Override
    public String quoteIdentifier(String identifier) {
        return '"' + identifier + '"';
    }

    @Override
    public Optional<String> getUpsertStatement(
            String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        throw new UnsupportedOperationException("Upsert is not supported.");
    }

    @Override
    public String getInsertIntoStatement(String tableName, String[] fieldNames) {
        throw new UnsupportedOperationException("Insert into is not supported.");
    }

    @Override
    public String getUpdateStatement(
            String tableName, String[] fieldNames, String[] conditionFields) {
        throw new UnsupportedOperationException("Update is not supported.");
    }

    @Override
    public String getDeleteStatement(String tableName, String[] conditionFields) {
        throw new UnsupportedOperationException("Delete is not supported.");
    }
}
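A quick sketch (not part of the commit) of what the dialect above produces, useful as a mental model of the generated SQL fragments:

import org.apache.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialect;

public class ElasticsearchDialectSketch {
    public static void main(String[] args) {
        ElasticsearchDialect dialect = new ElasticsearchDialect();

        System.out.println(dialect.quoteIdentifier("product.name")); // "product.name" (double quotes)
        System.out.println(dialect.getLimitClause(10));              // LIMIT 10
        System.out.println(dialect.defaultDriverName());             // Optional[org.elasticsearch.xpack.sql.jdbc.EsDriver]

        // Every write path throws, so the dialect is effectively read-only:
        try {
            dialect.getInsertIntoStatement("products", new String[] {"name"});
        } catch (UnsupportedOperationException e) {
            System.out.println(e.getMessage()); // Insert into is not supported.
        }
    }
}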
ElasticsearchDialectFactory.java (new file)

Lines changed: 38 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.connector.jdbc.databases.elasticsearch.dialect;

import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

/**
 * Factory for {@link ElasticsearchDialect}.
 */
public class ElasticsearchDialectFactory implements JdbcDialectFactory {

    @Override
    public boolean acceptsURL(String url) {
        return url.startsWith("jdbc:elasticsearch:") || url.startsWith("jdbc:es:");
    }

    @Override
    public JdbcDialect create() {
        return new ElasticsearchDialect();
    }
}
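Dialect factories in flink-connector-jdbc are discovered through Java's ServiceLoader, so the commit presumably also registers this class under META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory (that file is not shown here). A small sketch (not part of the commit) of the URL matching behaviour:

import org.apache.flink.connector.jdbc.databases.elasticsearch.dialect.ElasticsearchDialectFactory;

public class ElasticsearchDialectFactorySketch {
    public static void main(String[] args) {
        ElasticsearchDialectFactory factory = new ElasticsearchDialectFactory();

        // Both URL prefixes understood by the Elastic JDBC driver route to the new dialect.
        System.out.println(factory.acceptsURL("jdbc:es://http://localhost:9200"));            // true
        System.out.println(factory.acceptsURL("jdbc:elasticsearch://http://localhost:9200")); // true
        System.out.println(factory.acceptsURL("jdbc:mysql://localhost:3306/test"));           // false

        System.out.println(factory.create().dialectName()); // Elasticsearch
    }
}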
