Skip to content

Commit cc8a9b4

Browse files
authored
Merge pull request #273883 from axisc/release-build-2024-event-hubs
Kafka features parity document
2 parents e4beadc + 49381d8 commit cc8a9b4

File tree

5 files changed

+364
-44
lines changed

5 files changed

+364
-44
lines changed

articles/event-hubs/TOC.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,10 @@
236236
href: apache-kafka-migration-guide.md
237237
- name: Kafka troubleshooting guide for Event Hubs
238238
href: apache-kafka-troubleshooting-guide.md
239+
- name: Transactions in Apache Kafka for Azure Event Hubs
240+
href: apache-kafka-transactions.md
241+
- name: Kafka Streams for Azure Event Hubs
242+
href: apache-kafka-streams.md
239243
- name: Replicate data from a Kafka cluster to Event Hubs
240244
items:
241245
- name: Using Apache Kafka Mirror Maker 2
Lines changed: 154 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
---
2+
title: Kafka Streams for Apache Kafka in Event Hubs on Azure Cloud
3+
description: Learn about how to use the Apache Kafka Streams API with Event Hubs service on Azure Cloud.
4+
ms.topic: overview
5+
ms.date: 04/29/2024
6+
---
7+
8+
# Kafka Streams for Azure Event Hubs
9+
10+
This article provides details on how to us the [Kafka Streams](https://kafka.apache.org/documentation/streams/) client library with Azure Event Hubs.
11+
12+
> [!NOTE]
13+
> Kafka Streams functionality is available in **Public Preview** for Event Hubs Premium and Dedicated tiers only.
14+
>
15+
16+
## Overview
17+
18+
Apache Kafka Streams is a Java only client library that provides a framework for processing of streaming data and building real-time applications against the data stored in Kafka topics. All the processing is scoped to the client, while Kafka topics act as the data store for intermediate data, before the output is written to the destination topic.
19+
20+
Event Hubs provides a Kafka endpoint to be used with your existing Kafka client applications as an alternative to running your own Kafka cluster. Event Hubs works with many of your existing Kafka applications. For more information, see [Event Hubs for Apache Kafka](azure-event-hubs-kafka-overview.md).
21+
22+
## Using Kafka Streams with Azure Event Hubs
23+
24+
Azure Event Hubs natively supports both the AMQP and Kafka protocol. However, to ensure compatible Kafka Streams behavior, some of the default configuration parameters have to be updated for Kafka clients.
25+
26+
| Property | Default behavior for Event Hubs | Modified behavior for Kafka streams | Explanation |
27+
| ----- | ---- | ----| ---- |
28+
| `messageTimestampType` | set to `AppendTime` | should be set to `CreateTime` | Kafka Streams relies on creation timestamp rather than append timestamp |
29+
| `message.timestamp.difference.max.ms` | max allowed value is 90 days | Property is used to govern past timestamps only. Future time is set to 1 hour and can't be changed. | This is in line with the Kafka protocol specification |
30+
| `min.compaction.lag.ms` | | max allowed value is two days ||
31+
| Infinite retention topics | | size based truncation of 250 GB for each topic-partition||
32+
| Delete record API for infinite retention topics| | Not implemented. As a workaround, the topic can be updated and a finite retention time can be set.| This will be done in GA |
33+
34+
### Other considerations
35+
36+
Here are some of the other considerations to keep in mind.
37+
38+
* Kafka streams client applications must be granted management, read, and write permissions for the entire namespaces to be able to create temporary topics for stream processing.
39+
* Temporary topics and partitions count towards the quota for the given namespace. These should be kept under consideration when provisioning the namespace or cluster.
40+
* Infinite retention time for "Offset" Store is limited by max message retention time of the SKU. Check [Event Hubs Quotas](event-hubs-quotas.md) for these tier specific values.
41+
42+
43+
These include, updating the topic configuration in the `messageTimestampType` to use the `CreateTime` (that is, Event creation time) instead of the `AppendTime` (that is, log append time).
44+
45+
To override the default behavior (required), the below setting must be set in Azure Resource Manager (ARM).
46+
47+
> [!NOTE]
48+
> Only the specific parts of the ARM template are shown to highlight the configuration that needs to be updated.
49+
>
50+
51+
```json
52+
{
53+
"parameters": {
54+
"namespaceName": "contoso-test-namespace",
55+
"resourceGroupName": "contoso-resource-group",
56+
"eventHubName": "contoso-event-hub-kafka-streams-test",
57+
...
58+
"parameters": {
59+
"properties": {
60+
...
61+
"messageTimestampType": "CreateTime",
62+
"retentionDescription": {
63+
"cleanupPolicy": "Delete",
64+
"retentionTimeInHours": -1,
65+
"tombstoneRetentionTimeInHours": 1
66+
}
67+
}
68+
}
69+
}
70+
}
71+
```
72+
73+
## Kafka Streams concepts
74+
75+
Kafka streams provides a simple abstraction layer over the Kafka producer and consumer APIs to help developers get started with real time streaming scenarios faster. The light-weight library depends on an **Apache Kafka compatible broker** (like Azure Event Hubs) for the internal messaging layer, and manages a **fault tolerant local state store**. With the transactional API, the Kafka streams library supports rich processing features such as **exactly once processing** and **one record at a time processing**.
76+
77+
Records arriving out of order benefit from **event-time based windowing operations**.
78+
79+
> [!NOTE]
80+
> We recommend familiarizing yourself with [Kafka Streams documentation](https://kafka.apache.org/37/documentation/streams/) and [Kafka Streams core concepts](https://kafka.apache.org/37/documentation/streams/core-concepts).
81+
>
82+
83+
### Streams
84+
85+
A stream is the abstracted representation of a Kafka topic. It consists of an unbounded, continuously updating data set of immutable data records, where each data record is a key-value pair.
86+
87+
### Stream processing topology
88+
89+
A Kafka streams application defines the computational logic through a [DAG (directed acyclic graph)](https://en.wikipedia.org/wiki/Directed_acyclic_graph) represented by a processor [topology](https://javadoc.io/doc/org.apache.kafka/kafka-streams/latest/org/apache/kafka/streams/Topology.html). The processor topology comprises stream processors(nodes in the topology) which represent a processing step, connected by streams(edges in the topology).
90+
91+
Stream processors can be chained to upstream processors or downstream processors, except for certain special cases:
92+
* Source processors - These processors don't have any upstream processors and read from one or more streams directly. They can then be chained to downstream processors.
93+
* Sink processors - These processors don't have any downstream processors and must write directly to a stream.
94+
95+
Stream processing topology can be defined either with the [Kafka Streams DSL](https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html) or with the lower-level [Processor API](https://kafka.apache.org/37/documentation/streams/developer-guide/processor-api.html).
96+
97+
98+
### Stream and Table duality
99+
100+
Streams and tables are 2 different but useful abstractions provided by the [Kafka Streams DSL](https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html), modeling both time series and relational data formats that must coexist for stream processing use-cases.
101+
102+
Kafka extends this further and introduces a duality between streams and tables, where a
103+
* A **stream** can be considered as a changelog of a **table**, and
104+
* A **table** can be considered as a snapshot of the latest value of each key in a **stream**.
105+
106+
This duality allows tables and streams to be used interchangeably as required by the use-case.
107+
108+
For example
109+
110+
* Joining static customer data (modeled as a table) with dynamic transactions (modeled as a stream), and
111+
* Joining changing portfolio positions in a day traders portfolio (modeled as a stream) with the latest market data feed(modeled as a stream).
112+
113+
### Time
114+
115+
Kafka Streams allows windowing and grace functions to allow for out of order data records to be ingested and still be included in the processing. To ensure that this behavior is deterministic, there are additional notions of time in Kafka streams. These include:
116+
117+
* Creation time (also known as 'Event time') - This is the time when the event occurred and the data record was created.
118+
* Processing time - This is the time when the data record is processed by the stream processing application (or when it's consumed).
119+
* Append time (also known as 'Creation time') - This is the time when the data is stored and committed to the storage of the Kafka broker. This differs from the creation time because of the time difference between the creation of the event and the actual ingestion by the broker.
120+
121+
122+
123+
124+
### Stateful operations
125+
126+
State management enables sophisticated stream processing applications like joining and aggregating data from different streams. This is achieved with state stores provided by Kafka Streams and accessed using [stateful operators in the Kafka Streams DSL](https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html#stateful-transformations).
127+
128+
Stateful transformations in the DSL include:
129+
* [Aggregating](https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-aggregating)
130+
* [Joining](https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-joins)
131+
* [Windowing (as part of aggregations and joins)](https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-windowing)
132+
* [Applying custom processors and transformers](https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html#streams-developer-guide-dsl-process), which may be stateful, for Processor API integration
133+
134+
### Window and grace
135+
136+
Windowing operations in the [Kafka Streams DSL](https://kafka.apache.org/37/documentation/streams/developer-guide/dsl-api.html) allow developers to control how records are grouped for a given key for [stateful operations like aggregations and joins](#stateful-operations).
137+
138+
Windowing operations also permit the specification of a **grace period** to provide some flexibility for out-of-order records for a given window. A record that is meant for a given window and arrives after the given window but within the grace period is accepted. Records arriving after the grace period is over are discarded.
139+
140+
Applications must utilize the windowing and grace period controls to improve fault tolerance for out-of-order records. The appropriate values vary based on the workload and must be identified empirically.
141+
142+
143+
### Processing guarantees
144+
145+
Business and technical users seek to extract key business insights from the output of stream processing workloads, which translate to high transactional guarantee requirements. Kafka streams works together with Kafka transactions to ensure transactional processing guarantees by integrating with the Kafka compatible brokers' (such as Azure Event Hubs) underlying storage system to ensure that offset commits and state store updates are written atomically.
146+
147+
To ensure transactional processing guarantees, the `processing.guarantee` setting in the Kafka Streams configs must be updated from the default value of `at_least_once` to `exactly_once_v2` (for client versions at or after Apache Kafka 2.5) or `exactly_once` (for client versions before Apache Kafka 2.5.x).
148+
149+
## Next steps
150+
This article provided an introduction to Event Hubs for Kafka. To learn more, see [Apache Kafka developer guide for Azure Event Hubs](apache-kafka-developer-guide.md).
151+
152+
For a **tutorial** with step-by-step instructions to create an event hub and access it using SAS or OAuth, see [Quickstart: Data streaming with Event Hubs using the Kafka protocol](event-hubs-quickstart-kafka-enabled-event-hubs.md).
153+
154+
Also, see the [OAuth samples on GitHub](https://github.com/Azure/azure-event-hubs-for-kafka/tree/master/tutorials/oauth).
Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
---
2+
title: Transactions for Apache Kafka in Event Hubs on Azure Cloud
3+
description: Learn about the transactional API in Apache Kafka and how to use that in Apache Kafka applications with Event Hubs service on Azure Cloud.
4+
ms.topic: overview
5+
ms.date: 04/29/2024
6+
---
7+
8+
# Transactions in Apache Kafka for Azure Event Hubs
9+
10+
This article provides detail on how to use the [Apache Kafka](https://kafka.apache.org/) transactional API with Azure Event Hubs.
11+
12+
## Overview
13+
Event Hubs provides a Kafka endpoint that can be used by your existing Kafka client applications as an alternative to running your own Kafka cluster. Event Hubs works with many of your existing Kafka applications. For more information, see [Event Hubs for Apache Kafka](azure-event-hubs-kafka-overview.md).
14+
15+
This document focuses on how to use Kafka’s transactional API with Azure Event Hubs seamlessly.
16+
17+
> [!NOTE]
18+
> Kafka Transactions is currently in Public preview in Premium, and Dedicated tier.
19+
>
20+
21+
## Transactions in Apache Kafka
22+
In cloud native environments, applications must be made resilient to network disruptions and namespace restarts and upgrades. Applications requiring strict processing guarantees must utilize a transactional framework or API to ensure that either all of the operations are executed, or none are so that the application and data state is reliably managed. If the set of operations fail, they can be reliably tried again atomically to ensure the right processing guarantees.
23+
24+
> [!NOTE]
25+
> Transactional guarantees are typically required when there are multiple operations that need to be processed in an "all or nothing" fashion.
26+
>
27+
> For all other operations, client applications are **resilient by default** to retry the operation with an exponential backoff, if the specific operation failed.
28+
29+
30+
Apache Kafka provides a transactional API to ensure this level of processing guarantees across the same or different set of topic/partitions.
31+
32+
Transactions apply to the below cases:
33+
34+
* Transactional producers.
35+
* Exactly once processing semantics.
36+
37+
### Transactional Producers
38+
39+
Transactional producers ensure that data is written atomically to multiple partitions across different topics. Producers can initiate a transaction, write to multiple partitions on the same topic or across different topics, and then commit or abort the transaction.
40+
41+
To ensure that a producer is transactional, `enable.idempotence` should be set to true to ensure that the data is written exactly once, thus avoiding duplicates on the *send* side. Additionally, `transaction.id` should be set to uniquely identify the producer.
42+
43+
```java
44+
producerProps.put("enable.idempotence", "true");
45+
producerProps.put("transactional.id", "transactional-producer-1");
46+
KafkaProducer<String, String> producer = new KafkaProducer(producerProps);
47+
```
48+
49+
Once the producer is initialized, the below call ensures that the producer registers with the broker as a transactional producer -
50+
51+
```java
52+
producer.initTransactions();
53+
```
54+
55+
The producer must then begin a transaction explicitly, perform send operations across different topics and partitions as normal, and then commit the transaction with the below call –
56+
57+
```java
58+
producer.beginTransaction();
59+
/*
60+
Send to multiple topic partitions.
61+
*/
62+
producer.commitTransaction();
63+
```
64+
65+
If the transaction needs to be aborted due to a fault or a timeout, then the producer can call the `abortTransaction()` method.
66+
67+
```java
68+
producer.abortTransaction();
69+
```
70+
71+
72+
### Exactly once semantics
73+
74+
Exactly once semantics builds on the transactional producers by adding consumers in the transactional scope of the producers, so that each record is guaranteed to be read, processed, and written **exactly once**.
75+
76+
First the transactional producer is instantiated -
77+
78+
```java
79+
80+
producerProps.put("enable.idempotence", "true");
81+
producerProps.put("transactional.id", "transactional-producer-1");
82+
KafkaProducer<K, V> producer = new KafkaProducer(producerProps);
83+
84+
producer.initTransactions();
85+
86+
```
87+
88+
Then, the consumer must be configured to read only nontransactional messages, or committed transactional messages by setting the below property –
89+
90+
```java
91+
92+
consumerProps.put("isolation.level", "read_committed");
93+
KafkaConsumer <K,V> consumer = new KafkaConsumer<>(consumerProps);
94+
95+
```
96+
97+
Once the consumer is instantiated, it can subscribe to the topic from where the records must be read –
98+
99+
```java
100+
101+
consumer.subscribe(singleton("inputTopic"));
102+
103+
```
104+
105+
After the consumer polls the records from the input topic, the producer begins the transactional scope within which the record is processed and written to the output topic. Once the records are written, the updated map of offsets for all partitions is created. The producer then sends this updated offset map to the transaction before committing the transaction.
106+
107+
In any exception, the transaction is aborted and the producer retries the processing once again atomically.
108+
109+
```java
110+
while (true) {
111+
ConsumerRecords records = consumer.poll(Long.Max_VALUE);
112+
producer.beginTransaction();
113+
try {
114+
for (ConsumerRecord record : records) {
115+
/*
116+
Process record as appropriate
117+
*/
118+
// Write to output topic
119+
producer.send(producerRecord(“outputTopic”, record));
120+
}
121+
122+
/*
123+
Generate the offset map to be committed.
124+
*/
125+
Map <TopicPartition, OffsetAndMetadata> offsetsToCommit = new Hashap<>();
126+
for (TopicPartition partition : records.partitions()) {
127+
// Calculate the offset to commit and populate the map.
128+
offsetsToCommit.put(partition, new OffsetAndMetadata(calculated_offset))
129+
}
130+
131+
// send offsets to transaction and then commit the transaction.
132+
producer.sendOffsetsToTransaction(offsetsToCommit, group);
133+
producer.commitTransaction();
134+
} catch (Exception e)
135+
{
136+
producer.abortTransaction();
137+
}
138+
}
139+
```
140+
141+
> [!WARNING]
142+
>If the transaction is neither committed or aborted before the `max.transaction.timeout.ms`, the transaction will be aborted by Event Hubs automatically. The default `max.transaction.timeout.ms` is set to **15 minutes** by Event Hubs, but the producer can override it to a lower value by setting the `transaction.timeout.ms` property in the producer configuration properties.
143+
144+
## Migration Guide
145+
146+
If you have existing Kafka applications that you’d like to use with Azure Event Hubs, please review the [Kafka migration guide for Azure Event Hubs](apache-kafka-migration-guide.md) to hit the ground running quickly.
147+
148+
## Next steps
149+
150+
To learn more about Event Hubs and Event Hubs for Kafka, see the following articles:
151+
152+
- [Apache Kafka troubleshooting guide for Event Hubs](apache-kafka-troubleshooting-guide.md)
153+
- [Frequently asked questions - Event Hubs for Apache Kafka](apache-kafka-frequently-asked-questions.yml)
154+
- [Apache Kafka developer guide for Azure Event Hubs](apache-kafka-developer-guide.md)
155+
- [Recommended configurations](apache-kafka-configurations.md)

articles/event-hubs/apache-kafka-troubleshooting-guide.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ You may see consumers not getting any records and constantly rebalancing. In thi
2525
- If your configuration matches those recommended values, and you're still seeing constant rebalancing, feel free to open up an issue (make sure to include your entire configuration in the issue so that we can help debug)!
2626

2727
## Compression/Message format version issue
28-
Event Hubs for Kafka currently supports only `gzip` compression algorithm. If any other algorithm is used, client applications will see a message-format version error (for example, `The message format version on the broker does not support the request.`).
28+
Event Hubs for Kafka currently supports only `gzip` compression algorithm. If any other algorithm is used, client applications will see a message-format version error (for example, `The message format version on the broker does not support the request.`).
2929

3030
If an unsupported compression algorithm needs to be used, compressing your data with that specific algorithm before sending it to the brokers and decompressing after receiving is a valid workaround. The message body is just a byte array to the service, so client-side compression/decompression won't cause any issues.
3131

0 commit comments

Comments
 (0)