|
2 | 2 | :description: Query Redpanda topic data stored in Iceberg tables, based on the topic Iceberg mode and schema. |
3 | 3 | :page-categories: Iceberg, Tiered Storage, Management, High Availability, Data Replication, Integration |
4 | 4 |
|
| 5 | +// tag::single-source[] |
| 6 | + |
| 7 | +ifndef::env-cloud[] |
5 | 8 | [NOTE] |
6 | 9 | ==== |
7 | 10 | include::shared:partial$enterprise-license.adoc[] |
8 | 11 | ==== |
| 12 | +endif::[] |
9 | 13 |
|
10 | 14 | When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/choose-iceberg-mode.adoc[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. You do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda. |
11 | 15 |
|
12 | | -include::manage:partial$iceberg/query-iceberg-topics.adoc[] |
| 16 | +== Access Iceberg tables |
| 17 | + |
| 18 | +ifndef::env-cloud[] |
| 19 | +Redpanda generates an Iceberg table with the same name as the topic. Depending on the processing engine and your Iceberg xref:manage:iceberg/use-iceberg-catalogs.adoc[catalog implementation], you may also need to define the table (for example using `CREATE TABLE`) to point the data lakehouse to its location in the catalog. For an example, see xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[]. |
| 20 | +endif::[] |
| 21 | + |
| 22 | +ifdef::env-cloud[] |
| 23 | +Redpanda generates an Iceberg table with the same name as the topic. Depending on the processing engine and your Iceberg catalog implementation, you may also need to define the table (for example using `CREATE TABLE`) to point the data lakehouse to its location in the catalog. |
| 24 | + |
| 25 | +For BYOC clusters, the bucket name and table location are as follows: |
| 26 | + |
| 27 | +|=== |
| 28 | +| Cloud provider | Bucket or container name | Iceberg table location |
| 29 | + |
| 30 | +| AWS |
| 31 | +| `redpanda-cloud-storage-<cluster-id>` |
| 32 | +.3+a| `redpanda-iceberg-catalog/redpanda/<topic-name>` |
| 33 | + |
| 34 | +| Azure |
| 35 | +a| `<cluster-id>` |
| 36 | + |
| 37 | +The Redpanda cluster ID is also used as the container name (ID) and the storage account ID. |
| 38 | + |
| 39 | +| GCP |
| 40 | +| `redpanda-cloud-storage-<cluster-id>` |
| 41 | + |
| 42 | +|=== |
| 43 | + |
| 44 | +For Azure clusters, you must add the public IP addresses or ranges from the REST catalog service, or other clients requiring access to the Iceberg data, to your cluster's allow list. Alternatively, add subnet IDs to the allow list if the requests originate from the same Azure region. |
| 45 | + |
| 46 | +For example, to add subnet IDs to the allow list through the Control Plane API link:/api/doc/cloud-controlplane/operation/operation-clusterservice_updatecluster[`PATCH /v1/clusters/<cluster-id>`] endpoint, run: |
| 47 | + |
| 48 | +[,bash] |
| 49 | +---- |
| 50 | +curl -X PATCH https://api.cloud.redpanda.com/v1/clusters/<cluster-id> \ |
| 51 | + -H "Content-Type: application/json" \ |
| 52 | + -H "Authorization: Bearer ${RP_CLOUD_TOKEN}" \ |
| 53 | + -d @- << EOF |
| 54 | +{ |
| 55 | + "cloud_storage": { |
| 56 | + "azure": { |
| 57 | + "allowed_subnet_ids": [ |
| 58 | + <list-of-subnet-ids> |
| 59 | + ] |
| 60 | + } |
| 61 | + } |
| 62 | +} |
| 63 | +EOF |
| 64 | +---- |
| 65 | + |
| 66 | +endif::[] |
| 67 | + |
| 68 | +Some query engines may require you to manually refresh the Iceberg table snapshot (for example, by running a command like `ALTER TABLE <table-name> REFRESH;`) to see the latest data. |
| 69 | + |
| 70 | +If your engine needs the full JSON metadata path, use the following: |
| 71 | + |
| 72 | +``` |
| 73 | +redpanda-iceberg-catalog/redpanda/<topic-name>/metadata/v<version-number>.metadata.json |
| 74 | +``` |
| 75 | + |
| 76 | +This provides read access to all snapshots written as of the specified table version (denoted by `version-number`). |
| 77 | + |
| 78 | +NOTE: Redpanda automatically removes expired snapshots on a periodic basis. Snapshot expiry helps maintain a smaller metadata size and reduces the window available for <<time-travel-queries,time travel>>. |
| 79 | + |
| 80 | +== Query examples |
| 81 | + |
| 82 | +ifndef::env-cloud[] |
| 83 | +To follow along with the examples on this page, suppose you produce the same stream of events to a topic `ClickEvent`, which uses a schema, and another topic `ClickEvent_key_value`, which uses the key-value mode. The topics have glossterm:Tiered Storage[] configured to an AWS S3 bucket. A sample record contains the following data: |
| 84 | +endif::[] |
| 85 | + |
| 86 | +ifdef::env-cloud[] |
| 87 | +To follow along with the examples on this page, suppose you produce the same stream of events to a topic `ClickEvent`, which uses a schema, and another topic `ClickEvent_key_value`, which uses the key-value mode. The topic's Iceberg data is stored in an AWS S3 bucket. A sample record contains the following data: |
| 88 | +endif::[] |
| 89 | + |
| 90 | +[,bash,role=no-copy] |
| 91 | +---- |
| 92 | +{"user_id": 2324, "event_type": "BUTTON_CLICK", "ts": "2024-11-25T20:23:59.380Z"} |
| 93 | +---- |
| 94 | + |
| 95 | +=== Topic with schema (`value_schema_id_prefix` mode) |
| 96 | + |
| 97 | +NOTE: The steps in this section also apply to the `value_schema_latest` mode, except the produce step. The `value_schema_latest` mode is not compatible with the Schema Registry wire format. The xref:reference:rpk/rpk-topic/rpk-topic-produce[`rpk topic produce`] command embeds the wire format header, so you must use your own producer code with `value_schema_latest`. |
| 98 | + |
| 99 | +Assume that you have created the `ClickEvent` topic, set `redpanda.iceberg.mode` to `value_schema_id_prefix`, and are connecting to a REST-based Iceberg catalog. The following is an Avro schema for `ClickEvent`: |
| 100 | + |
| 101 | +.`schema.avsc` |
| 102 | +[,avro] |
| 103 | +---- |
| 104 | +{ |
| 105 | + "type" : "record", |
| 106 | + "namespace" : "com.redpanda.examples.avro", |
| 107 | + "name" : "ClickEvent", |
| 108 | + "fields" : [ |
| 109 | + { "name": "user_id", "type" : "int" }, |
| 110 | + { "name": "event_type", "type" : "string" }, |
| 111 | + { "name": "ts", "type": "string" } |
| 112 | + ] |
| 113 | + } |
| 114 | +---- |
| 115 | + |
| 116 | +. Register the schema under the `ClickEvent-value` subject: |
| 117 | ++ |
| 118 | +[,bash] |
| 119 | +---- |
| 120 | +rpk registry schema create ClickEvent-value --schema path/to/schema.avsc --type avro |
| 121 | +---- |
| 122 | + |
| 123 | +. Produce to the `ClickEvent` topic using the following format: |
| 124 | ++ |
| 125 | +[,bash] |
| 126 | +---- |
| 127 | +echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent --format='%k %v\n' --schema-id=topic |
| 128 | +---- |
| 129 | ++ |
| 130 | +The `value_schema_id_prefix` mode requires that you produce to a topic using the xref:manage:schema-reg/schema-reg-overview.adoc#wire-format[Schema Registry wire format], which includes the magic byte and schema ID in the prefix of the message payload. This allows Redpanda to identify the correct schema version in the Schema Registry for a record. |
| 131 | + |
| 132 | +. The following Spark SQL query returns values from columns in the `ClickEvent` table, with the table structure derived from the schema, and column names matching the schema fields. If you've integrated a catalog, query engines such as Spark SQL provide Iceberg integrations that allow easy discovery and access to existing Iceberg tables in object storage. |
| 133 | ++ |
| 134 | +[,sql] |
| 135 | +---- |
| 136 | +SELECT * |
| 137 | +FROM `<catalog-name>`.redpanda.ClickEvent; |
| 138 | +---- |
| 139 | ++ |
| 140 | +[,bash,role=no-copy] |
| 141 | +---- |
| 142 | ++-----------------------------------+---------+--------------+--------------------------+ |
| 143 | +| redpanda | user_id | event_type | ts | |
| 144 | ++-----------------------------------+---------+--------------+--------------------------+ |
| 145 | +| {"partition":0,"offset":0,"timestamp":2025-03-05 15:09:20.436,"headers":null,"key":null} | 2324 | BUTTON_CLICK | 2024-11-25T20:23:59.380Z | |
| 146 | ++-----------------------------------+---------+--------------+--------------------------+ |
| 147 | +---- |
| 148 | + |
| 149 | +=== Topic in key-value mode |
| 150 | + |
| 151 | +In `key_value` mode, you do not associate the topic with a schema in the Schema Registry, which means using semi-structured data in Iceberg. The record keys and values can have an arbitrary structure, so Redpanda stores them in https://apache.github.io/iceberg/spec/?h=spec#primitive-types[binary format^] in Iceberg. |
| 152 | + |
| 153 | +In this example, assume that you have created the `ClickEvent_key_value` topic, and set `redpanda.iceberg.mode` to `key_value`. |
| 154 | + |
| 155 | +. Produce to the `ClickEvent_key_value` topic using the following format: |
| 156 | ++ |
| 157 | +[,bash] |
| 158 | +---- |
| 159 | +echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n' |
| 160 | +---- |
| 161 | + |
| 162 | +. The following Spark SQL query returns the semi-structured data in the `ClickEvent_key_value` table. The table consists of two columns: one named `redpanda`, containing the record key and other metadata, and another binary column named `value` for the record's value: |
| 163 | ++ |
| 164 | +[,sql] |
| 165 | +---- |
| 166 | +SELECT * |
| 167 | +FROM `<catalog-name>`.redpanda.ClickEvent_key_value; |
| 168 | +---- |
| 169 | ++ |
| 170 | +[,bash,role=no-copy] |
| 171 | +---- |
| 172 | ++-----------------------------------+------------------------------------------------------------------------------+ |
| 173 | +| redpanda | value | |
| 174 | ++-----------------------------------+------------------------------------------------------------------------------+ |
| 175 | +| {"partition":0,"offset":0,"timestamp":2025-03-05 15:14:30.931,"headers":null,"key":key1} | {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"} | |
| 176 | ++-----------------------------------+------------------------------------------------------------------------------+ |
| 177 | +---- |
| 178 | + |
| 179 | +Depending on your query engine, you might need to first decode the binary value to display the record key and value using a SQL helper function. For example, see the https://spark.apache.org/docs/latest/api/sql/index.html#unhex[`decode` and `unhex`^] Spark SQL functions, or the https://docs.snowflake.com/en/sql-reference/functions/hex_decode_string[HEX_DECODE_STRING^] Snowflake function. Some engines may also automatically decode the binary value for you. |
| 180 | + |
| 181 | +=== Time travel queries |
| 182 | + |
| 183 | +Some query engines, such as Spark, support time travel with Iceberg, allowing you to query the table as it existed at a specific point in the past. You can run a time travel query by specifying a timestamp or version number. |
| 184 | + |
| 185 | +Redpanda automatically removes expired snapshots on a periodic basis, which also reduces the window available for time travel queries. By default, Redpanda retains snapshots for five days, so you can query Iceberg tables as of up to five days ago. |
| 186 | + |
| 187 | +The following example queries a `ClickEvent` table at a specific timestamp in Spark: |
13 | 188 |
|
14 | | -== Next steps |
| 189 | +[,sql] |
| 190 | +---- |
| 191 | +SELECT * FROM `<catalog-name>`.redpanda.ClickEvent TIMESTAMP AS OF '2025-03-02 10:00:00'; |
| 192 | +---- |
15 | 193 |
|
16 | | -* xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[] |
| 194 | +// end::single-source[] |
0 commit comments