diff --git a/docs.json b/docs.json index b308770b..d9b35c86 100644 --- a/docs.json +++ b/docs.json @@ -184,7 +184,8 @@ "group": "Database tables (non-CDC)", "pages": [ "integrations/sources/postgresql-table", - "integrations/sources/mysql-table" + "integrations/sources/mysql-table", + "ingestion/sources/snowflake" ] }, { diff --git a/iceberg/integ-snowflake.mdx b/iceberg/integ-snowflake.mdx index b8d710f0..4ee48e69 100644 --- a/iceberg/integ-snowflake.mdx +++ b/iceberg/integ-snowflake.mdx @@ -6,6 +6,12 @@ description: "Sink data from RisingWave to an Apache Iceberg table and query it This guide shows how to sink data from RisingWave into an Apache Iceberg table and make it available for querying in Snowflake. This integration allows you to use RisingWave for real-time stream processing and Snowflake for large-scale analytics and data warehousing. + +For direct Snowflake integration, see: +- [Ingest data from Snowflake](/ingestion/sources/snowflake) - Load data from Snowflake tables into RisingWave +- [Sink data to Snowflake](/integrations/destinations/snowflake) - Write data from RisingWave to Snowflake + + **How it works** RisingWave → Iceberg table on S3 → AWS Glue or REST Catalog → Snowflake diff --git a/ingestion/overview.mdx b/ingestion/overview.mdx index 52d9a084..d37e3eb4 100644 --- a/ingestion/overview.mdx +++ b/ingestion/overview.mdx @@ -36,6 +36,7 @@ Below is a complete list of source connectors in RisingWave. Click a connector n | [Webhook](/integrations/sources/webhook) | Built-in | | [Events API (HTTP)](/integrations/sources/events-api) | External service | | [Apache Iceberg](/ingestion/sources/iceberg) | | +| [Snowflake](/ingestion/sources/snowflake) | Latest | | [Load generator (datagen)](/ingestion/sources/datagen) | Built-in | For information on supported data formats and encodings, and whether you need to use `CREATE SOURCE` or `CREATE TABLE` with each format, see [Data formats and encoding options](/ingestion/formats-and-encoding-options). @@ -285,6 +286,7 @@ CREATE TABLE test_users ( | **Google Cloud Storage** | ❌ | ✅ | ⚠️ | Batch only; periodic via external tools | | **Azure Blob** | ❌ | ✅ | ⚠️ | Batch only; periodic via external tools | | **Apache Iceberg** | ❌ | ✅ | ⚠️ | Batch only; periodic via external tools | +| **Snowflake** | ❌ | ✅ | ✅ | One-time batch load; periodic refresh when `refresh_interval_sec` is set | | **Datagen** | ✅ | ❌ | ❌ | Test data generation only | | **Direct INSERT** | ❌ | ✅ | ⚠️ | Manual insertion; periodic via external tools | | **Webhook** | ✅ | ✅ | ⚠️ | Push-based HTTP ingestion; best for SaaS webhooks + request validation/signatures | @@ -321,4 +323,4 @@ CREATE TABLE test_users ( - Monitor streaming lag for real-time sources to ensure data freshness. - Track batch job success and failure rates. - Set up alerts for data quality issues. -- Use RisingWave's system tables and dashboards for monitoring. \ No newline at end of file +- Use RisingWave's system tables and dashboards for monitoring. diff --git a/ingestion/sources/snowflake.mdx b/ingestion/sources/snowflake.mdx new file mode 100644 index 00000000..0833bc7c --- /dev/null +++ b/ingestion/sources/snowflake.mdx @@ -0,0 +1,219 @@ +--- +title: "Ingest data from Snowflake" +sidebarTitle: Snowflake +description: "Load data from Snowflake tables into RisingWave using the ADBC connector." +--- + +This guide describes how to ingest batch data from Snowflake tables into RisingWave using the ADBC (Arrow Database Connectivity) connector. This enables you to create refreshable tables that periodically pull data from Snowflake. + +Snowflake is a cloud-based data warehousing platform that allows for scalable and efficient data storage and analysis. For more information about Snowflake, see [Snowflake official website](https://www.snowflake.com/en/). + +## Prerequisites + +* A Snowflake account with access to the database and tables you want to ingest. +* The Snowflake account identifier (e.g., `myaccount.us-east-1`). +* Valid authentication credentials (username/password, OAuth token, JWT private key, etc.). +* Network access from RisingWave to your Snowflake instance. + +## Connecting to Snowflake + +RisingWave supports loading data from Snowflake tables using the `adbc_snowflake` connector. This creates a refreshable table that periodically fetches the latest data from Snowflake. + +### Syntax + +```sql +CREATE TABLE table_name ( + primary key (order_id) -- Replace with your actual primary key column(s) +) WITH ( + connector = 'adbc_snowflake', + refresh_mode = 'FULL_RELOAD', + refresh_interval_sec = 'interval_in_seconds', + adbc_snowflake.account = 'snowflake_account', + adbc_snowflake.username = 'username', + adbc_snowflake.password = 'password', + adbc_snowflake.database = 'database_name', + adbc_snowflake.schema = 'schema_name', + adbc_snowflake.warehouse = 'warehouse_name', + adbc_snowflake.table = 'source_table_in_snowflake' +); +``` + + +**Automatic Schema Inference** + +Column definitions are automatically inferred from the Snowflake table and should not be manually specified in the `CREATE TABLE` statement. You must specify a primary key when creating a Snowflake table in RisingWave. + + +## Parameters + +Unless specified otherwise, parameters are required. + +| Parameter | Description | +| :--------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| connector | Must be `adbc_snowflake`. | +| refresh_mode | Must be `FULL_RELOAD`. The entire table is re-read on each refresh. | +| refresh_interval_sec | **Optional**. The refresh interval in seconds. Determines how frequently data is fetched from Snowflake. | +| adbc_snowflake.account | The Snowflake account identifier (e.g., `myaccount.us-east-1` or `myaccount`). | +| adbc_snowflake.username | The Snowflake username for authentication. | +| adbc_snowflake.password | **Optional**. The password for username/password authentication. Required when using username/password authentication (the default auth type). | +| adbc_snowflake.database | The name of the Snowflake database. | +| adbc_snowflake.schema | The Snowflake schema containing the table. | +| adbc_snowflake.warehouse | The Snowflake warehouse to use for queries. | +| adbc_snowflake.table | The name of the Snowflake table to ingest. | +| adbc_snowflake.auth_type | **Optional**. The authentication method. Default is `auth_snowflake` (username/password). Other options: `auth_oauth`, `auth_jwt`, `auth_ext_browser`, `auth_okta`, `auth_mfa`, `auth_pat`, `auth_wif`. | +| adbc_snowflake.auth_token | **Optional**. OAuth token for authentication. Required when `adbc_snowflake.auth_type` is `auth_oauth`. | +| adbc_snowflake.jwt_private_key_path | **Optional**. Local file path on the RisingWave server to the JWT private key file (e.g., `/path/to/key.pem`). | +| adbc_snowflake.jwt_private_key_pkcs8_value | **Optional**. Inline PKCS#8 private key value for JWT authentication. | +| adbc_snowflake.jwt_private_key_pkcs8_password | **Optional**. Password for `adbc_snowflake.jwt_private_key_pkcs8_value`, if the private key is encrypted. | + + +- If `refresh_interval_sec` is omitted, data is loaded once at table creation, and subsequent refreshes require manually running [`REFRESH TABLE`](/sql/commands/sql-refresh-table). +- When using JWT authentication (set `adbc_snowflake.auth_type` to `auth_jwt`), provide exactly one of `adbc_snowflake.jwt_private_key_path` and `adbc_snowflake.jwt_private_key_pkcs8_value`. + + +## Authentication methods + +The Snowflake connector supports multiple authentication methods: + +### Username and password (default) + +```sql +CREATE TABLE my_snowflake_table ( + primary key ("order_id") +) WITH ( + connector = 'adbc_snowflake', + refresh_mode = 'FULL_RELOAD', + refresh_interval_sec = '3600', + adbc_snowflake.account = 'myaccount.us-east-1', + adbc_snowflake.username = 'myuser', + adbc_snowflake.password = 'mypassword', + adbc_snowflake.database = 'SALES_DB', + adbc_snowflake.schema = 'PUBLIC', + adbc_snowflake.warehouse = 'COMPUTE_WH', + adbc_snowflake.table = 'ORDERS' +); +``` + +### JWT authentication + +```sql +CREATE TABLE my_snowflake_table ( + primary key ("order_id") +) WITH ( + connector = 'adbc_snowflake', + refresh_mode = 'FULL_RELOAD', + refresh_interval_sec = '7200', + adbc_snowflake.account = 'myaccount', + adbc_snowflake.username = 'myuser', + adbc_snowflake.database = 'SALES_DB', + adbc_snowflake.schema = 'PUBLIC', + adbc_snowflake.warehouse = 'COMPUTE_WH', + adbc_snowflake.table = 'ORDERS', + adbc_snowflake.auth_type = 'auth_jwt', + adbc_snowflake.jwt_private_key_path = '/path/to/key.pem' +); +``` + +### OAuth authentication + +```sql +CREATE TABLE my_snowflake_table ( + primary key ("order_id") +) WITH ( + connector = 'adbc_snowflake', + refresh_mode = 'FULL_RELOAD', + refresh_interval_sec = '3600', + adbc_snowflake.account = 'myaccount.us-east-1', + adbc_snowflake.username = 'myuser', + adbc_snowflake.database = 'SALES_DB', + adbc_snowflake.schema = 'PUBLIC', + adbc_snowflake.warehouse = 'COMPUTE_WH', + adbc_snowflake.table = 'ORDERS', + adbc_snowflake.auth_type = 'auth_oauth', + adbc_snowflake.auth_token = 'your_oauth_token' +); +``` + +## Data type mapping + +The following table shows the corresponding data types between Snowflake and RisingWave. For details on native RisingWave data types, see [Overview of data types](/sql/data-types/overview). + +| Snowflake type | RisingWave type | Notes | +| :--------------- | :------------------------------- | :------------------------------------------------- | +| STRING | VARCHAR | | +| NUMBER | DECIMAL or BIGINT | Depends on scale and precision | +| FLOAT | DOUBLE PRECISION | | +| DECIMAL | DECIMAL | | +| CHAR | VARCHAR | | +| TEXT | VARCHAR | | +| DATE | DATE | | +| TIME | Not supported | Will report an error | +| TIMESTAMP_NTZ | TIMESTAMP WITHOUT TIME ZONE | | +| TIMESTAMP_LTZ | TIMESTAMP WITH TIME ZONE | | +| TIMESTAMP_TZ | TIMESTAMP WITH TIME ZONE | | +| BOOLEAN | BOOLEAN | | +| BINARY | BYTEA | | +| VARIANT | VARCHAR | JSON data stored as string | +| OBJECT | VARCHAR | JSON objects stored as string | +| ARRAY | VARCHAR | Arrays stored as string | + +## Complete example + +This example demonstrates how to create a refreshable table that loads data from a Snowflake table every hour. + +### Step 1: Create the refreshable table + +```sql +CREATE TABLE snowflake_orders ( + primary key ("order_id") +) WITH ( + connector = 'adbc_snowflake', + refresh_mode = 'FULL_RELOAD', + refresh_interval_sec = '3600', -- Refresh every hour + + -- Snowflake connection parameters + adbc_snowflake.account = 'myaccount.us-east-1', + adbc_snowflake.username = 'analytics_user', + adbc_snowflake.password = 'secure_password', + adbc_snowflake.database = 'PRODUCTION', + adbc_snowflake.schema = 'SALES', + adbc_snowflake.warehouse = 'ANALYTICS_WH', + adbc_snowflake.table = 'ORDERS' +); +``` + +### Step 2: Query the data + +```sql +SELECT * FROM snowflake_orders LIMIT 10; +``` + +### Step 3: Create materialized views + +You can create materialized views based on the Snowflake data: + +```sql +-- The columns order_date and total_amount are automatically inferred from the Snowflake table +CREATE MATERIALIZED VIEW daily_sales AS +SELECT + DATE_TRUNC('day', order_date) AS sale_date, + COUNT(*) AS order_count, + SUM(total_amount) AS total_revenue +FROM snowflake_orders +GROUP BY DATE_TRUNC('day', order_date); +``` + +## Limitations and requirements + +* **Refresh mode**: Only `FULL_RELOAD` mode is supported. If `refresh_interval_sec` is set, RisingWave refreshes on that schedule. If it is omitted, refresh manually with [`REFRESH TABLE`](/sql/commands/sql-refresh-table). +* **Primary key**: You must define a primary key when creating a Snowflake table in RisingWave. +* **Schema inference**: Column definitions are automatically inferred from the Snowflake table. Do not manually specify columns in the `CREATE TABLE` statement. +* **Feature flag**: The Snowflake connector requires the `source-adbc_snowflake` feature to be enabled at compile time. This is enabled by default in official RisingWave builds. +* **Snapshot consistency**: The connector uses Snowflake's time travel feature when available to read data from a consistent point in time; when unavailable (for example, due to Snowflake retention settings), it falls back to reading current table data. +* **Performance**: For large tables, consider the refresh interval carefully to balance data freshness with query costs in Snowflake. + +## What's next? + +* [Sink data to Snowflake](/integrations/destinations/snowflake) - Learn how to write data from RisingWave back to Snowflake +* [Work with Snowflake and Iceberg](/iceberg/integ-snowflake) - Integrate Snowflake with Apache Iceberg catalogs +* [`REFRESH TABLE`](/sql/commands/sql-refresh-table) - Manually refresh a Snowflake table when `refresh_interval_sec` is not set diff --git a/integrations/destinations/snowflake.mdx b/integrations/destinations/snowflake.mdx index f9e618c1..3b48fca0 100644 --- a/integrations/destinations/snowflake.mdx +++ b/integrations/destinations/snowflake.mdx @@ -6,6 +6,8 @@ description: This guide describes how to sink data from RisingWave to Snowflake Snowflake is a cloud-based data warehousing platform that allows for scalable and efficient data storage and analysis. For more information about Snowflake, see [Snowflake official website](https://www.snowflake.com/en/). +This page describes how to **sink data to** Snowflake. To **ingest data from** Snowflake, see [Ingest data from Snowflake](/ingestion/sources/snowflake). + Sinking from RisingWave to Snowflake utilizes [Snowpipe](https://docs.snowflake.com/en/user-guide/data-load-snowpipe-rest-apis) for data loading. Initially, data is staged in a user-managed S3 bucket in JSON format, and then loaded into the Snowflake table via Snowpipe. For more information, see [Overview of the Snowpipe REST endpoints to load data](https://docs.snowflake.com/user-guide/data-load-snowpipe-rest-overview).