From 7a3b94a7f7e1d19befda0abb1a92c55286fc38d7 Mon Sep 17 00:00:00 2001 From: boris-snyk Date: Tue, 30 Apr 2024 13:29:42 +0300 Subject: [PATCH 1/2] [FLINK-33761][Connector/JDBC] support snowflake dialect --- .../snowflake/dialect/SnowflakeDialect.java | 47 +++++++++++++++++++ .../dialect/SnowflakeDialectFactory.java | 37 +++++++++++++++ .../dialect/SnowflakeRowConverter.java | 45 ++++++++++++++++++ ....connector.jdbc.dialect.JdbcDialectFactory | 1 + 4 files changed, 130 insertions(+) create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialect.java create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java create mode 100644 flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialect.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialect.java new file mode 100644 index 000000000..56d387217 --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialect.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.databases.snowflake.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.converter.JdbcRowConverter; +import org.apache.flink.connector.jdbc.dialect.AbstractPostgresCompatibleDialect; +import org.apache.flink.table.types.logical.RowType; + +import java.util.Optional; + +/** JDBC dialect for Snowflake. */ +@Internal +public class SnowflakeDialect extends AbstractPostgresCompatibleDialect { + private static final long serialVersionUID = 1L; + + @Override + public JdbcRowConverter getRowConverter(RowType rowType) { + return new SnowflakeRowConverter(rowType); + } + + @Override + public Optional defaultDriverName() { + return Optional.of("net.snowflake.client.jdbc.SnowflakeDriver"); + } + + @Override + public String dialectName() { + return "Snowflake"; + } +} diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java new file mode 100644 index 000000000..4550e59d5 --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectFactory.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.databases.snowflake.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.dialect.JdbcDialect; +import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory; + +/** Factory for {@link SnowflakeDialect}. */ +@Internal +public class SnowflakeDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:snowflake:"); + } + + @Override + public JdbcDialect create() { + return new SnowflakeDialect(); + } +} diff --git a/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java new file mode 100644 index 000000000..03cdcbe82 --- /dev/null +++ b/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeRowConverter.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.databases.snowflake.dialect; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.connector.jdbc.converter.AbstractPostgresCompatibleRowConverter; +import org.apache.flink.table.types.logical.RowType; + +import org.postgresql.jdbc.PgArray; + +/** + * Runtime converter that responsible to convert between JDBC object and Flink internal object for + * Snowflake. + */ +@Internal +public class SnowflakeRowConverter extends AbstractPostgresCompatibleRowConverter { + + private static final long serialVersionUID = 1L; + + @Override + public String converterName() { + return "Snowflake"; + } + + // https://docs.snowflake.com/en/sql-reference/intro-summary-data-types + public SnowflakeRowConverter(RowType rowType) { + super(rowType); + } +} diff --git a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory index 9a3cf26cb..422359e63 100644 --- a/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory +++ b/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory @@ -22,3 +22,4 @@ org.apache.flink.connector.jdbc.databases.sqlserver.dialect.SqlServerDialectFact org.apache.flink.connector.jdbc.databases.cratedb.dialect.CrateDBDialectFactory org.apache.flink.connector.jdbc.databases.db2.dialect.Db2DialectFactory org.apache.flink.connector.jdbc.databases.trino.dialect.TrinoDialectFactory +org.apache.flink.connector.jdbc.databases.snowflake.dialect.SnowflakeDialectFactory From 027afc60a8bde361d934927d96cfd8569a515442 Mon Sep 17 00:00:00 2001 From: boris-snyk Date: Wed, 1 May 2024 16:51:42 +0300 Subject: [PATCH 2/2] feat: unit test and doc for snowflake dialect --- docs/content/docs/connectors/table/jdbc.md | 64 +++++++++++++++---- .../dialect/SnowflakeDialectTypeTest.java | 63 ++++++++++++++++++ 2 files changed, 114 insertions(+), 13 deletions(-) create mode 100644 flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectTypeTest.java diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md index 056f2ab59..326dd5b0b 100644 --- a/docs/content/docs/connectors/table/jdbc.md +++ b/docs/content/docs/connectors/table/jdbc.md @@ -45,18 +45,18 @@ See how to link with it for cluster execution [here]({{< ref "docs/dev/configura A driver dependency is also required to connect to a specified database. Here are drivers currently supported: -| Driver | Group Id | Artifact Id | JAR | -|:-----------|:---------------------------|:-----------------------|:----------------------------------------------------------------------------------------------------------------------------------| -| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) | -| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) | -| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) | -| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) | -| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) | -| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) | -| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) | -| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) | -| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) | - +| Driver | Group Id | Artifact Id | JAR | +|:-----------|:----------------------------|:------------------------|:----------------------------------------------------------------------------------------------------------------------------------| +| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) | +| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) | +| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download/) | +| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) | +| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) | +| CrateDB | `io.crate` | `crate-jdbc` | [Download](https://repo1.maven.org/maven2/io/crate/crate-jdbc/) | +| Db2 | `com.ibm.db2.jcc` | `db2jcc` | [Download](https://www.ibm.com/support/pages/download-db2-fix-packs-version-db2-linux-unix-and-windows) | +| Trino | `io.trino` | `trino-jdbc` | [Download](https://repo1.maven.org/maven2/io/trino/trino-jdbc/) | +| OceanBase | `com.oceanbase` | `oceanbase-client` | [Download](https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/) | +| Snowflake | `net.snowflake` | `snowflake-jdbc` | [Download](https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/) JDBC connector and drivers are not part of Flink's binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}). @@ -433,6 +433,13 @@ As there is no standard syntax for upsert, the following table describes the dat WHEN NOT MATCHED THEN INSERT (..)
VALUES (..) + + Snowflake + MERGE INTO .. USING (..) ON (..)
+ WHEN MATCHED THEN UPDATE SET (..)
+ WHEN NOT MATCHED THEN INSERT (..)
+ VALUES (..) + @@ -670,6 +677,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl Trino type OceanBase MySQL mode type OceanBase Oracle mode type + Snowflake }}">Flink SQL type @@ -684,6 +692,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl TINYINT TINYINT + TINYINT @@ -706,6 +715,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl SMALLINT
TINYINT UNSIGNED + SMALLINT SMALLINT @@ -728,6 +738,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl MEDIUMINT
SMALLINT UNSIGNED + INT INT @@ -748,6 +759,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl BIGINT
INT UNSIGNED + BIGINT BIGINT @@ -760,6 +772,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl BIGINT UNSIGNED + DECIMAL(20, 0) @@ -778,6 +791,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl FLOAT BINARY_FLOAT + + REAL
+ FLOAT FLOAT @@ -796,6 +812,9 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl DOUBLE DOUBLE BINARY_DOUBLE + + DOUBLE
+ DOUBLE PRECISION DOUBLE @@ -824,6 +843,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl FLOAT(s)
NUMBER(p, s) + NUMERIC(p, s) DECIMAL(p, s) @@ -841,6 +861,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl TINYINT(1) BOOLEAN + BOOLEAN DATE @@ -853,6 +874,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl DATE DATE DATE + DATE TIME [(p)] @@ -864,6 +886,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl TIME_WITHOUT_TIME_ZONE TIME [(p)] DATE + DATE TIME [(p)] [WITHOUT TIMEZONE] @@ -880,6 +903,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl DATETIME [(p)] TIMESTAMP [(p)] [WITHOUT TIMEZONE] TIMESTAMP [(p)] [WITHOUT TIMEZONE] + TIMESTAMP [(p)] [WITHOUT TIMEZONE] @@ -927,6 +951,14 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl NCHAR(n)
VARCHAR2(n)
CLOB + + CHAR(n)
+ CHARACTER(n)
+ VARCHAR(n)
+ CHARACTER VARYING(n)
+ TEXT + STRING + STRING @@ -951,7 +983,12 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl BLOB RAW(s)
- BLOB + BLOB + + + BINARY(n)
+ VARBINARY(n)
+ BYTES @@ -965,6 +1002,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl ARRAY + ARRAY diff --git a/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectTypeTest.java b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectTypeTest.java new file mode 100644 index 000000000..0adbbab36 --- /dev/null +++ b/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/databases/snowflake/dialect/SnowflakeDialectTypeTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.jdbc.databases.snowflake.dialect; + +import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeTest; + +import java.util.Arrays; +import java.util.List; + +/** Tests for all DataTypes and Dialects of JDBC Snowflake connector. */ +public class SnowflakeDialectTypeTest extends JdbcDialectTypeTest { + @Override + protected String testDialect() { + return "snowflake"; + } + + @Override + protected List testData() { + return Arrays.asList( + createTestItem("BOOLEAN"), + + createTestItem("TINYINT"), + createTestItem("SMALLINT"), + createTestItem("BIGINT"), + createTestItem("INT"), + createTestItem("INTEGER"), + + createTestItem("DECIMAL"), + createTestItem("NUMERIC"), + + createTestItem("DOUBLE"), + createTestItem("FLOAT"), + + createTestItem("DECIMAL(10, 4)"), + createTestItem("DECIMAL(38, 18)"), + createTestItem("VARCHAR"), + createTestItem("CHAR"), + createTestItem("VARBINARY"), + createTestItem("DATE"), + createTestItem("TIME"), + createTestItem("TIMESTAMP(3)"), + createTestItem("TIMESTAMP WITHOUT TIME ZONE"), + createTestItem("TIMESTAMP(1) WITHOUT TIME ZONE"), + // Not valid data + createTestItem("TIMESTAMP_LTZ(3)", "Unsupported type:TIMESTAMP_LTZ(3)")); + } +}