include::shared:partial$enterprise-license.adoc[]

When you access Iceberg topics from a data lakehouse or other Iceberg-compatible tools, how you consume the data depends on the topic xref:manage:iceberg/topic-iceberg-integration.adoc#enable-iceberg-integration[Iceberg mode] and whether you've registered a schema for the topic in the xref:manage:schema-reg/schema-reg-overview.adoc[Redpanda Schema Registry]. In either mode, you do not need to rely on complex ETL jobs or pipelines to access real-time data from Redpanda.

== Access Iceberg tables

Depending on the processing engine and your Iceberg xref:manage:iceberg/use-iceberg-catalogs.adoc[catalog implementation], you may also need to create a table that points your data lakehouse to the table location in the catalog. For an example, see xref:manage:iceberg/redpanda-topics-iceberg-snowflake-catalog.adoc[].

If your engine needs the full JSON metadata path, use the following:

[,text,role=no-copy]
----
redpanda-iceberg-catalog/metadata/redpanda/<topic-name>/v<version-number>.metadata.json
----

This provides read access to all snapshots written as of the specified table version (denoted by `version-number`).
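
For example, if your query engine and catalog support Iceberg's `register_table` procedure, you can register the table directly from a metadata file. The following Spark SQL sketch assumes a hypothetical catalog name and bucket path, and uses `v1` as a placeholder version:

[,sql]
----
-- Register the Iceberg table from its metadata file.
-- The catalog name, bucket, and metadata version are placeholders.
CALL `<catalog-name>`.system.register_table(
  table => 'redpanda.ClickEvent',
  metadata_file => 's3://<bucket-name>/redpanda-iceberg-catalog/metadata/redpanda/ClickEvent/v1.metadata.json'
);
----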

NOTE: Redpanda automatically removes expired snapshots on a periodic basis. Snapshot expiry helps maintain a smaller metadata size and reduces the window available for <<time-travel-queries,time travel>>.

== Query examples

To follow along with the examples on this page, suppose you produce the same stream of events to a topic `ClickEvent`, which uses a schema, and another topic `ClickEvent_key_value`, which uses the `key_value` mode. The topics have xref:manage:tiered-storage.adoc[Tiered Storage] configured to an AWS S3 bucket. A sample record contains the following data:

[,json,role=no-copy]
----
{"user_id": 2324, "event_type": "BUTTON_CLICK", "ts": "2024-11-25T20:23:59.380Z"}
----

=== Topic with schema (`value_schema_id_prefix` mode)

This example assumes that you have created the `ClickEvent` topic, set `redpanda.iceberg.mode` to `value_schema_id_prefix`, and are connecting to a REST-based Iceberg catalog. The following is an Avro schema for `ClickEvent`:

.`schema.avsc`
[,avro]
----
{
  "type": "record",
  "namespace": "com.redpanda.examples.avro",
  "name": "ClickEvent",
  "fields": [
    { "name": "user_id", "type": "int" },
    { "name": "event_type", "type": "string" },
    { "name": "ts", "type": "string" }
  ]
}
----

. Register the schema under the `ClickEvent-value` subject:
+
[,bash]
----
rpk registry schema create ClickEvent-value --schema path/to/schema.avsc --type avro
----

. Produce to the `ClickEvent` topic using the following format:
+
[,bash]
----
echo '"key1" {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent --format='%k %v\n' --schema-id=topic
----
+
The `value_schema_id_prefix` mode requires that you produce to the topic using the Schema Registry wire format, which prefixes the message payload with a magic byte and the schema ID. This allows Redpanda to identify the correct schema version in the Schema Registry for a record. See the https://www.redpanda.com/blog/schema-registry-kafka-streaming#how-does-serialization-work-with-schema-registry-in-kafka[Understanding Apache Kafka Schema Registry^] blog post to learn more about the wire format.


. The following Spark SQL query returns values from columns in the `ClickEvent` table. The table structure is derived from the registered schema, with column names matching the schema fields. If you've integrated a catalog, query engines such as Spark provide Iceberg integrations that make it easy to discover and access existing Iceberg tables in object storage.
+
[,sql]
----
SELECT *
FROM `<catalog-name>`.redpanda.ClickEvent;
----
+
[,bash,role=no-copy]
----
+------------------------------------------------------------------------------------------+---------+--------------+--------------------------+
| redpanda                                                                                  | user_id | event_type   | ts                       |
+------------------------------------------------------------------------------------------+---------+--------------+--------------------------+
| {"partition":0,"offset":0,"timestamp":2025-03-05 15:09:20.436,"headers":null,"key":null}  | 2324    | BUTTON_CLICK | 2024-11-25T20:23:59.380Z |
+------------------------------------------------------------------------------------------+---------+--------------+--------------------------+
----
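
Because `redpanda` is an ordinary struct column, you can also select individual metadata fields alongside the value columns. A minimal sketch, using the same assumed catalog:

[,sql]
----
-- Select record metadata from the redpanda struct column
-- alongside columns derived from the schema.
SELECT redpanda.partition, redpanda.offset, user_id, event_type
FROM `<catalog-name>`.redpanda.ClickEvent
WHERE event_type = 'BUTTON_CLICK';
----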

=== Topic in key-value mode

In `key_value` mode, you do not associate the topic with a schema in the Schema Registry, so the data in Iceberg is semi-structured. Record keys and values can have an arbitrary structure, and Redpanda stores them in https://apache.github.io/iceberg/spec/?h=spec#primitive-types[binary format^] in Iceberg.

This example assumes that you have created the `ClickEvent_key_value` topic and set `redpanda.iceberg.mode` to `key_value`.

. Produce to the `ClickEvent_key_value` topic using the following format:
+
[,bash]
----
echo 'key1 {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"}' | rpk topic produce ClickEvent_key_value --format='%k %v\n'
----

. The following Spark SQL query returns the semi-structured data in the `ClickEvent_key_value` table. The table consists of two columns: one named `redpanda`, containing the record key and other metadata, and another binary column named `value` for the record's value:
+
[,sql]
----
SELECT *
FROM `<catalog-name>`.redpanda.ClickEvent_key_value;
----
+
[,bash,role=no-copy]
----
+------------------------------------------------------------------------------------------+------------------------------------------------------------------------------+
| redpanda                                                                                  | value                                                                        |
+------------------------------------------------------------------------------------------+------------------------------------------------------------------------------+
| {"partition":0,"offset":0,"timestamp":2025-03-05 15:14:30.931,"headers":null,"key":key1}  | {"user_id":2324,"event_type":"BUTTON_CLICK","ts":"2024-11-25T20:23:59.380Z"} |
+------------------------------------------------------------------------------------------+------------------------------------------------------------------------------+
----

Depending on your query engine, you might need to decode the binary data using a SQL helper function before you can display the record key and value. For example, see the https://spark.apache.org/docs/latest/api/sql/index.html#unhex[`decode` and `unhex`^] Spark SQL functions, or the https://docs.snowflake.com/en/sql-reference/functions/hex_decode_string[HEX_DECODE_STRING^] Snowflake function. Some engines may also decode the binary value for you automatically.
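
For example, in Spark SQL you can decode the binary key and value columns for display. This sketch assumes the key and value were produced as UTF-8 text, as in the example above:

[,sql]
----
-- Decode the UTF-8 encoded binary key and value for display
SELECT decode(redpanda.key, 'UTF-8') AS key,
       decode(value, 'UTF-8') AS value
FROM `<catalog-name>`.redpanda.ClickEvent_key_value;
----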

=== Time travel queries

Some query engines, such as Spark, support time travel with Iceberg, allowing you to query the table as it existed at a specific timestamp or table version.

Redpanda automatically removes expired snapshots on a periodic basis, which also reduces the window available for time travel queries.
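
In engines that expose Iceberg metadata tables, such as Spark, you can check which snapshots are still available before running a time travel query. A minimal sketch, using the same assumed catalog:

[,sql]
----
-- List the snapshots that remain available for time travel
SELECT committed_at, snapshot_id, operation
FROM `<catalog-name>`.redpanda.ClickEvent.snapshots;
----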

The following is an example of querying a `ClickEvent` table at a specific timestamp in Spark:

[,sql]
----
SELECT * FROM `<catalog-name>`.redpanda.ClickEvent TIMESTAMP AS OF '2025-03-02 10:00:00';
----
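
To query by version instead, pass a snapshot ID (for example, one returned by the `snapshots` metadata table shown earlier) to `VERSION AS OF`. The snapshot ID below is a placeholder:

[,sql]
----
-- Query the table as of a specific snapshot ID (placeholder value)
SELECT * FROM `<catalog-name>`.redpanda.ClickEvent VERSION AS OF 1234567890123456789;
----
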
include::manage:partial$iceberg/query-iceberg-topics.adoc[]

== Next steps
