diff --git a/modules/manage/pages/iceberg/query-iceberg-topics.adoc b/modules/manage/pages/iceberg/query-iceberg-topics.adoc index 5ae290529c..447725db3f 100644 --- a/modules/manage/pages/iceberg/query-iceberg-topics.adoc +++ b/modules/manage/pages/iceberg/query-iceberg-topics.adoc @@ -10,126 +10,7 @@ include::shared:partial$enterprise-license.adoc[] When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/topic-iceberg-integration.adoc#enable-iceberg-integration[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. In either mode, you do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda. -== Access Iceberg tables - -Depending on the processing engine and your Iceberg xref:manage:iceberg/use-iceberg-catalogs.adoc[catalog implementation], you may also need to create a table to point your data lakehouse to the table location in the catalog. For an example, see xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[]. - -If your engine needs the full JSON metadata path, use the following: - -``` -redpanda-iceberg-catalog/metadata/redpanda//v.metadata.json -``` - -This provides read access to all snapshots written as of the specified table version (denoted by `version-number`). - -NOTE: Redpanda automatically removes expired snapshots on a periodic basis. Snapshot expiry helps maintain a smaller metadata size and reduces the window available for <>. - -== Query examples - -To follow along with the examples on this page, suppose you produce the same stream of events to a topic `ClickEvent`, which uses a schema, and another topic `ClickEvent_key_value`, which uses the key-value mode. The topics have xref:manage:tiered-storage.adoc[Tiered Storage] configured to an AWS S3 bucket. A sample record contains the following data: - -[,bash,role=no-copy] ----- -{"user_id": 2324, "event_type": "BUTTON_CLICK", "ts": "2024-11-25T20:23:59.380Z"} ----- - -=== Topic with schema (`value_schema_id_prefix` mode) - -In this example, it is assumed you have created the `ClickEvent` topic, set `redpanda.iceberg.mode` to `value_schema_id_prefix`, and are connecting to a REST-based Iceberg catalog. The following is an Avro schema for `ClickEvent`: - -.`schema.avsc` -[,avro] ----- -{ - "type" : "record", - "namespace" : "com.redpanda.examples.avro", - "name" : "ClickEvent", - "fields" : [ - { "name": "user_id", "type" : "int" }, - { "name": "event_type", "type" : "string" }, - { "name": "ts", "type": "string" } - ] - } ----- - -. Register the schema under the `ClickEvent-value` subject: -+ -[,bash] ----- -rpk registry schema create ClickEvent-value --schema path/to/schema.avsc --type avro ----- - -. Produce to the `ClickEvent` topic using the following format: -+ -[,bash] ----- -echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent --format='%k %v\n' --schema-id=topic ----- -+ -The `value_schema_id_prefix` requires that you produce to a topic using the Schema Registry wire format, which includes the magic byte and schema ID in the prefix of the message payload. This allows Redpanda to identify the correct schema version in the Schema Registry for a record. 
See the https://www.redpanda.com/blog/schema-registry-kafka-streaming#how-does-serialization-work-with-schema-registry-in-kafka[Understanding Apache Kafka Schema Registry^] blog post to learn more about the wire format. - - -. The following Spark SQL query returns values from columns in the `ClickEvent` table, with the table structure derived from the schema, and column names matching the schema fields. If you've integrated a catalog, query engines such as Spark SQL provide Iceberg integrations that allow easy discovery and access to existing Iceberg tables in object storage. -+ -[,sql] ----- -SELECT * -FROM ``.redpanda.ClickEvent; ----- -+ -[,bash,role=no-copy] ----- -+-----------------------------------+---------+--------------+--------------------------+ -| redpanda | user_id | event_type | ts | -+-----------------------------------+---------+--------------+--------------------------+ -| {"partition":0,"offset":0,"timestamp":2025-03-05 15:09:20.436,"headers":null,"key":null} | 2324 | BUTTON_CLICK | 2024-11-25T20:23:59.380Z | -+-----------------------------------+---------+--------------+--------------------------+ ----- - -=== Topic in key-value mode - -In `key_value` mode, you do not associate the topic with a schema in the Schema Registry, which means using semi-structured data in Iceberg. The record keys and values can have an arbitrary structure, so Redpanda stores them in https://apache.github.io/iceberg/spec/?h=spec#primitive-types[binary format^] in Iceberg. - -In this example, it is assumed you have created the `ClickEvent_key_value` topic, and set `redpanda.iceberg.mode` to `key_value`. - -. Produce to the `ClickEvent_key_value` topic using the following format: -+ -[,bash] ----- -echo 'key1 {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n' ----- - -. The following Spark SQL query returns the semi-structured data in the `ClickEvent_key_value` table. The table consists of two columns: one named `redpanda`, containing the record key and other metadata, and another binary column named `value` for the record's value: -+ -[,sql] ----- -SELECT * -FROM ``.redpanda.ClickEvent_key_value; ----- -+ -[,bash,role=no-copy] ----- -+-----------------------------------+------------------------------------------------------------------------------+ -| redpanda | value | -+-----------------------------------+------------------------------------------------------------------------------+ -| {"partition":0,"offset":0,"timestamp":2025-03-05 15:14:30.931,"headers":null,"key":key1} | {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"} | -+-----------------------------------+------------------------------------------------------------------------------+ ----- - -Depending on your query engine, you might need to first decode the binary value to display the record key and value using a SQL helper function. For example, see the https://spark.apache.org/docs/latest/api/sql/index.html#unhex[`decode` and `unhex`^] Spark SQL functions, or the https://docs.snowflake.com/en/sql-reference/functions/hex_decode_string[HEX_DECODE_STRING^] Snowflake function. Some engines may also automatically decode the binary value for you. - -=== Time travel queries - -Some query engines, such as Spark, support time travel with Iceberg, allowing you to query the table at a specific point in time. You can query the table as it existed at a specific timestamp or version number. 
- -Redpanda automatically removes expired snapshots on a periodic basis, which also reduces the window available for time travel queries. - -The following is an example of querying a `ClickEvent` table at a specific timestamp in Spark: - -[,sql] ----- -SELECT * FROM ``.redpanda.ClickEvent TIMESTAMP AS OF '2025-03-02 10:00:00'; ----- +include::manage:partial$iceberg/query-iceberg-topics.adoc[] == Next steps diff --git a/modules/manage/pages/iceberg/topic-iceberg-integration.adoc b/modules/manage/pages/iceberg/topic-iceberg-integration.adoc index 45f82ba88e..ef38cf9a26 100644 --- a/modules/manage/pages/iceberg/topic-iceberg-integration.adoc +++ b/modules/manage/pages/iceberg/topic-iceberg-integration.adoc @@ -9,435 +9,7 @@ include::shared:partial$enterprise-license.adoc[] ==== -The Apache Iceberg integration for Redpanda allows you to store topic data in the cloud in the Iceberg open table format. This makes your streaming data immediately available in downstream analytical systems, including data warehouses like Snowflake, Databricks, ClickHouse, and Redshift, without setting up and maintaining additional ETL pipelines. You can also integrate your data directly into commonly-used big data processing frameworks, such as Apache Spark and Flink, standardizing and simplifying the consumption of streams as tables in a wide variety of data analytics pipelines. - -The Iceberg integration uses xref:manage:tiered-storage.adoc[Tiered Storage]. When a cluster or topic has Tiered Storage enabled, Redpanda stores the Iceberg files in the configured Tiered Storage bucket or container. - -Redpanda supports https://iceberg.apache.org/spec/#format-versioning[version 2^] of the Iceberg table format. - -== Iceberg concepts - -https://iceberg.apache.org[Apache Iceberg^] is an open source format specification for defining structured tables in a data lake. The table format lets you quickly and easily manage, query, and process huge amounts of structured and unstructured data. This is similar to the way in which you would manage and run SQL queries against relational data in a database or data warehouse. The open format lets you use many different languages, tools, and applications to process the same data in a consistent way, so you can avoid vendor lock-in. This data management system is also known as a _data lakehouse_. - -In the Iceberg specification, tables consist of the following layers: - -* Data layer: Stores the data in data files. The Iceberg integration currently supports the Parquet file format. Parquet files are column-based and suitable for analytical workloads at scale. They come with compression capabilities that optimize files for object storage. -* Metadata layer: Stores table metadata separately from data files. The metadata layer allows multiple writers to stage metadata changes and apply updates atomically. It also supports database snapshots, and time travel queries that query the database at a previous point in time. -+ --- -** Manifest files: Track data files and contain metadata about these files, such as record count, partition membership, and file paths. -** Manifest list: Tracks all the manifest files belonging to a table, including file paths and upper and lower bounds for partition fields. -** Metadata file: Stores metadata about the table, including its schema, partition information, and snapshots. Whenever a change is made to the table, a new metadata file is created and becomes the latest version of the metadata in the catalog. 
--- -+ -For Iceberg-enabled topics, the manifest files are in JSON format. -* Catalog: Contains the current metadata pointer for the table. Clients reading and writing data to the table see the same version of the current state of the table. The Iceberg integration supports two xref:manage:iceberg/use-iceberg-catalogs.adoc[catalog integration] types. You can configure Redpanda to catalog files stored in the same object storage bucket or container where the Iceberg data files are located, or you can configure Redpanda to use an https://iceberg.apache.org/terms/#decoupling-using-the-rest-catalog[Iceberg REST catalog^] endpoint to update an externally-managed catalog when there are changes to the Iceberg data and metadata. - -image::shared:iceberg-integration-optimized.png[Redpanda's Iceberg integration] - -When you enable the Iceberg integration for a Redpanda topic, Redpanda brokers store streaming data in the Iceberg-compatible format in Parquet files in object storage, in addition to the log segments uploaded using Tiered Storage. Storing the streaming data in Iceberg tables in the cloud allows you to derive real-time insights through many compatible data lakehouse, data engineering, and business intelligence https://iceberg.apache.org/vendors/[tools^]. - -== Prerequisites - -To enable Iceberg for Redpanda topics, you must have the following: - -* *rpk*: See xref:get-started:rpk-install.adoc[]. -* *Enterprise license*: To check if you already have a license key applied to your cluster: -+ -[,bash] ----- -rpk cluster license info ----- -* *Tiered Storage*: Enable xref:manage:tiered-storage.adoc#set-up-tiered-storage[Tiered Storage] for the topics for which you want to generate Iceberg tables. - -== Limitations - -* It is not possible to append topic data to an existing Iceberg table that is not created by Redpanda. -* If you enable the Iceberg integration on an existing Redpanda topic, Redpanda does not backfill the generated Iceberg table with topic data. -* JSON schemas are not currently supported. If the topic data is in JSON, use the `key_value` mode to store the JSON in Iceberg, which then can be parsed by most query engines. -* If you're using Avro or Protobuf data, you must use the Schema Registry wire format, where producers include the magic byte and schema ID in the message payload header. See also: xref:manage:schema-reg/schema-id-validation.adoc[] and the https://www.redpanda.com/blog/schema-registry-kafka-streaming#how-does-serialization-work-with-schema-registry-in-kafka[Understanding Apache Kafka Schema Registry^] blog post to learn more about the wire format. - -== Enable Iceberg integration - -To create an Iceberg table for a Redpanda topic, you must set the cluster configuration property config_ref:iceberg_enabled,true,properties/cluster-properties[`iceberg_enabled`] to `true`, and also configure the topic property xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[`redpanda.iceberg.mode`]. You can choose to provide a schema if you need the Iceberg table to be structured with defined columns. - -. Set the `iceberg_enabled` configuration option on your cluster to `true`. You must restart your cluster if you change this configuration for a running cluster. -+ -[,bash] ----- -rpk cluster config set iceberg_enabled true ----- -+ -[,bash,role=no-copy] ----- -Successfully updated configuration. New configuration version is 2. ----- - -. (Optional) Create a new topic. 
-+ -[,bash,] ----- -rpk topic create ----- -+ -[,bash,role=no-copy] ----- -TOPIC STATUS - OK ----- -+ -To improve query performance, consider implementing custom https://iceberg.apache.org/docs/nightly/partitioning/[partitioning^] for the Iceberg topic. Use the xref:reference:properties/topic-properties.adoc#redpanda-iceberg-partition-spec[`redpanda.iceberg.partition.spec`] topic property to define the partitioning scheme: -+ -[,bash,] ----- -# Create new topic with five topic partitions, replication factor 1, and custom table partitioning for Iceberg -rpk topic create -p5 -r1 -c "redpanda.iceberg.partition.spec=(, , ...)" ----- -+ -Valid `` values include a source column name or a transformation of a column. The columns referenced can be Redpanda-defined (such as `redpanda.timestamp`) or user-defined based on a schema that you register for the topic. The Iceberg table stores records that share the same partition key values in the same files based on this specification. -+ -For example: -+ --- -* To partition the table by a single key, such as a column `col1`, use: `redpanda.iceberg.partition.spec=(col1)`. -* To partition by multiple columns, use a comma-separated list: `redpanda.iceberg.partition.spec=(col1, col2)`. -* To partition by the year of a timestamp column `ts1`, and a string column `col1`, use: `redpanda.iceberg.partition.spec=(year(ts1), col1)`. - -For details on the partitioning specification, including allowed transforms, see the https://iceberg.apache.org/spec/#partitioning[Apache Iceberg documentation^]. --- - -. Enable the integration for the topic by configuring `redpanda.iceberg.mode`. You can choose one of the following modes: -+ --- -* `key_value`: Creates an Iceberg table using a simple schema, consisting of two columns, one for the record metadata including the key, and another binary column for the record's value. -* `value_schema_id_prefix`: Creates an Iceberg table whose structure matches the Redpanda schema for this topic, with columns corresponding to each field. You must register a schema in the Schema Registry (see next step), and producers must write to the topic using the Schema Registry wire format. Redpanda parses the schema used by the record based on the schema ID encoded in the payload header, and stores the topic values in the corresponding table columns. -* `disabled` (default): Disables writing to an Iceberg table for this topic. --- -+ -[,bash] ----- -rpk topic alter-config --set redpanda.iceberg.mode= ----- -+ -[,bash,role=no-copy] ----- -TOPIC STATUS - OK ----- - -. Register a schema for the topic. This step is required for the `value_schema_id_prefix` mode, but is optional otherwise. -+ -[,bash] ----- -rpk registry schema create --schema --type ----- -+ -[,bash,role=no-copy] ----- -SUBJECT VERSION ID TYPE - 1 1 PROTOBUF ----- - -The Iceberg table is inside a namespace called `redpanda`, and has the same name as the Redpanda topic name. As you produce records to the topic, the data also becomes available in object storage for consumption by Iceberg-compatible clients. You can use the same analytical tools to xref:manage:iceberg/query-iceberg-topics.adoc[read the Iceberg topic data] in a data lake as you would for a relational database. - -== About schema support and translation to Iceberg format - -The xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[`redpanda.iceberg.mode`] property determines how Redpanda maps the topic data to the Iceberg table structure. 
You can have the generated Iceberg table match the structure of a Avro or Protobuf schema in the Schema Registry, or you can use the `key_value` mode where Redpanda stores the record values as-is in the table. - -The JSON Schema format is not supported. If your topic data is in JSON, it is recommended to use the `key_value` mode. - -=== Iceberg modes and table schemas - -For both `key_value` and `value_schema_id_prefix` modes, Redpanda writes to a `redpanda` table column that stores a single Iceberg https://iceberg.apache.org/spec/#nested-types[struct^] per record, containing nested columns of the metadata from each record, including the record key, headers, timestamp, the partition it belongs to, and its offset. - -For example, if you produce to a topic according to the following Avro schema: - -[,avro] ----- -{ - "type": "record", - "name": "ClickEvent", - "fields": [ - { - "name": "user_id", - "type": "int" - }, - { - "name": "event_type", - "type": "string" - }, - { - "name": "ts", - "type": "string" - } - ] -} ----- - -The `key_value` mode writes to the following table format: - -[,sql] ----- -CREATE TABLE ClickEvent ( - redpanda struct< - partition: integer NOT NULL, - timestamp: timestamp NOT NULL, - offset: long NOT NULL, - headers: array>, - key: binary - >, - value binary -) ----- - -Consider this approach if the topic data is in JSON, or if you can use the Iceberg data in its semi-structured format. - -The `value_schema_id_prefix` mode translates to the following table format: - -[,sql] ----- -CREATE TABLE ClickEvent ( - redpanda struct< - partition: integer NOT NULL, - timestamp: timestamp NOT NULL, - offset: long NOT NULL, - headers: array>, - key: binary - >, - user_id integer NOT NULL, - event_type string, - ts string -) ----- - -With schema integration, Redpanda uses the schema ID prefix embedded in each record to find the matching schema in the Schema Registry. Producers to the topic must use the schema ID prefix in the serialization process so Redpanda can determine the schema used for each record, parse the record according to that schema, and use the schema for the Iceberg table as well. - -If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See <> for more information. - -=== Schema types translation - -Redpanda supports direct translations of the following types to Iceberg value domains: - -[tabs] -====== -Avro:: -+ --- -|=== -| Avro type | Iceberg type - -| boolean | boolean -| int | int -| long | long -| float | float -| double | double -| bytes | binary -| string | string -| record | struct -| array | list -| maps | list -| fixed | fixed -| decimal | decimal -| uuid | uuid -| date | date -| time | time -| timestamp | timestamp -|=== - -* Different flavors of time (such as `time-millis`) and timestamp (such as `timestamp-millis`) types are translated to the same Iceberg `time` and `timestamp` types respectively. -* Avro unions are flattened to Iceberg structs with optional fields: -** For example, the union `["int", "long", "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 LONG NULLABLE, 2 FLOAT NULLABLE>`. -** The union `["int", null, "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 FLOAT NULLABLE>`. -* All fields are required by default (Avro always sets a default in binary representation). -* The Avro duration logical type is ignored. -* The Avro null type is ignored and not represented in the Iceberg schema. 
-* Recursive types are not supported. --- - -Protobuf:: -+ --- -|=== -| Protobuf type | Iceberg type - -| bool | boolean -| double | double -| float | float -| int32 | int -| sint32 | int -| int64 | long -| sint64 | long -| sfixed32 | int -| sfixed64 | int -| string | string -| bytes | binary -| map | map -|=== - -* Repeated values are translated into Iceberg `array` types. -* Enums are translated into Iceberg `int` types based on the integer value of the enumerated type. -* `uint32` and `fixed32` are translated into Iceberg `long` types as that is the existing semantic for unsigned 32-bit values in Iceberg. -* `uint64` and `fixed64` values are translated into their Base-10 string representation. -* The `timestamp` type in Protobuf is translated into `timestamp` in Iceberg. -* Messages are converted into Iceberg structs. -* Recursive types are not supported. --- -====== - -=== Schema evolution - -Redpanda supports schema evolution for Avro and Protobuf schemas in accordance with the https://iceberg.apache.org/spec/#schema-evolution[Iceberg specification^]. Permitted schema evolutions include reordering fields and promoting field types. When you update the schema in Schema Registry, Redpanda automatically updates the Iceberg table schema to match the new schema. - -For example, if you produce records to a topic `demo-topic` with the following Avro schema: - -.schema_1.avsc -[,avro] ----- -{ - "type": "record", - "name": "ClickEvent", - "fields": [ - { - "name": "user_id", - "type": "int" - }, - { - "name": "event_type", - "type": "string" - } - ] -} ----- - -[,bash] ----- -rpk registry schema create demo-topic-value --schema schema_1.avsc - -echo '{"user_id":23, "event_type":"BUTTON_CLICK"}' | rpk topic produce demo-topic --format='%v\n' --schema-id=topic ----- - -Then, you update the schema to add a new field `ts`, and produce records with the updated schema: - -.schema_2.avsc -[,avro] ----- -{ - "type": "record", - "name": "ClickEvent", - "fields": [ - { - "name": "user_id", - "type": "int" - }, - { - "name": "event_type", - "type": "string" - }. - { - "name": "ts", - "type": [ - "null", - { "type": "string", "logicalType": "date" } - ], - "default": null # Default value for the new field - } - ] -} ----- -The `ts` field can be either null or a string representing a date. The default value is null. - -[,bash] ----- -rpk registry schema create demo-topic-value --schema schema_2.avsc - -echo '{"user_id":858, "event_type":"BUTTON_CLICK", "ts":{"string":"2025-02-26T20:05:23.230ZZ"}}' | rpk topic produce demo-topic --format='%v\n' --schema-id=topic ----- - -xref:manage:iceberg/query-iceberg-topics.adoc[Querying the Iceberg table] for `demo-topic` includes the new column `ts`: - -[,bash,role=no-copy] ----- -+---------+--------------+--------------------------+ -| user_id | event_type | ts | -+---------+--------------+--------------------------+ -| 858 | BUTTON_CLICK | 2025-02-26T20:05:23.230Z | -| 23 | BUTTON_CLICK | NULL | -+---------+--------------+--------------------------+ ----- - -== Manage dead-letter queue - -Errors may occur when translating records in the `value_schema_id_prefix` mode to the Iceberg table format; for example, if you do not use the Schema Registry wire format with the magic byte, if the schema ID in the record is not found in the Schema Registry, or if an Avro or Protobuf data type cannot be translated to an Iceberg type. 
- -If Redpanda encounters an error while writing a record to the Iceberg table, Redpanda writes the record to a separate dead-letter queue (DLQ) Iceberg table named `~dlq`. To disable the default behavior for a topic and drop the record, set the xref:reference:properties/topic-properties.adoc#redpandaiceberginvalidrecordaction[`redpanda.iceberg.invalid.record.action`] topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property. - -The DLQ table itself uses the `key_value` schema, consisting of two columns: the record metadata including the key, and a binary column for the record's value. - -You can inspect the DLQ table for records that failed to write to the Iceberg table, and you can take further action on these records, such as transforming and reprocessing them, or debugging issues that occurred upstream. - -=== Reprocess DLQ records - -The following example produces a record to a topic named `ClickEvent` and does not use the Schema Registry wire format that includes the magic byte and schema ID: - -[,bash,role=no-copy] ----- -echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent --format='%k %v\n' ----- - -Querying the DLQ table returns the record that was not translated: - -[,sql] ----- -SELECT - value -FROM ."ClickEvent~dlq"; -- Fully qualified table name ----- - -[,bash,role=no-copy] ----- -+-------------------------------------------------+ -| value | -+-------------------------------------------------+ -| 7b 22 75 73 65 72 5f 69 64 22 3a 32 33 32 34 2c | -| 22 65 76 65 6e 74 5f 74 79 70 65 22 3a 22 42 55 | -| 54 54 4f 4e 5f 43 4c 49 43 4b 22 2c 22 74 73 22 | -| 3a 22 32 30 32 34 2d 31 31 2d 32 35 54 32 30 3a | -| 32 33 3a 35 39 2e 33 38 30 5a 22 7d | -+-------------------------------------------------+ ----- - -The data is in binary format, and the first byte is not `0x00`, indicating that it was not produced with a schema. - -You can apply a transformation and reprocess the record in your data lakehouse to the original Iceberg table. In this case, you have a JSON value represented as a UTF-8 binary. Depending on your query engine, you might need to decode the binary value first before extracting the JSON fields. Some engines may automatically decode the binary value for you: - -.ClickHouse SQL example to reprocess DLQ record -[,sql] ----- -SELECT - CAST(jsonExtractString(json, 'user_id') AS Int32) AS user_id, - jsonExtractString(json, 'event_type') AS event_type, - jsonExtractString(json, 'ts') AS ts -FROM ( - SELECT - CAST(value AS String) AS json - FROM .`ClickEvent~dlq` -- Ensure that the table name is properly parsed -); ----- - -[,bash,role=no-copy] ----- -+---------+--------------+--------------------------+ -| user_id | event_type | ts | -+---------+--------------+--------------------------+ -| 2324 | BUTTON_CLICK | 2024-11-25T20:23:59.380Z | -+---------+--------------+--------------------------+ ----- - -You can now insert the transformed record back into the main Iceberg table. Redpanda recommends employing a strategy for exactly-once processing to avoid duplicates when reprocessing records. 
- -== Next steps - -* xref:manage:iceberg/use-iceberg-catalogs.adoc[] +include::manage:partial$iceberg/about-iceberg-topics.adoc[] == Suggested reading diff --git a/modules/manage/pages/iceberg/use-iceberg-catalogs.adoc b/modules/manage/pages/iceberg/use-iceberg-catalogs.adoc index 78f75be4d3..3dc6ae2338 100644 --- a/modules/manage/pages/iceberg/use-iceberg-catalogs.adoc +++ b/modules/manage/pages/iceberg/use-iceberg-catalogs.adoc @@ -8,103 +8,7 @@ include::shared:partial$enterprise-license.adoc[] ==== -To read from the Redpanda-generated xref:manage:iceberg/topic-iceberg-integration.adoc[Iceberg table], your Iceberg-compatible client or tool needs access to the catalog to retrieve the table metadata and know the current state of the table. The catalog provides the current table metadata, which includes locations for all the table's data files. You can configure Redpanda to either connect to a REST-based catalog, or use a file-system based catalog. - -For production deployments, Redpanda recommends using an external REST catalog to manage Iceberg metadata. This enables built-in table maintenance, safely handles multiple engines and tools accessing tables at the same time, facilitates data governance, and maximizes data discovery. However, if it is not possible to use a REST catalog, you may use the file-system based catalog, which does not require you to maintain a separate service to access the Iceberg data. In either case, you use the catalog to load, query, or refresh the Iceberg table as you produce to the Redpanda topic. See the documentation for your query engine or Iceberg-compatible tool for specific guidance on adding the Iceberg tables to your data warehouse or lakehouse using the catalog. - -After you have selected a catalog type at the cluster level and xref:manage:iceberg/topic-iceberg-integration.adoc#enable-iceberg-integration[enabled the Iceberg integration] for a topic, you cannot switch to another catalog type. - -== Connect to a REST catalog - -Redpanda supports connecting to an Iceberg REST catalog, using the standard https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml[REST API^] supported by many catalog providers. Use this catalog integration type with REST-enabled Iceberg catalog services, such as https://docs.databricks.com/en/data-governance/unity-catalog/index.html[Databricks Unity^] and https://other-docs.snowflake.com/en/opencatalog/overview[Snowflake Open Catalog^]. - -To connect to a REST catalog, set the following cluster configuration properties: - -* config_ref:iceberg_catalog_type,true,properties/cluster-properties[`iceberg_catalog_type`]: `rest` -* config_ref:iceberg_rest_catalog_endpoint,true,properties/cluster-properties[`iceberg_rest_catalog_endpoint`]: The endpoint URL for your Iceberg catalog, which you either manage directly, or is managed by an external catalog service. -* config_ref:iceberg_rest_catalog_client_id,true,properties/cluster-properties[`iceberg_rest_catalog_client_id`]: The ID to connect to the REST catalog. -* config_ref:iceberg_rest_catalog_client_secret,true,properties/cluster-properties[`iceberg_rest_catalog_client_secret`]: The secret data to connect to the REST catalog. - -For REST catalogs that use self-signed certificates, also configure these properties: - -* config_ref:iceberg_rest_catalog_trust_file,true,properties/cluster-properties[`iceberg_rest_catalog_trust_file`]: The path to a file containing a certificate chain to trust for the REST catalog. 
-* config_ref:iceberg_rest_catalog_crl_file,true,properties/cluster-properties[`iceberg_rest_catalog_crl_file`]: The path to the certificate revocation list for the specified trust file. - -See xref:reference:properties/cluster-properties.adoc[Cluster Configuration Properties] for the full list of cluster properties to configure for a catalog integration. - -=== Example REST catalog configuration - -For example, if you have Redpanda cluster configuration properties set to connect to a REST catalog: - -[,yaml] ----- -iceberg_catalog_type: rest -iceberg_rest_catalog_endpoint: http://catalog-service:8181 -iceberg_rest_catalog_client_id: -iceberg_rest_catalog_client_secret: ----- - -And you use Apache Spark as a processing engine, configured to use a catalog named `streaming`: - -[,spark] ----- -spark.sql.catalog.streaming = org.apache.iceberg.spark.SparkCatalog -spark.sql.catalog.streaming.type = rest -spark.sql.catalog.streaming.uri = http://catalog-service:8181 -# You may need to configure additional properties based on your object storage provider. -# See https://iceberg.apache.org/docs/latest/spark-configuration/#catalog-configuration and https://spark.apache.org/docs/latest/configuration.html -# For example, for AWS S3: -# spark.sql.catalog.streaming.io-impl = org.apache.iceberg.aws.s3.S3FileIO -# spark.sql.catalog.streaming.warehouse = s3:/// -# spark.sql.catalog.streaming.s3.endpoint = http:// ----- - -NOTE: Redpanda recommends setting credentials in environment variables so Spark can securely access your Iceberg data in object storage. For example, for AWS, use `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. - -Using Spark SQL, you can query the Iceberg table directly by specifying the catalog name, the namespace, and the table name: - -[,sql] ----- -SELECT * FROM streaming.redpanda.; ----- - -The Iceberg table name is the name of your Redpanda topic. Redpanda puts the Iceberg table into a namespace called `redpanda`, creating the namespace if necessary. - -The Spark engine can use the REST catalog to automatically discover the topic's Iceberg table. Depending on your processing engine, you may need to also create a table in the engine to point the data lakehouse to the table location in the catalog. For an example, see xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[]. - -== Integrate file-system based catalog (`object_storage`) - -By default, Iceberg topics use the file-system based catalog (config_ref:iceberg_catalog_type,true,properties/cluster-properties[`iceberg_catalog_type`] cluster configuration set to `object_storage`). Redpanda stores the table metadata in https://iceberg.apache.org/javadoc/1.5.0/org/apache/iceberg/hadoop/HadoopCatalog.html[HadoopCatalog^] format in the same object storage bucket or container as the data files. - -If using the `object_storage` catalog type, you provide the object storage URI of the table's metadata.json file to an Iceberg client so it can access the catalog and data files for your Redpanda Iceberg tables. - -=== Example file-system based catalog configuration - -To configure Apache Spark to use a file system-based catalog, specify at least the following properties: - -[,spark] ----- -spark.sql.catalog.streaming = org.apache.iceberg.spark.SparkCatalog -spark.sql.catalog.streaming.type = hadoop -# URI for table metadata: AWS S3 example -spark.sql.catalog.streaming.warehouse = s3a:///redpanda-iceberg-catalog -# You may need to configure additional properties based on your object storage provider. 
-# See https://iceberg.apache.org/docs/latest/spark-configuration/#spark-configuration and https://spark.apache.org/docs/latest/configuration.html -# For example, for AWS S3: -# spark.hadoop.fs.s3.impl = org.apache.hadoop.fs.s3a.S3AFileSystem -# spark.hadoop.fs.s3a.endpoint = http:// -# spark.sql.catalog.streaming.s3.endpoint = http:// ----- - -NOTE: Redpanda recommends setting credentials in environment variables so Spark can securely access your Iceberg data in object storage. For example, for AWS, use `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. - -Depending on your processing engine, you may need to also create a new table to point the data lakehouse to the table location. - -=== Specify metadata location - -The config_ref:iceberg_catalog_base_location,true,properties/cluster-properties[`iceberg_catalog_base_location`] property stores the base path for the file-system based catalog if using the `object_storage` catalog type. The default value is `redpanda-iceberg-catalog`. - -CAUTION: Do not change the `iceberg_catalog_base_location` value after you have enabled Iceberg integration for a topic. +include::manage:partial$iceberg/use-iceberg-catalogs.adoc[] == Next steps diff --git a/modules/manage/partials/iceberg/about-iceberg-topics.adoc b/modules/manage/partials/iceberg/about-iceberg-topics.adoc new file mode 100644 index 0000000000..17b47ca67a --- /dev/null +++ b/modules/manage/partials/iceberg/about-iceberg-topics.adoc @@ -0,0 +1,481 @@ +:schema-id-val-doc: manage:schema-reg/schema-id-validation.adoc + +The Apache Iceberg integration for Redpanda allows you to store topic data in the cloud in the Iceberg open table format. This makes your streaming data immediately available in downstream analytical systems, including data warehouses like Snowflake, Databricks, ClickHouse, and Redshift, without setting up and maintaining additional ETL pipelines. You can also integrate your data directly into commonly-used big data processing frameworks, such as Apache Spark and Flink, standardizing and simplifying the consumption of streams as tables in a wide variety of data analytics pipelines. + +Redpanda supports https://iceberg.apache.org/spec/#format-versioning[version 2^] of the Iceberg table format. + +== Iceberg concepts + +https://iceberg.apache.org[Apache Iceberg^] is an open source format specification for defining structured tables in a data lake. The table format lets you quickly and easily manage, query, and process huge amounts of structured and unstructured data. This is similar to the way you would manage and run SQL queries against relational data in a database or data warehouse. The open format lets you use many different languages, tools, and applications to process the same data in a consistent way, so you can avoid vendor lock-in. This data management system is also known as a _data lakehouse_. + +In the Iceberg specification, tables consist of the following layers: + +* *Data layer*: Stores the data in data files. The Iceberg integration currently supports the Parquet file format. Parquet files are column-based and suitable for analytical workloads at scale. They come with compression capabilities that optimize files for object storage. +* *Metadata layer*: Stores table metadata separately from data files. The metadata layer allows multiple writers to stage metadata changes and apply updates atomically. It also supports database snapshots, and time travel queries that query the database at a previous point in time. 
++ +-- +** Manifest files: Track data files and contain metadata about these files, such as record count, partition membership, and file paths. +** Manifest list: Tracks all the manifest files belonging to a table, including file paths and upper and lower bounds for partition fields. +** Metadata file: Stores metadata about the table, including its schema, partition information, and snapshots. Whenever a change is made to the table, a new metadata file is created and becomes the latest version of the metadata in the catalog. +-- ++ +For Iceberg-enabled topics, the manifest files are in JSON format. +* *Catalog*: Contains the current metadata pointer for the table. Clients reading and writing data to the table see the same version of the current state of the table. The Iceberg integration supports two xref:manage:iceberg/use-iceberg-catalogs.adoc[catalog integration] types. You can configure Redpanda to catalog files stored in the same object storage bucket or container where the Iceberg data files are located, or you can configure Redpanda to use an https://iceberg.apache.org/terms/#decoupling-using-the-rest-catalog[Iceberg REST catalog^] endpoint to update an externally-managed catalog when there are changes to the Iceberg data and metadata. + +image::shared:iceberg-integration-optimized.png[Redpanda's Iceberg integration] + +When you enable the Iceberg integration for a Redpanda topic, Redpanda brokers store streaming data in the Iceberg-compatible format in Parquet files in object storage, in addition to the log segments uploaded using Tiered Storage. Storing the streaming data in Iceberg tables in the cloud allows you to derive real-time insights through many compatible data lakehouse, data engineering, and business intelligence https://iceberg.apache.org/vendors/[tools^]. + +== Prerequisites + +To enable Iceberg for Redpanda topics, you must have the following: + +ifdef::env-cloud[] +* A running xref:get-started:cluster-types/byoc/index.adoc[BYOC] cluster. The Iceberg integration is supported only for BYOC. +* rpk: See xref:get-started:rpk-install.adoc[]. +* Familiarity with the Redpanda Cloud API. You must xref:redpanda-cloud:manage:api/cloud-api-authentication.adoc[authenticate] to the Cloud API and use the Control Plane API to update your cluster configuration. +endif::[] + +ifndef::env-cloud[] +* *rpk*: See xref:get-started:rpk-install.adoc[]. +* *Enterprise license*: To check if you already have a license key applied to your cluster: ++ +[,bash] +---- +rpk cluster license info +---- +* *Tiered Storage*: Enable xref:manage:tiered-storage.adoc#set-up-tiered-storage[Tiered Storage] for the topics for which you want to generate Iceberg tables. +endif::[] + +== Limitations + +* It is not possible to append topic data to an existing Iceberg table that is not created by Redpanda. +* If you enable the Iceberg integration on an existing Redpanda topic, Redpanda does not backfill the generated Iceberg table with topic data. +* JSON schemas are not currently supported. If the topic data is in JSON, use the `key_value` mode to store the JSON in Iceberg, which then can be parsed by most query engines. +ifndef::env-cloud[] +* If you're using Avro or Protobuf data, you must use the Schema Registry wire format, where producers include the magic byte and schema ID in the message payload header. 
See also: xref:{schema-id-val-doc}[] and +the https://www.redpanda.com/blog/schema-registry-kafka-streaming#how-does-serialization-work-with-schema-registry-in-kafka[Understanding Apache Kafka Schema Registry^] blog post to learn more. +endif::[] +ifdef::env-cloud[] +* If you're using Avro or Protobuf data, you must use the Schema Registry wire format, where producers include the magic byte and schema ID in the message payload header. See also: the https://www.redpanda.com/blog/schema-registry-kafka-streaming#how-does-serialization-work-with-schema-registry-in-kafka[Understanding Apache Kafka Schema Registry^] blog post to learn more. +endif::[] + +== Enable Iceberg integration + +To create an Iceberg table for a Redpanda topic, you must set the cluster configuration property config_ref:iceberg_enabled,true,properties/cluster-properties[`iceberg_enabled`] to `true`, and also configure the topic property xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[`redpanda.iceberg.mode`]. You can choose to provide a schema if you need the Iceberg table to be structured with defined columns. + +. Set the `iceberg_enabled` configuration option on your cluster to `true`. You must restart your cluster if you change this configuration for a running cluster. +ifdef::env-cloud[] ++ +[,bash] +---- +# Store your cluster ID in a variable +export RP_CLUSTER_ID= + +# Retrieve a Redpanda Cloud access token +export RP_CLOUD_TOKEN=`curl -X POST "https://auth.prd.cloud.redpanda.com/oauth/token" \ + -H "content-type: application/x-www-form-urlencoded" \ + -d "grant_type=client_credentials" \ + -d "client_id=" \ + -d "client_secret="` + +# Update cluster configuration to enable Iceberg topics +curl -H "Authorization: Bearer ${RP_CLOUD_TOKEN}" -X PATCH \ + "https://api.cloud.redpanda.com/v1/clusters/${RP_CLUSTER_ID}" \ + -H 'accept: application/json'\ + -H 'content-type: application/json' \ + -d '{"cluster_configuration":{"custom_properties": {"iceberg_enabled":true}}}' +---- ++ +The xref:api:ROOT:cloud-controlplane-api.adoc#patch-/v1/clusters/-cluster.id-[`PATCH /clusters/{cluster.id}`] request returns the ID of a long-running operation. The operation may take up to ten minutes to complete. You can check the status of the operation by polling the xref:api:ROOT:cloud-controlplane-api.adoc#get-/v1/operations/-id-[`GET /operations/\{id}`] endpoint. +endif::[] +ifndef::env-cloud[] ++ +[,bash] +---- +rpk cluster config set iceberg_enabled true +---- ++ +[,bash,role=no-copy] +---- +Successfully updated configuration. New configuration version is 2. +---- +endif::[] + +. (Optional) Create a new topic. ++ +[,bash,] +---- +rpk topic create +---- ++ +[,bash,role=no-copy] +---- +TOPIC STATUS + OK +---- ++ +To improve query performance, consider implementing custom https://iceberg.apache.org/docs/nightly/partitioning/[partitioning^] for the Iceberg topic. Use the xref:reference:properties/topic-properties.adoc#redpanda-iceberg-partition-spec[`redpanda.iceberg.partition.spec`] topic property to define the partitioning scheme: ++ +[,bash,] +---- +# Create new topic with five topic partitions, replication factor 1, and custom table partitioning for Iceberg +rpk topic create -p5 -r1 -c "redpanda.iceberg.partition.spec=(, , ...)" +---- ++ +Valid `` values include a source column name or a transformation of a column. The columns referenced can be Redpanda-defined (such as `redpanda.timestamp`) or user-defined based on a schema that you register for the topic. 
The Iceberg table stores records that share the same partition key values in the same files based on this specification.
++
+For example:
++
+--
+* To partition the table by a single key, such as a column `col1`, use: `redpanda.iceberg.partition.spec=(col1)`.
+* To partition by multiple columns, use a comma-separated list: `redpanda.iceberg.partition.spec=(col1, col2)`.
+* To partition by the year of a timestamp column `ts1`, and a string column `col1`, use: `redpanda.iceberg.partition.spec=(year(ts1), col1)`.
+
+For details on the partitioning specification, including allowed transforms, see the https://iceberg.apache.org/spec/#partitioning[Apache Iceberg documentation^].
+--
+
+. Configure `redpanda.iceberg.mode` for the topic. You can choose one of the following modes:
++
+--
+* `key_value`: Creates an Iceberg table using a simple schema, consisting of two columns, one for the record metadata including the key, and another binary column for the record's value.
+* `value_schema_id_prefix`: Creates an Iceberg table whose structure matches the Redpanda schema for this topic, with columns corresponding to each field. You must register a schema in the Schema Registry (see next step), and producers must write to the topic using the Schema Registry wire format. Redpanda parses the schema used by the record based on the schema ID encoded in the payload header and stores the topic values in the corresponding table columns.
+* `disabled` (default): Disables writing to an Iceberg table for this topic.
+--
++
+[,bash]
+----
+rpk topic alter-config <topic-name> --set redpanda.iceberg.mode=<mode>
+----
++
+[,bash,role=no-copy]
+----
+TOPIC        STATUS
+<topic-name> OK
+----
+
+. Register a schema for the topic. This step is required for the `value_schema_id_prefix` mode, but is optional otherwise.
++
+[,bash]
+----
+rpk registry schema create <subject-name> --schema <path-to-schema-file> --type <format>
+----
++
+[,bash,role=no-copy]
+----
+SUBJECT         VERSION   ID   TYPE
+<subject-name>  1         1    PROTOBUF
+----
+
+ifdef::env-cloud[]
+To query the Iceberg table, you need access to the object storage bucket or container where the Iceberg data is stored. For BYOC clusters on AWS and GCP, the bucket name and table location are as follows:
+
+|===
+| Bucket name | Iceberg table location
+
+| `redpanda-cloud-storage-<cluster-id>`
+| `redpanda-iceberg-catalog/redpanda/<topic-name>`
+
+|===
+endif::[]
+
+The Iceberg table resides in a namespace called `redpanda` and has the same name as the Redpanda topic name. As you produce records to the topic, the data also becomes available in object storage for Iceberg-compatible clients to consume. You can use the same analytical tools to xref:manage:iceberg/query-iceberg-topics.adoc[read the Iceberg topic data] in a data lake as you would for a relational database.
+
+== About schema support and translation to Iceberg format
+
+The xref:reference:properties/topic-properties.adoc#redpanda-iceberg-mode[`redpanda.iceberg.mode`] property determines how Redpanda maps the topic data to the Iceberg table structure. You can have the generated Iceberg table match the structure of an Avro or Protobuf schema in the Schema Registry, or you can use the `key_value` mode where Redpanda stores the record values as-is in the table.
+
+The JSON Schema format is not supported. If your topic data is in JSON, use the `key_value` mode.
+ +=== Iceberg modes and table schemas + +For both `key_value` and `value_schema_id_prefix` modes, Redpanda writes to a `redpanda` table column that stores a single Iceberg https://iceberg.apache.org/spec/#nested-types[struct^] per record, containing nested columns of the metadata from each record, including the record key, headers, timestamp, the partition it belongs to, and its offset. + +For example, if you produce to a topic according to the following Avro schema: + +[,avro] +---- +{ + "type": "record", + "name": "ClickEvent", + "fields": [ + { + "name": "user_id", + "type": "int" + }, + { + "name": "event_type", + "type": "string" + }, + { + "name": "ts", + "type": "string" + } + ] +} +---- + +The `key_value` mode writes to the following table format: + +[,sql] +---- +CREATE TABLE ClickEvent ( + redpanda struct< + partition: integer NOT NULL, + timestamp: timestamp NOT NULL, + offset: long NOT NULL, + headers: array>, + key: binary + >, + value binary +) +---- + +Consider this approach if the topic data is in JSON, or if you can use the Iceberg data in its semi-structured format. + +The `value_schema_id_prefix` mode translates to the following table format: + +[,sql] +---- +CREATE TABLE ClickEvent ( + redpanda struct< + partition: integer NOT NULL, + timestamp: timestamp NOT NULL, + offset: long NOT NULL, + headers: array>, + key: binary + >, + user_id integer NOT NULL, + event_type string, + ts string +) +---- + +With schema integration, Redpanda uses the schema ID prefix embedded in each record to find the matching schema in the Schema Registry. Producers to the topic must use the schema ID prefix in the serialization process so Redpanda can determine the schema used for each record, parse the record according to that schema, and use the schema for the Iceberg table as well. + +If Redpanda fails to translate the record to the columnar format as defined by the schema, it writes the record to a dead-letter queue (DLQ) table. See <> for more information. + +=== Schema types translation + +Redpanda supports direct translations of the following types to Iceberg value domains: + +[tabs] +====== +Avro:: ++ +-- +|=== +| Avro type | Iceberg type + +| boolean | boolean +| int | int +| long | long +| float | float +| double | double +| bytes | binary +| string | string +| record | struct +| array | list +| maps | list +| fixed | fixed +| decimal | decimal +| uuid | uuid +| date | date +| time | time +| timestamp | timestamp +|=== + +* Different flavors of time (such as `time-millis`) and timestamp (such as `timestamp-millis`) types are translated to the same Iceberg `time` and `timestamp` types, respectively. +* Avro unions are flattened to Iceberg structs with optional fields. For example: +** The union `["int", "long", "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 LONG NULLABLE, 2 FLOAT NULLABLE>`. +** The union `["int", null, "float"]` is represented as an Iceberg struct `struct<0 INT NULLABLE, 1 FLOAT NULLABLE>`. +* All fields are required by default. (Avro always sets a default in binary representation.) +* The Avro duration logical type is ignored. +* The Avro null type is ignored and not represented in the Iceberg schema. +* Recursive types are not supported. 
+--
+
+Protobuf::
++
+--
+|===
+| Protobuf type | Iceberg type
+
+| bool | boolean
+| double | double
+| float | float
+| int32 | int
+| sint32 | int
+| int64 | long
+| sint64 | long
+| sfixed32 | int
+| sfixed64 | int
+| string | string
+| bytes | binary
+| map | map
+|===
+
+* Repeated values are translated into Iceberg `array` types.
+* Enums are translated into Iceberg `int` types based on the integer value of the enumerated type.
+* `uint32` and `fixed32` are translated into Iceberg `long` types, following the Iceberg convention for unsigned 32-bit values.
+* `uint64` and `fixed64` values are translated into their Base-10 string representation.
+* The `timestamp` type in Protobuf is translated into `timestamp` in Iceberg.
+* Messages are converted into Iceberg structs.
+* Recursive types are not supported.
+--
+======
+
+=== Schema evolution
+
+Redpanda supports schema evolution for Avro and Protobuf schemas in accordance with the https://iceberg.apache.org/spec/#schema-evolution[Iceberg specification^]. Permitted schema evolutions include reordering fields and promoting field types. When you update the schema in Schema Registry, Redpanda automatically updates the Iceberg table schema to match the new schema.
+
+For example, if you produce records to a topic `demo-topic` with the following Avro schema:
+
+.schema_1.avsc
+[,avro]
+----
+{
+  "type": "record",
+  "name": "ClickEvent",
+  "fields": [
+    {
+      "name": "user_id",
+      "type": "int"
+    },
+    {
+      "name": "event_type",
+      "type": "string"
+    }
+  ]
+}
+----
+
+[,bash]
+----
+rpk registry schema create demo-topic-value --schema schema_1.avsc
+
+echo '{"user_id":23, "event_type":"BUTTON_CLICK"}' | rpk topic produce demo-topic --format='%v\n' --schema-id=topic
+----
+
+Then, you update the schema to add a new field `ts`, and produce records with the updated schema:
+
+.schema_2.avsc
+[,avro]
+----
+{
+  "type": "record",
+  "name": "ClickEvent",
+  "fields": [
+    {
+      "name": "user_id",
+      "type": "int"
+    },
+    {
+      "name": "event_type",
+      "type": "string"
+    },
+    {
+      "name": "ts",
+      "type": [
+        "null",
+        { "type": "string", "logicalType": "date" }
+      ],
+      "default": null
+    }
+  ]
+}
+----
+
+The `ts` field can be either null or a string representing a date. The default value of the new field is null.
+
+[,bash]
+----
+rpk registry schema create demo-topic-value --schema schema_2.avsc
+
+echo '{"user_id":858, "event_type":"BUTTON_CLICK", "ts":{"string":"2025-02-26T20:05:23.230Z"}}' | rpk topic produce demo-topic --format='%v\n' --schema-id=topic
+----
+
+Querying the Iceberg table for `demo-topic` now returns the new column `ts`:
+
+[,bash,role=no-copy]
+----
++---------+--------------+--------------------------+
+| user_id | event_type   | ts                       |
++---------+--------------+--------------------------+
+| 858     | BUTTON_CLICK | 2025-02-26T20:05:23.230Z |
+| 23      | BUTTON_CLICK | NULL                     |
++---------+--------------+--------------------------+
+----
+
+== Manage dead-letter queue
+
+Errors may occur when translating records in the `value_schema_id_prefix` mode to the Iceberg table format; for example, if you do not use the Schema Registry wire format with the magic byte, if the schema ID in the record is not found in the Schema Registry, or if an Avro or Protobuf data type cannot be translated to an Iceberg type.
+
+If Redpanda encounters an error while writing a record to the Iceberg table, Redpanda writes the record to a separate dead-letter queue (DLQ) Iceberg table named `<topic-name>~dlq`. 
To disable the default behavior for a topic and drop the record, set the xref:reference:properties/topic-properties.adoc#redpanda-iceberg-invalid-record-action[`redpanda.iceberg.invalid.record.action`] topic property to `drop`. You can also configure the default cluster-wide behavior for invalid records by setting the `iceberg_invalid_record_action` property. + +The DLQ table itself uses the `key_value` schema, consisting of two columns: the record metadata including the key, and a binary column for the record's value. + +You can inspect the DLQ table for records that failed to write to the Iceberg table, and you can take further action on these records, such as transforming and reprocessing them, or debugging issues that occurred upstream. + +=== Reprocess DLQ records + +The following example produces a record to a topic named `ClickEvent` and does not use the Schema Registry wire format that includes the magic byte and schema ID: + +[,bash,role=no-copy] +---- +echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent --format='%k %v\n' +---- + +Querying the DLQ table returns the record that was not translated: + +[,sql] +---- +SELECT + value +FROM ."ClickEvent~dlq"; -- Fully qualified table name +---- + +[,bash,role=no-copy] +---- ++-------------------------------------------------+ +| value | ++-------------------------------------------------+ +| 7b 22 75 73 65 72 5f 69 64 22 3a 32 33 32 34 2c | +| 22 65 76 65 6e 74 5f 74 79 70 65 22 3a 22 42 55 | +| 54 54 4f 4e 5f 43 4c 49 43 4b 22 2c 22 74 73 22 | +| 3a 22 32 30 32 34 2d 31 31 2d 32 35 54 32 30 3a | +| 32 33 3a 35 39 2e 33 38 30 5a 22 7d | ++-------------------------------------------------+ +---- + +The data is in binary format, and the first byte is not `0x00`, indicating that it was not produced with a schema. + +You can apply a transformation and reprocess the record in your data lakehouse to the original Iceberg table. In this case, you have a JSON value represented as a UTF-8 binary. Depending on your query engine, you might need to decode the binary value first before extracting the JSON fields. Some engines may automatically decode the binary value for you: + +.ClickHouse SQL example to reprocess DLQ record +[,sql] +---- +SELECT + CAST(jsonExtractString(json, 'user_id') AS Int32) AS user_id, + jsonExtractString(json, 'event_type') AS event_type, + jsonExtractString(json, 'ts') AS ts +FROM ( + SELECT + CAST(value AS String) AS json + FROM .`ClickEvent~dlq` -- Ensure that the table name is properly parsed +); +---- + +[,bash,role=no-copy] +---- ++---------+--------------+--------------------------+ +| user_id | event_type | ts | ++---------+--------------+--------------------------+ +| 2324 | BUTTON_CLICK | 2024-11-25T20:23:59.380Z | ++---------+--------------+--------------------------+ +---- + +You can now insert the transformed record back into the main Iceberg table. Redpanda recommends employing a strategy for exactly-once processing to avoid duplicates when reprocessing records. 
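+
+For example, the following Spark SQL sketch shows one way to write reprocessed DLQ records back while skipping rows that already exist in the target table. It assumes the `streaming` catalog name and the `ClickEvent` table layout used elsewhere in these examples; adjust the names and the JSON extraction to match your environment.
+
+[,sql]
+----
+-- Hypothetical sketch: the catalog name `streaming` and the column layout
+-- are assumptions carried over from the earlier ClickEvent examples.
+MERGE INTO streaming.redpanda.ClickEvent AS t
+USING (
+  SELECT
+    redpanda,
+    CAST(get_json_object(CAST(value AS STRING), '$.user_id') AS INT) AS user_id,
+    get_json_object(CAST(value AS STRING), '$.event_type') AS event_type,
+    get_json_object(CAST(value AS STRING), '$.ts') AS ts
+  FROM streaming.redpanda.`ClickEvent~dlq`
+) AS s
+-- The partition and offset stored in the redpanda struct identify the
+-- original record, so they can serve as an idempotency key.
+ON t.redpanda.`partition` = s.redpanda.`partition`
+   AND t.redpanda.`offset` = s.redpanda.`offset`
+WHEN NOT MATCHED THEN
+  INSERT (redpanda, user_id, event_type, ts)
+  VALUES (s.redpanda, s.user_id, s.event_type, s.ts);
+----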
+ +== Next steps + +* xref:manage:iceberg/use-iceberg-catalogs.adoc[] \ No newline at end of file diff --git a/modules/manage/partials/iceberg/query-iceberg-topics.adoc b/modules/manage/partials/iceberg/query-iceberg-topics.adoc new file mode 100644 index 0000000000..2d91c69842 --- /dev/null +++ b/modules/manage/partials/iceberg/query-iceberg-topics.adoc @@ -0,0 +1,142 @@ +== Access Iceberg tables + +ifndef::env-cloud[] +Depending on the processing engine and your Iceberg xref:manage:iceberg/use-iceberg-catalogs.adoc[catalog implementation], you may also need to create a table to point your data lakehouse to the table location in the catalog. For an example, see xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[]. +endif::[] + +ifdef::env-cloud[] +Depending on the processing engine and your Iceberg catalog implementation, you may also need to create a table to point your data lakehouse to the table location in the catalog. + +For BYOC clusters on AWS and GCP, the bucket name and table location are as follows: + +|=== +| Bucket name | Iceberg table location + +| `redpanda-cloud-storage-` +| `redpanda-iceberg-catalog/redpanda/` + +|=== +endif::[] + +If your engine needs the full JSON metadata path, use the following: + +``` +redpanda-iceberg-catalog/metadata/redpanda//v.metadata.json +``` + +This provides read access to all snapshots written as of the specified table version (denoted by `version-number`). + +NOTE: Redpanda automatically removes expired snapshots on a periodic basis. Snapshot expiry helps maintain a smaller metadata size and reduces the window available for <>. + +== Query examples + +ifndef::env-cloud[] +To follow along with the examples on this page, suppose you produce the same stream of events to a topic `ClickEvent`, which uses a schema, and another topic `ClickEvent_key_value`, which uses the key-value mode. The topics have glossterm:tiered-storage[] configured to an AWS S3 bucket. A sample record contains the following data: +endif::[] + +ifdef::env-cloud[] +To follow along with the examples on this page, suppose you produce the same stream of events to a topic `ClickEvent`, which uses a schema, and another topic `ClickEvent_key_value`, which uses the key-value mode. The topic's Iceberg data is stored in an AWS S3 bucket. A sample record contains the following data: +endif::[] + +[,bash,role=no-copy] +---- +{"user_id": 2324, "event_type": "BUTTON_CLICK", "ts": "2024-11-25T20:23:59.380Z"} +---- + +=== Topic with schema (`value_schema_id_prefix` mode) + +Assume that you have created the `ClickEvent` topic, set `redpanda.iceberg.mode` to `value_schema_id_prefix`, and are connecting to a REST-based Iceberg catalog. The following is an Avro schema for `ClickEvent`: + +.`schema.avsc` +[,avro] +---- +{ + "type" : "record", + "namespace" : "com.redpanda.examples.avro", + "name" : "ClickEvent", + "fields" : [ + { "name": "user_id", "type" : "int" }, + { "name": "event_type", "type" : "string" }, + { "name": "ts", "type": "string" } + ] + } +---- + +. Register the schema under the `ClickEvent-value` subject: ++ +[,bash] +---- +rpk registry schema create ClickEvent-value --schema path/to/schema.avsc --type avro +---- + +. 
Produce to the `ClickEvent` topic using the following format: ++ +[,bash] +---- +echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent --format='%k %v\n' --schema-id=topic +---- ++ +The `value_schema_id_prefix` requires that you produce to a topic using the Schema Registry wire format, which includes the magic byte and schema ID in the prefix of the message payload. This allows Redpanda to identify the correct schema version in the Schema Registry for a record. See the https://www.redpanda.com/blog/schema-registry-kafka-streaming#how-does-serialization-work-with-schema-registry-in-kafka[Understanding Apache Kafka Schema Registry^] blog post to learn more. + + +. The following Spark SQL query returns values from columns in the `ClickEvent` table, with the table structure derived from the schema, and column names matching the schema fields. If you've integrated a catalog, query engines such as Spark SQL provide Iceberg integrations that allow easy discovery and access to existing Iceberg tables in object storage. ++ +[,sql] +---- +SELECT * +FROM ``.redpanda.ClickEvent; +---- ++ +[,bash,role=no-copy] +---- ++-----------------------------------+---------+--------------+--------------------------+ +| redpanda | user_id | event_type | ts | ++-----------------------------------+---------+--------------+--------------------------+ +| {"partition":0,"offset":0,"timestamp":2025-03-05 15:09:20.436,"headers":null,"key":null} | 2324 | BUTTON_CLICK | 2024-11-25T20:23:59.380Z | ++-----------------------------------+---------+--------------+--------------------------+ +---- + +=== Topic in key-value mode + +In `key_value` mode, you do not associate the topic with a schema in the Schema Registry, which means using semi-structured data in Iceberg. The record keys and values can have an arbitrary structure, so Redpanda stores them in https://apache.github.io/iceberg/spec/?h=spec#primitive-types[binary format^] in Iceberg. + +In this example, assume that you have created the `ClickEvent_key_value` topic, and set `redpanda.iceberg.mode` to `key_value`. + +. Produce to the `ClickEvent_key_value` topic using the following format: ++ +[,bash] +---- +echo 'key1 {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n' +---- + +. The following Spark SQL query returns the semi-structured data in the `ClickEvent_key_value` table. The table consists of two columns: one named `redpanda`, containing the record key and other metadata, and another binary column named `value` for the record's value: ++ +[,sql] +---- +SELECT * +FROM ``.redpanda.ClickEvent_key_value; +---- ++ +[,bash,role=no-copy] +---- ++-----------------------------------+------------------------------------------------------------------------------+ +| redpanda | value | ++-----------------------------------+------------------------------------------------------------------------------+ +| {"partition":0,"offset":0,"timestamp":2025-03-05 15:14:30.931,"headers":null,"key":key1} | {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"} | ++-----------------------------------+------------------------------------------------------------------------------+ +---- + +Depending on your query engine, you might need to first decode the binary value to display the record key and value using a SQL helper function. 
For example, see the https://spark.apache.org/docs/latest/api/sql/index.html#unhex[`decode` and `unhex`^] Spark SQL functions, or the https://docs.snowflake.com/en/sql-reference/functions/hex_decode_string[HEX_DECODE_STRING^] Snowflake function. Some engines may also automatically decode the binary value for you. + +=== Time travel queries + +Some query engines, such as Spark, support time travel with Iceberg, allowing you to query the table at a specific point in time. You can query the table as it existed at a specific timestamp or version number. + +Redpanda automatically removes expired snapshots on a periodic basis, which also reduces the window available for time travel queries. + +The following example queries a `ClickEvent` table at a specific timestamp in Spark: + +[,sql] +---- +SELECT * FROM ``.redpanda.ClickEvent TIMESTAMP AS OF '2025-03-02 10:00:00'; +---- diff --git a/modules/manage/partials/iceberg/use-iceberg-catalogs.adoc b/modules/manage/partials/iceberg/use-iceberg-catalogs.adoc new file mode 100644 index 0000000000..6434c68bc7 --- /dev/null +++ b/modules/manage/partials/iceberg/use-iceberg-catalogs.adoc @@ -0,0 +1,114 @@ +ifdef::env-cloud[:about-iceberg-doc: manage:iceberg/about-iceberg-topics.adoc] +ifndef::env-cloud[:about-iceberg-doc: manage:iceberg/topic-iceberg-integration.adoc] + +To read from the Redpanda-generated xref:{about-iceberg-doc}[Iceberg table], your Iceberg-compatible client or tool needs access to the catalog to retrieve the table metadata and know the current state of the table. The catalog provides the current table metadata, which includes locations for all the table's data files. You can configure Redpanda to either connect to a REST-based catalog, or use a filesystem-based catalog. + +ifdef::env-cloud[] +NOTE: The Iceberg integration for Redpanda Cloud is a beta feature. It is not supported for production deployments. To configure REST catalog authentication for use with Iceberg topics in your cloud cluster, contact https://support.redpanda.com/hc/en-us/requests/new[Redpanda support^]. +endif::[] + +For production deployments, Redpanda recommends using an external REST catalog to manage Iceberg metadata. This enables built-in table maintenance, safely handles multiple engines and tools accessing tables at the same time, facilitates data governance, and maximizes data discovery. However, if it is not possible to use a REST catalog, you may use the filesystem-based catalog (`object_storage` catalog type), which does not require you to maintain a separate service to access the Iceberg data. In either case, you use the catalog to load, query, or refresh the Iceberg table as you produce to the Redpanda topic. See the documentation for your query engine or Iceberg-compatible tool for specific guidance on adding the Iceberg tables to your data warehouse or lakehouse using the catalog. + +After you have selected a catalog type at the cluster level and xref:{about-iceberg-doc}#enable-iceberg-integration[enabled the Iceberg integration] for a topic, you cannot switch to another catalog type. + +ifndef::env-cloud[] +== Connect to a REST catalog + +Connect to an Iceberg REST catalog using the standard https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml[REST API^] supported by many catalog providers. 
Use this catalog integration type with REST-enabled Iceberg catalog services, such as https://docs.databricks.com/en/data-governance/unity-catalog/index.html[Databricks Unity^] and https://other-docs.snowflake.com/en/opencatalog/overview[Snowflake Open Catalog^]. + +To connect to a REST catalog, set the following cluster configuration properties: + +* config_ref:iceberg_catalog_type,true,properties/cluster-properties[`iceberg_catalog_type`]: `rest` +* config_ref:iceberg_rest_catalog_endpoint,true,properties/cluster-properties[`iceberg_rest_catalog_endpoint`]: The endpoint URL for your Iceberg catalog, which you either manage directly, or is managed by an external catalog service. +* config_ref:iceberg_rest_catalog_client_id,true,properties/cluster-properties[`iceberg_rest_catalog_client_id`]: The ID to connect to the REST catalog. +* config_ref:iceberg_rest_catalog_client_secret,true,properties/cluster-properties[`iceberg_rest_catalog_client_secret`]: The secret data to connect to the REST catalog. + +For REST catalogs that use self-signed certificates, also configure these properties: + +* config_ref:iceberg_rest_catalog_trust_file,true,properties/cluster-properties[`iceberg_rest_catalog_trust_file`]: The path to a file containing a certificate chain to trust for the REST catalog. +* config_ref:iceberg_rest_catalog_crl_file,true,properties/cluster-properties[`iceberg_rest_catalog_crl_file`]: The path to the certificate revocation list for the specified trust file. + +See xref:reference:properties/cluster-properties.adoc[Cluster Configuration Properties] for the full list of cluster properties to configure for a catalog integration. + +=== Example REST catalog configuration + +For example, if you have Redpanda cluster configuration properties set to connect to a REST catalog: + +[,yaml] +---- +iceberg_catalog_type: rest +iceberg_rest_catalog_endpoint: http://catalog-service:8181 +iceberg_rest_catalog_client_id: +iceberg_rest_catalog_client_secret: +---- + +And you use Apache Spark as a processing engine, configured to use a catalog named `streaming`: + +[,spark] +---- +spark.sql.catalog.streaming = org.apache.iceberg.spark.SparkCatalog +spark.sql.catalog.streaming.type = rest +spark.sql.catalog.streaming.uri = http://catalog-service:8181 +# You may need to configure additional properties based on your object storage provider. +# See https://iceberg.apache.org/docs/latest/spark-configuration/#catalog-configuration and https://spark.apache.org/docs/latest/configuration.html +# For example, for AWS S3: +# spark.sql.catalog.streaming.io-impl = org.apache.iceberg.aws.s3.S3FileIO +# spark.sql.catalog.streaming.warehouse = s3:/// +# spark.sql.catalog.streaming.s3.endpoint = http:// +---- + +NOTE: Redpanda recommends setting credentials in environment variables so Spark can securely access your Iceberg data in object storage. For example, for AWS, use `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. + +The Spark engine can use the REST catalog to automatically discover the topic's Iceberg table. Using Spark SQL, you can query the Iceberg table directly by specifying the catalog name, the namespace, and the table name: + +[,sql] +---- +SELECT * FROM streaming.redpanda.; +---- + +The Iceberg table name is the name of your Redpanda topic. Redpanda puts the Iceberg table into a namespace called `redpanda`, creating the namespace if necessary. + +TIP: You may need to explicitly create a table for the Iceberg data in your query engine. 
For an example, see xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[].
+endif::[]
+
+== Integrate filesystem-based catalog (`object_storage`)
+
+By default, Iceberg topics use the filesystem-based catalog (config_ref:iceberg_catalog_type,true,properties/cluster-properties[`iceberg_catalog_type`] cluster property set to `object_storage`). Redpanda stores the table metadata in https://iceberg.apache.org/docs/latest/java-api-quickstart/#using-a-hadoop-catalog[HadoopCatalog^] format in the same object storage bucket or container as the data files.
+
+If you use the `object_storage` catalog type, you provide the object storage URI of the table's `metadata.json` file to an Iceberg client so it can access the catalog and data files for your Redpanda Iceberg tables.
+
+NOTE: The `metadata.json` file points to a specific Iceberg table snapshot. In your query engine, you must update your tables whenever a new snapshot is created so that they point to the latest snapshot. See the https://iceberg.apache.org/docs/latest/maintenance/[official Iceberg documentation^] for more information, and refer to the documentation for your query engine or Iceberg-compatible tool for specific guidance on updating or refreshing Iceberg tables.
+
+=== Example filesystem-based catalog configuration
+
+To configure Apache Spark to use a filesystem-based catalog, specify at least the following properties:
+
+[,spark]
+----
+spark.sql.catalog.streaming = org.apache.iceberg.spark.SparkCatalog
+spark.sql.catalog.streaming.type = hadoop
+# URI for table metadata: AWS S3 example
+spark.sql.catalog.streaming.warehouse = s3a:///redpanda-iceberg-catalog
+# You may need to configure additional properties based on your object storage provider.
+# See https://iceberg.apache.org/docs/latest/spark-configuration/#spark-configuration and https://spark.apache.org/docs/latest/configuration.html
+# For example, for AWS S3:
+# spark.hadoop.fs.s3.impl = org.apache.hadoop.fs.s3a.S3AFileSystem
+# spark.hadoop.fs.s3a.endpoint = http://
+# spark.sql.catalog.streaming.s3.endpoint = http://
+----
+
+NOTE: Redpanda recommends setting credentials in environment variables so Spark can securely access your Iceberg data in object storage. For example, for AWS, use `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`.
+
+Depending on your processing engine, you may also need to create a new table to point the data lakehouse to the table location.
+
+=== Specify metadata location
+
+ifndef::env-cloud[]
+The config_ref:iceberg_catalog_base_location,true,properties/cluster-properties[`iceberg_catalog_base_location`] property sets the base path for the filesystem-based catalog when you use the `object_storage` catalog type. The default value is `redpanda-iceberg-catalog`.
+
+CAUTION: Do not change the `iceberg_catalog_base_location` value after you have enabled Iceberg integration for a topic.
+endif::[]
+
+ifdef::env-cloud[]
+When using the `object_storage` catalog type, the base path for the filesystem-based catalog is `redpanda-iceberg-catalog`.
+endif::[]
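+
+ifndef::env-cloud[]
+For example, the following is a minimal sketch of setting a custom base path with `rpk` before you enable Iceberg integration for any topic. The path shown is a hypothetical value:
+
+[,bash]
+----
+# Hypothetical custom base path; set this before enabling Iceberg on any topic.
+rpk cluster config set iceberg_catalog_base_location custom-iceberg-catalog
+----
+endif::[]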