Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 34 additions & 7 deletions docs/content/docs/connectors/table/jdbc.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,14 @@ See how to link with it for cluster execution [here]({{< ref "docs/dev/configura

A driver dependency is also required to connect to a specified database. Here are drivers currently supported:

| Driver | Group Id | Artifact Id | JAR |
|:-----------| :------------------| :----------------------| :----------------|
| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download.html) |
| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
| Driver | Group Id | Artifact Id | JAR |
|:--------------| :------------------| :----------------------| :----------------|
| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download.html) |
| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
| Elasticsearch | `org.elasticsearch.plugin` | `x-pack-sql-jdbc` | [Download](https://www.elastic.co/downloads/jdbc-client) |


JDBC connector and drivers are not part of Flink's binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
Expand Down Expand Up @@ -614,6 +615,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<th class="text-left"><a href="https://docs.oracle.com/database/121/SQLRF/sql_elements001.htm#SQLRF30020">Oracle type</a></th>
<th class="text-left"><a href="https://www.postgresql.org/docs/12/datatype.html">PostgreSQL type</a></th>
<th class="text-left"><a href="https://docs.microsoft.com/en-us/sql/t-sql/data-types/data-types-transact-sql?view=sql-server-ver16">SQL Server type</a></th>
<th class="text-left"><a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html">Elastic SQL type</a></th>
<th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
</tr>
</thead>
Expand All @@ -623,6 +625,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<td></td>
<td></td>
<td><code>TINYINT</code></td>
<td><code>BYTE</code></td>
<td><code>TINYINT</code></td>
</tr>
<tr>
Expand All @@ -636,6 +639,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<code>SMALLSERIAL</code><br>
<code>SERIAL2</code></td>
<td><code>SMALLINT</code></td>
<td><code>SHORT</code></td>
<td><code>SMALLINT</code></td>
</tr>
<tr>
Expand All @@ -648,6 +652,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<code>INTEGER</code><br>
<code>SERIAL</code></td>
<td><code>INT</code></td>
<td><code>INTEGER</code></td>
<td><code>INT</code></td>
</tr>
<tr>
Expand All @@ -659,13 +664,17 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<code>BIGINT</code><br>
<code>BIGSERIAL</code></td>
<td><code>BIGINT</code></td>
<td>
<code>LONG</code><br>
<code>UNSIGNED_LONG</code></td>
<td><code>BIGINT</code></td>
</tr>
<tr>
<td><code>BIGINT UNSIGNED</code></td>
<td></td>
<td></td>
<td></td>
<td></td>
<td><code>DECIMAL(20, 0)</code></td>
</tr>
<tr>
Expand All @@ -676,6 +685,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<code>REAL</code><br>
<code>FLOAT4</code></td>
<td><code>REAL</code></td>
<td>
<code>FLOAT</code><br>
<code>HALF_FLOAT</code></td>
<td><code>FLOAT</code></td>
</tr>
<tr>
Expand All @@ -687,6 +699,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<code>FLOAT8</code><br>
<code>DOUBLE PRECISION</code></td>
<td><code>FLOAT</code></td>
<td>
<code>DOUBLE</code><br>
<code>SCALED_FLOAT</code></td>
<td><code>DOUBLE</code></td>
</tr>
<tr>
Expand All @@ -703,6 +718,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<code>NUMERIC(p, s)</code><br>
<code>DECIMAL(p, s)</code></td>
<td><code>DECIMAL(p, s)</code></td>
<td></td>
<td><code>DECIMAL(p, s)</code></td>
</tr>
<tr>
Expand All @@ -713,19 +729,22 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<td><code>BOOLEAN</code></td>
<td><code>BIT</code></td>
<td><code>BOOLEAN</code></td>
<td><code>BOOLEAN</code></td>
</tr>
<tr>
<td><code>DATE</code></td>
<td><code>DATE</code></td>
<td><code>DATE</code></td>
<td><code>DATE</code></td>
<td></td>
<td><code>DATE</code></td>
</tr>
<tr>
<td><code>TIME [(p)]</code></td>
<td><code>DATE</code></td>
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
<td><code>TIME(0)</code></td>
<td></td>
<td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
</tr>
<tr>
Expand All @@ -736,6 +755,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<code>DATETIME</code>
<code>DATETIME2</code>
</td>
<td><code>TIMESTAMP</code></td>
<td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
</tr>
<tr>
Expand All @@ -760,6 +780,11 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<code>NVARCHAR(n)</code><br>
<code>TEXT</code><br>
<code>NTEXT</code></td>
<td>
<code>KEYWORD</code><br>
<code>IP</code><br>
<code>TEXT</code><br>
<code>VERSION</code></td>
<td><code>STRING</code></td>
</tr>
<tr>
Expand All @@ -774,13 +799,15 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
<td>
<code>BINARY(n)</code><br>
<code>VARBINARY(n)</code><br></td>
<td><code>BINARY</code></td>
<td><code>BYTES</code></td>
</tr>
<tr>
<td></td>
<td></td>
<td><code>ARRAY</code></td>
<td></td>
<td></td>
<td><code>ARRAY</code></td>
</tr>
</tbody>
Expand Down
20 changes: 20 additions & 0 deletions flink-connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,26 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Elastic tests -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.16.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>x-pack-sql-jdbc</artifactId>
<version>8.6.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.10.0</version>
<scope>test</scope>
</dependency>

<!-- ArchUit test dependencies -->

<dependency>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.dialect.elastic;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
import org.apache.flink.connector.jdbc.internal.converter.ElasticRowConverter;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.RowType;

import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;

/** JDBC dialect for Elastic. */
@Internal
public class ElasticDialect extends AbstractDialect {

private static final long serialVersionUID = 1L;

// Define MAX/MIN precision of TIMESTAMP type according to Elastic docs:
// https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
private static final int MIN_TIMESTAMP_PRECISION = 0;
private static final int MAX_TIMESTAMP_PRECISION = 9;

@Override
public String dialectName() {
return "Elasticsearch";
}

@Override
public Optional<String> defaultDriverName() {
return Optional.of("org.elasticsearch.xpack.sql.jdbc.EsDriver");
}

@Override
public Set<LogicalTypeRoot> supportedTypes() {
// The list of types supported by Elastic SQL.
// https://www.elastic.co/guide/en/elasticsearch/reference/current/sql-data-types.html
return EnumSet.of(
LogicalTypeRoot.BIGINT,
LogicalTypeRoot.BOOLEAN,
LogicalTypeRoot.DATE,
LogicalTypeRoot.DOUBLE,
LogicalTypeRoot.INTEGER,
LogicalTypeRoot.FLOAT,
LogicalTypeRoot.SMALLINT,
LogicalTypeRoot.TINYINT,
LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
LogicalTypeRoot.VARBINARY,
LogicalTypeRoot.VARCHAR);
}

@Override
public Optional<Range> timestampPrecisionRange() {
return Optional.of(Range.of(MIN_TIMESTAMP_PRECISION, MAX_TIMESTAMP_PRECISION));
}

@Override
public JdbcRowConverter getRowConverter(RowType rowType) {
return new ElasticRowConverter(rowType);
}

@Override
public String getLimitClause(long limit) {
return "LIMIT " + limit;
}

@Override
public String quoteIdentifier(String identifier) {
return '"' + identifier + '"';
}

@Override
public Optional<String> getUpsertStatement(
String tableName, String[] fieldNames, String[] uniqueKeyFields) {
throw new UnsupportedOperationException("Upsert is not supported.");
}

@Override
public String getInsertIntoStatement(String tableName, String[] fieldNames) {
throw new UnsupportedOperationException("Insert into is not supported.");
}

@Override
public String getUpdateStatement(
String tableName, String[] fieldNames, String[] conditionFields) {
throw new UnsupportedOperationException("Update is not supported.");
}

@Override
public String getDeleteStatement(String tableName, String[] conditionFields) {
throw new UnsupportedOperationException("Delete is not supported.");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.dialect.elastic;

import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;

/** Factory for {@link ElasticDialect}. */
@Internal
public class ElasticDialectFactory implements JdbcDialectFactory {
@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:elasticsearch:") || url.startsWith("jdbc:es:");
}

@Override
public JdbcDialect create() {
return new ElasticDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.connector.jdbc.internal.converter;

import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

/**
* Runtime converter that responsible to convert between JDBC object and Flink internal object for
* Elastic.
*/
public class ElasticRowConverter extends AbstractJdbcRowConverter {

private static final long serialVersionUID = 1L;

@Override
public String converterName() {
return "Elasticsearch";
}

public ElasticRowConverter(RowType rowType) {
super(rowType);
}

@Override
protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
switch (type.getTypeRoot()) {
case TINYINT:
case DOUBLE:
case FLOAT:
return val -> val;
default:
return super.createInternalConverter(type);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ org.apache.flink.connector.jdbc.dialect.mysql.MySqlDialectFactory
org.apache.flink.connector.jdbc.dialect.psql.PostgresDialectFactory
org.apache.flink.connector.jdbc.dialect.oracle.OracleDialectFactory
org.apache.flink.connector.jdbc.dialect.sqlserver.SqlServerDialectFactory
org.apache.flink.connector.jdbc.dialect.elastic.ElasticDialectFactory
Loading