diff --git a/images/integrations/data-ingestion/google-dataflow/pubsub-inqueue-job.png b/images/integrations/data-ingestion/google-dataflow/pubsub-inqueue-job.png new file mode 100644 index 00000000..88e12e7e Binary files /dev/null and b/images/integrations/data-ingestion/google-dataflow/pubsub-inqueue-job.png differ diff --git a/integrations/connectors/data-ingestion/GCP/google-dataflow/dataflow.mdx b/integrations/connectors/data-ingestion/GCP/google-dataflow/dataflow.mdx index 7d2293c0..0b38e20f 100644 --- a/integrations/connectors/data-ingestion/GCP/google-dataflow/dataflow.mdx +++ b/integrations/connectors/data-ingestion/GCP/google-dataflow/dataflow.mdx @@ -28,7 +28,7 @@ However, this option requires knowledge of Java programming and familiarity with - Requires coding and understanding of the Beam API. ## Predefined templates {#2-predefined-templates} -ClickHouse offers [predefined templates](/integrations/connectors/data-ingestion/GCP/google-dataflow/templates) designed for specific use cases, such as importing data from BigQuery into ClickHouse. These templates are ready-to-use and simplify the integration process, making them an excellent choice if you prefer a no-code solution. +ClickHouse offers [predefined templates](/integrations/connectors/data-ingestion/GCP/google-dataflow/templates) designed for specific use cases, such as batch imports from BigQuery or streaming ingestion from Pub/Sub into ClickHouse. These templates are ready-to-use and simplify the integration process, making them an excellent choice if you prefer a no-code solution. ### Key features {#key-features-1} - No Beam coding required. 
diff --git a/integrations/connectors/data-ingestion/GCP/google-dataflow/templates.mdx b/integrations/connectors/data-ingestion/GCP/google-dataflow/templates.mdx index 3987c218..c6e58640 100644 --- a/integrations/connectors/data-ingestion/GCP/google-dataflow/templates.mdx +++ b/integrations/connectors/data-ingestion/GCP/google-dataflow/templates.mdx @@ -26,5 +26,5 @@ For detailed step-by-step instructions, refer to the [Google Dataflow Run Pipeli ## List of ClickHouse Templates {#list-of-clickhouse-templates} * [BigQuery To ClickHouse](/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/bigquery-to-clickhouse) +* [Pub/Sub To ClickHouse](/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/pubsub-to-clickhouse) * [GCS To ClickHouse](https://github.com/ClickHouse/DataflowTemplates/issues/3) (coming soon!) -* [Pub Sub To ClickHouse](https://github.com/ClickHouse/DataflowTemplates/issues/4) (coming soon!) diff --git a/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/bigquery-to-clickhouse.mdx b/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/bigquery-to-clickhouse.mdx index 133eac0e..d93db4b3 100644 --- a/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/bigquery-to-clickhouse.mdx +++ b/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/bigquery-to-clickhouse.mdx @@ -177,4 +177,7 @@ This error occurs when ClickHouse runs out of memory while processing large batc ## Template source code {#template-source-code} -The template's source code is available in ClickHouse's [DataflowTemplates](https://github.com/ClickHouse/DataflowTemplates) fork. +The template's source code is available in: + +- [`GoogleCloudPlatform/DataflowTemplates`](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/googlecloud-to-clickhouse) — the upstream Google Cloud Platform repository. 
+- [`ClickHouse/DataflowTemplates`](https://github.com/ClickHouse/DataflowTemplates) — ClickHouse's fork. diff --git a/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/pubsub-to-clickhouse.mdx b/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/pubsub-to-clickhouse.mdx new file mode 100644 index 00000000..87ef7d89 --- /dev/null +++ b/integrations/connectors/data-ingestion/GCP/google-dataflow/templates/pubsub-to-clickhouse.mdx @@ -0,0 +1,211 @@ +--- +sidebarTitle: 'Pub/Sub to ClickHouse' +slug: /integrations/google-dataflow/templates/pubsub-to-clickhouse +description: 'You can stream JSON messages from Pub/Sub into ClickHouse using a Google Dataflow template' +title: 'Dataflow Pub/Sub to ClickHouse template' +doc_type: 'guide' +keywords: ['Dataflow', 'Pub/Sub', 'PubSub', 'streaming', 'dead-letter'] +--- + +import { Image } from "/snippets/components/Image.jsx"; + +The Pub/Sub to ClickHouse template is a streaming pipeline that reads JSON-encoded messages from a Pub/Sub subscription and writes them into a ClickHouse table. +Messages that fail to parse or fail to map to the target schema are routed to a dead-letter destination: a ClickHouse table, a Pub/Sub topic, or both. + +## Pipeline requirements {#pipeline-requirements} + +* The source Pub/Sub subscription must exist. +* Messages published to the subscription must be valid JSON. +* The target ClickHouse table must exist, and its column names must match the field names in the JSON payload. +* The ClickHouse host must be accessible from the Dataflow worker machines. +* At least one dead-letter destination (`clickHouseDeadLetterTable` or `deadLetterTopic`) must be provided. If both are provided, failed messages are routed to both destinations simultaneously. +* When `clickHouseDeadLetterTable` is set, the dead-letter table must already exist in ClickHouse with the schema shown in [Dead-letter handling](#dead-letter-handling). 
+* When `deadLetterTopic` is set, the Pub/Sub topic must already exist. + +## Template parameters {#template-parameters} + +
+
+ +| Parameter Name | Parameter Description | Required | Notes | +|-----------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `inputSubscription` | The Pub/Sub subscription to read messages from. Example: `projects//subscriptions/`. | ✅ | Messages must be JSON-encoded. | +| `clickHouseUrl` | The ClickHouse endpoint URL. Use `https://` for SSL connections (ClickHouse Cloud) or `http://` for non-SSL connections. Example: `https://:8443` or `http://:8123`. | ✅ | For ClickHouse Cloud, use the HTTPS endpoint on port `8443`. | +| `clickHouseDatabase` | The name of the ClickHouse database where the target table resides. Example: `default`. | ✅ | | +| `clickHouseTable` | The name of the ClickHouse table to write data into. | ✅ | The table must exist before running the pipeline. | +| `clickHouseUsername` | The username to authenticate with ClickHouse. | ✅ | | +| `clickHousePassword` | The password to authenticate with ClickHouse. | ✅ | | +| `clickHouseDeadLetterTable` | The ClickHouse table to write failed messages into. Example: `my_table_dead_letter`. | | At least one of `clickHouseDeadLetterTable` or `deadLetterTopic` must be provided. The table must exist with the dead-letter schema shown in [Dead-letter handling](#dead-letter-handling). | +| `deadLetterTopic` | The Pub/Sub topic to publish failed messages to. Example: `projects//topics/`. | | At least one of `clickHouseDeadLetterTable` or `deadLetterTopic` must be provided. 
Failed payloads are published to the topic with `errorMessage` and `failedAt` set as message attributes. | +| `windowSeconds` | Duration in seconds for time-based batching windows. | | See [Batching and windowing](#batching-and-windowing) for the interaction with `batchRowCount`. If neither is set, combined mode uses defaults of `30s` and `1000` rows. | +| `batchRowCount` | Number of rows to accumulate before flushing to ClickHouse. | | See [Batching and windowing](#batching-and-windowing) for the interaction with `windowSeconds`. | +| `maxInsertBlockSize` | Maximum number of rows per `INSERT` statement sent to ClickHouse. Defaults to `1,000,000`. | | A `ClickHouseIO` option. | +| `maxRetries` | Maximum number of retry attempts for failed ClickHouse inserts. Defaults to `5`. | | A `ClickHouseIO` option. | +| `insertDeduplicate` | Whether to enable deduplication for `INSERT` queries in replicated ClickHouse tables. Defaults to `true`. | | A `ClickHouseIO` option. | +| `insertQuorum` | For `INSERT` queries in replicated tables, wait for the specified number of replicas to acknowledge the write and linearize the data addition. `0` disables quorum writes. | | A `ClickHouseIO` option. Disabled in default server settings. | +| `insertDistributedSync` | If enabled, `INSERT` queries into distributed tables wait until data is sent to all nodes in the cluster. Defaults to `true`. | | A `ClickHouseIO` option. | + + +Default values for all `ClickHouseIO` parameters can be found in [`ClickHouseIO` Apache Beam Connector](/integrations/connectors/data-ingestion/etl-tools/apache-beam#clickhouseiowrite-parameters). + + +## Message format and schema mapping {#message-format-and-schema-mapping} + +Pub/Sub messages must be JSON objects whose top-level field names exactly match the column names of the target ClickHouse table. + +To map incoming messages onto the target table, the pipeline performs the following at startup: + +1. Fetches the schema of the target ClickHouse table. +2. 
Builds a Beam `Row` schema from that ClickHouse schema. +3. For each incoming Pub/Sub message, parses the JSON payload and assembles a row by reading the fields named in the ClickHouse schema. + +
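The three startup steps above can be sketched roughly as follows. This is an illustrative Python sketch under stated assumptions, not the template's Java implementation: the `SCHEMA` dictionary is a hypothetical stand-in for the schema fetched from ClickHouse, and `map_message` and `to_text` are hypothetical helper names. It assumes the behavior this page describes: case-sensitive field lookup, extra fields ignored, `NULL` written only for nullable columns, and failures routed to dead-letter.

```python
import json


def to_text(value):
    # Textual values pass through; other JSON nodes are serialized to JSON text.
    return value if isinstance(value, str) else json.dumps(value)


# Hypothetical stand-in for the ClickHouse table schema fetched at startup:
# column name -> (converter, nullable).
SCHEMA = {
    "id": (int, False),
    "score": (float, True),
    "name": (to_text, False),
}


def map_message(payload, schema=SCHEMA):
    """Return ("ok", row) or ("dead_letter", error_text) for one message."""
    try:
        doc = json.loads(payload.decode("utf-8"))
        if not isinstance(doc, dict):
            raise ValueError("payload is not a JSON object")
        row = {}
        for column, (convert, nullable) in schema.items():
            value = doc.get(column)  # case-sensitive; extra fields are ignored
            if value is None:
                if not nullable:
                    raise ValueError(f"no value for non-nullable column {column}")
                row[column] = None
            else:
                row[column] = convert(value)
        return "ok", row
    except (ValueError, TypeError) as exc:
        # Parse and coercion failures go to the dead-letter path.
        return "dead_letter", str(exc)
```

Calling `map_message(b'{"ID": 1, "name": "a"}')` in this sketch yields a dead-letter result, because `ID` does not match the `id` column and that column is not nullable.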
+ + +JSON field names must exactly match the ClickHouse column names (matching is case-sensitive). Fields in the message that do not correspond to a ClickHouse column are ignored. If a ClickHouse column has no matching field in the JSON payload, the pipeline attempts to write `NULL` for that column — which only succeeds when the column is declared as [`Nullable`](/core/reference/data-types/nullable). Messages that fail to parse, whose values cannot be coerced into the column type, or that would write `NULL` to a non-nullable column, are routed to the dead-letter destination. + + +### Type conversion {#type-conversion} + +JSON values are coerced into the corresponding ClickHouse column type: + +| ClickHouse Type | Notes | +|--------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------| +| [`Float32`](/core/reference/data-types/float) | Parsed via `Float.valueOf`. | +| [`Float64`](/core/reference/data-types/float) | Parsed via `Double.valueOf`. | +| [`Date`](/core/reference/data-types/date) | Parsed as an ISO-8601 date string. | +| [`DateTime`](/core/reference/data-types/datetime) | Parsed as an ISO-8601 datetime string (e.g. `2026-01-15T12:34:56Z`). | +| [`Array(T)`](/core/reference/data-types/array) | JSON array; each element is converted to the element type `T`. Empty or missing arrays produce an empty array. | +| Integer types (`Int8`/`Int16`/`Int32`/`Int64`, `UInt8`/`UInt16`/`UInt32`/`UInt64`) | Parsed from the JSON number or its string representation. | +| [`String`](/core/reference/data-types/string) | Used as-is for textual fields; non-textual JSON nodes are serialized to their JSON string form. | + +## Batching and windowing {#batching-and-windowing} + +Because the pipeline is streaming, incoming rows are accumulated into windows before being flushed to ClickHouse. 
The windowing strategy is selected from the parameters you provide: + +| `windowSeconds` | `batchRowCount` | Behavior | +|-----------------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------| +| set | unset | Time-based fixed windows of `windowSeconds`. | +| unset | set | Global window with a count trigger; fires every `batchRowCount` rows. | +| both set | both set | Global window with a combined trigger; fires on whichever condition is met first (time **or** row count). | +| neither set | neither set | Combined mode with defaults: `30` seconds or `1000` rows, whichever comes first. | + +Tuning these values lets you trade latency against insert efficiency. Smaller windows reduce end-to-end latency; larger windows produce fewer, larger `INSERT` batches. + +## Dead-letter handling {#dead-letter-handling} + +Messages that fail JSON parsing, schema mapping, or type coercion are routed to the configured dead-letter destination(s). At least one of `clickHouseDeadLetterTable` or `deadLetterTopic` must be provided; if both are set, failed messages are sent to both. + +### ClickHouse dead-letter table {#clickhouse-dead-letter-table} + +When `clickHouseDeadLetterTable` is set, the dead-letter table must already exist with this fixed schema: + +| Column | Type | Description | +|-----------------|------------|--------------------------------------------------------| +| `raw_message` | `String` | The original Pub/Sub message payload as UTF-8 text. | +| `error_message` | `String` | The exception message describing why the row failed. | +| `stack_trace` | `String` | The full Java stack trace captured at failure time. | +| `failed_at` | `DateTime` | The processing-time timestamp at which the row failed. 
| + +A minimal definition for a single-node deployment: + +```sql +CREATE TABLE my_table_dead_letter ( + raw_message String, + error_message String, + stack_trace String, + failed_at DateTime +) ENGINE = MergeTree() +ORDER BY failed_at; +``` + + +Adapt the engine and `ORDER BY` clause for your deployment — use `ReplicatedMergeTree` for replicated tables, add `ON CLUSTER` for distributed setups, and adjust partitioning or TTL as needed. + + +### Pub/Sub dead-letter topic {#pubsub-dead-letter-topic} + +When `deadLetterTopic` is set, each failed message is republished to the topic with: + +- **Payload**: the original message bytes. +- **Attribute** `errorMessage`: the exception message captured at failure time. +- **Attribute** `failedAt`: the processing-time timestamp at which the row failed. + +This makes it convenient to replay failed messages once the underlying schema or producer issue has been resolved. + +## Running the template {#running-the-template} + +The Pub/Sub to ClickHouse template is available from the Google Cloud Console. + + +Be sure to review this document, and specifically the above sections, to fully understand the template's configuration requirements and prerequisites. + + +Sign in to your Google Cloud Console and search for Dataflow. + +1. Press the `CREATE JOB FROM TEMPLATE` button. + Dataflow console +2. Once the template form is open, enter a job name and select the desired region. + + {/* PLACEHOLDER: add screenshot of the Pub/Sub to ClickHouse template initial form (job name + region) */} + +3. In the `Dataflow Template` input, type `ClickHouse` or `Pub/Sub`, and select the `Pub/Sub to ClickHouse` template. + + {/* PLACEHOLDER: add screenshot of selecting the "Pub/Sub to ClickHouse" template from the dropdown */} + +4. Once selected, the form expands. Fill in: + * The Pub/Sub input subscription, in the form `projects//subscriptions/`. + * The ClickHouse endpoint URL — for ClickHouse Cloud use `https://:8443`. 
+ * The ClickHouse database, target table, username and password. + * At least one dead-letter destination: a ClickHouse table or a Pub/Sub topic (or both). + + {/* PLACEHOLDER: add screenshot of the expanded Pub/Sub to ClickHouse template form showing the required fields and the dead-letter section */} + +5. Optionally customize batching (`windowSeconds`, `batchRowCount`) and `ClickHouseIO` tuning parameters, as detailed in the [Template parameters](#template-parameters) section. + +### Monitor the job {#monitor-the-job} + +Navigate to the [Dataflow Jobs tab](https://console.cloud.google.com/dataflow/jobs) in your Google Cloud Console to monitor the status of the job. You'll find the job details, including progress and any errors: + +Dataflow console showing a running Pub/Sub to ClickHouse job + +The template also emits the following custom metrics under the `PubSubToClickHouse` namespace, viewable from the Dataflow job page: + +| Metric | Type | Description | +|-------------------------|--------------|------------------------------------------------------------------------------| +| `messages-received` | Counter | Total Pub/Sub messages received by the parsing step. | +| `rows-parsed-ok` | Counter | Messages successfully converted to a row and routed to the main output. | +| `rows-parse-failed` | Counter | Messages that failed parsing or schema mapping and were routed to dead-letter. | +| `message-payload-bytes` | Distribution | Distribution of incoming Pub/Sub message payload sizes, in bytes. | + +## Troubleshooting {#troubleshooting} + +### Memory limit (total) exceeded error (code 241) {#code-241-dbexception-memory-limit-total-exceeded} + +This error occurs when ClickHouse runs out of memory while processing large batches of data. To resolve this issue: + +* Increase the instance resources: Upgrade your ClickHouse server to a larger instance with more memory to handle the data processing load. 
+* Decrease the batch size: Reduce `batchRowCount` (and/or `maxInsertBlockSize`) in your Dataflow job configuration to send smaller chunks of data to ClickHouse, reducing memory consumption per batch. + +### All messages are going to the dead-letter destination {#all-messages-going-to-dlq} + +The most common causes are: + +* The JSON field names do not match the ClickHouse column names exactly (matching is case-sensitive). +* A column type cannot be coerced from the JSON value (for example, a non-ISO-8601 string in a `DateTime` column). +* The target table schema has changed since the pipeline started — the schema is fetched once at startup. Restart the job after applying schema changes. + +Inspect the `error_message` and `stack_trace` columns of the ClickHouse dead-letter table (or the `errorMessage` attribute on Pub/Sub dead-letter messages) to identify the root cause. + +### Pipeline starts but no rows arrive in ClickHouse {#no-rows-arriving} + +* Confirm the subscription is receiving messages — check the `messages-received` metric on the Dataflow job page. +* In time-based mode (`windowSeconds` only), rows are flushed only at window boundaries. Lower `windowSeconds` to verify flushes are occurring. +* Verify network reachability between Dataflow workers and the ClickHouse endpoint (firewall, VPC peering, or private service connect). + +## Template source code {#template-source-code} + +The template's source code is available in: + +- [`GoogleCloudPlatform/DataflowTemplates`](https://github.com/GoogleCloudPlatform/DataflowTemplates/tree/main/v2/googlecloud-to-clickhouse) — the upstream Google Cloud Platform repository. +- [`ClickHouse/DataflowTemplates`](https://github.com/ClickHouse/DataflowTemplates) — ClickHouse's fork. 
diff --git a/integrations/connectors/navigation.json b/integrations/connectors/navigation.json index d70b3637..3c3fb2be 100644 --- a/integrations/connectors/navigation.json +++ b/integrations/connectors/navigation.json @@ -141,7 +141,8 @@ "group": "Dataflow templates", "expanded": false, "pages": [ - "integrations/connectors/data-ingestion/GCP/google-dataflow/templates/bigquery-to-clickhouse" + "integrations/connectors/data-ingestion/GCP/google-dataflow/templates/bigquery-to-clickhouse", + "integrations/connectors/data-ingestion/GCP/google-dataflow/templates/pubsub-to-clickhouse" ] } ]