Skip to content

Commit b59245f

Browse files
Updates to the Event Processor implementation; cleaned up some issues in samples (#2429)
* Updates to the Event Processor implementation; cleaned up some issues in samples * PR feedback from GH copilot * Update sdk/eventhubs/azure_messaging_eventhubs/CHANGELOG.md Co-authored-by: Heath Stewart <[email protected]> * Update sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/README.md Co-authored-by: Heath Stewart <[email protected]> * Update sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/README.md Co-authored-by: Heath Stewart <[email protected]> --------- Co-authored-by: Heath Stewart <[email protected]>
1 parent 33fa800 commit b59245f

14 files changed

+175
-91
lines changed

sdk/eventhubs/azure_messaging_eventhubs/CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
- Removed the requirement that streaming messages from the `stream_events` method on the `EventReceiver` use `pin_mut!()` on the provided stream.
1414
- Removed direct dependencies on `tokio` package.
1515
- Added `partition_id` option to `SendMessageOptions`.
16-
- Significant modifications to API surface to improve Azure RUST guidelines - APIs which take ownership of a string consume `String` parameter instead of borrowing a `&str` parameter.
16+
- Significant modifications to API surface to improve conformance to Azure RUST guidelines e.g., APIs which take ownership of a string consume `String` parameter instead of borrowing a `&str` parameter.
1717

1818
### Breaking Changes
1919

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_batch_produce_events.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4+
//! This sample demonstrates how to send events to all partitions using a batch sender.
5+
46
use azure_core::Uuid;
5-
/// This sample demonstrates how to send events to all partitions using a batch sender.
6-
///
77
use azure_identity::DefaultAzureCredential;
88
use azure_messaging_eventhubs::{models::EventData, EventDataBatchOptions, ProducerClient};
99

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_consume_events.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
use std::time::Duration;
4+
//! This sample demonstrates how to consume events from an Event Hub partition using the [`ConsumerClient`].
55
6-
/// This sample demonstrates how to consume events from an Event Hub partition using the [`ConsumerClient`].
7-
///
86
use azure_identity::DefaultAzureCredential;
97
use azure_messaging_eventhubs::{
108
ConsumerClient, OpenReceiverOptions, StartLocation, StartPosition,
119
};
1210
use futures::StreamExt;
11+
use std::time::Duration;
1312

1413
#[tokio::main]
1514
async fn main() -> Result<(), Box<dyn std::error::Error>> {

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_consume_messages.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
use std::time::Duration;
5-
6-
/// This sample demonstrates how to consume events from an Event Hub partition using the `ConsumerClient`.
7-
///
4+
//! This sample demonstrates how to consume events from an Event Hub partition using the `ConsumerClient`.
85
use azure_identity::DefaultAzureCredential;
96
use azure_messaging_eventhubs::{
107
ConsumerClient, OpenReceiverOptions, StartLocation, StartPosition,
118
};
129
use futures::StreamExt;
10+
use std::time::Duration;
1311

1412
#[tokio::main]
1513
async fn main() -> Result<(), Box<dyn std::error::Error>> {

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_partition_properties.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
// Copyright (c) Microsoft Corporation. All Rights reserved
22
// Licensed under the MIT license.
3+
4+
//! Example of using the Event Hubs SDK to get partition properties.
5+
36
use azure_core::error::Result;
47
use azure_identity::DefaultAzureCredential;
58
use azure_messaging_eventhubs::ProducerClient;
6-
79
use std::env;
810

911
#[tokio::main]

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_processor_client.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33

44
// cspell: words checkpointing
55

6-
/// This sample demonstrates how to use an Event Hubs Processor to manage receiving
7-
/// events from an Event Hub partition and checkpointing the events that have been processed.
8-
///
6+
//! This sample demonstrates how to use an Event Hubs Processor to manage receiving
7+
//! events from an Event Hub partition and checkpointing the events that have been
8+
//! processed.
9+
910
use azure_identity::DefaultAzureCredential;
1011
use azure_messaging_eventhubs::{ConsumerClient, EventProcessor, InMemoryCheckpointStore};
1112
use futures::StreamExt;

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_produce_events.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
/// This sample demonstrates how to send events to an Event Hub partition using the `ProducerClient`.
5-
///
4+
//! This sample demonstrates how to send events to an Event Hub partition using the `ProducerClient`.
5+
66
use azure_core::Uuid;
77
use azure_identity::DefaultAzureCredential;
88
use azure_messaging_eventhubs::{models::EventData, ProducerClient, SendEventOptions};

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_produce_messages.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
// Copyright (c) Microsoft Corporation.
22
// Licensed under the MIT license.
33

4-
/// This sample demonstrates how to send AMQP messages to an Event Hub partition using the `ProducerClient`.
5-
///
4+
//! This sample demonstrates how to send AMQP messages to an Event Hub partition using the `ProducerClient`.
5+
66
use azure_identity::DefaultAzureCredential;
77
use azure_messaging_eventhubs::{
88
models::{AmqpMessage, AmqpValue},

sdk/eventhubs/azure_messaging_eventhubs/examples/eventhubs_properties.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
// Copyright (c) Microsoft Corporation. All Rights reserved
22
// Licensed under the MIT license.
33

4+
//! This sample shows retrieving the properties of an Event Hub.
5+
46
use azure_core::error::Result;
57
use azure_identity::DefaultAzureCredential;
68
use azure_messaging_eventhubs::ProducerClient;
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
# Azure Event Hubs Event Processor notes
2+
3+
The following are a set of collected notes on Event Hubs event processor functionality.
4+
5+
## Processor
6+
7+
The processor provides a mechanism interface which allows MULTIPLE Event Hubs clients to process the events from multiple partitions while attempting to ensure that events are never lost.
8+
9+
### Processor concepts
10+
11+
#### Checkpoint
12+
13+
A "Checkpoint" represents a fixed location within an Event Hubs partition. A Checkpoint contains two sets of information:
14+
15+
1. Information to uniquely identify the EventHubs partition (Event Hub instance FQDN, Event Hub name, consumer group, and partition ID)
16+
1. Information to identify an event within that partition (sequence number or offset).
17+
18+
#### Ownership
19+
20+
Within the processor, each partition is "owned" by a processor client. Ownership is defined by an "Ownership" structure, which describes the partition being owned, which client "owns" the partition, and when was the last time that the ownership was updated (this allows the processor to claim ownership of clients which were previously owned by processor instances that are no longer running).
21+
22+
#### Checkpoint Store
23+
24+
A Checkpoint Store is used to persist the value of `Checkpoints` and `Ownerships`. Typically different processors will use the same checkpoint store (normally backed by Azure Blob Storage) to coordinate between the processors.
25+
26+
A Checkpoint store contains 4 methods:
27+
28+
- list_checkpoints - Returns the current checkpoints for a given EventHubs instance.
29+
- list_ownership - Returns the owners for a given EventHubs instance.
30+
- claim_ownership - Claim ownership for a set of partitions.
31+
- update_checkpoint - Update a checkpoint with a new checkpoint value.
32+
33+
#### Partition Client
34+
35+
A processor partition client receives events from an eventhubs instance. It also can update the checkpoint for the partition from a received message.
36+
37+
### Processor configuration options
38+
39+
The processor has several configuration options available:
40+
41+
- Load Balancing Strategy - the strategy used by the load balancer (Greedy or Balanced). More on load balancing strategy in [Load Balancing Strategy](#load-balancing-strategy)
42+
- Update Interval - the duration that the processor should sleep between processor cycles. The default update interval is 30 seconds.
43+
- Partition Expiration Duration - The time after which a partition is considered unowned. The default partition expiration duration is 2 minutes.
44+
- Start Positions - the starting position for each partition (or a default starting position for all partitions).
45+
- Prefetch - the number of items to prefetch for each partition client.
46+
- Maximum number of partitions - The maximum number of partitions which the processor should manage.
47+
48+
### Processor operation
49+
50+
A processor client starts the processor running in the background (using a language specific mechanism to start the processor). It then calls into the processor to receive a "partition client".
51+
52+
The client can then use the partition client to receive messages, and after it has processed the message, it asks the processor to update the checkpoint for the client (indicating that the client has finished processing the message and that the processor should not hand this message to another processor client).
53+
54+
When the processor starts, it runs a dispatch cycle and then sleeps for the processor update interval. This continues until either an error occurs or the processor is stopped.
55+
56+
### Processor dispatch cycle
57+
58+
For each processor dispatch cycle, the processor first load balances among the partitions.
59+
The load balancer will return a set of ownerships that the current processor instance now owns. The processor will then add a partition client to the set of partition clients which can be returned to the client.
60+
61+
### Load Balancing Cycle
62+
63+
The first thing that the load balancer does is to query the checkpoint store for the current state of the checkpoint store. It categories all ownerships into three buckets:
64+
65+
- Partitions owned by the current processor instance.
66+
- Partitions that are unowned or are expired.
67+
- Partitions whose clients own "too many" partitions.
68+
69+
The load balancer also determines the number of *active* processors (based on ownership).
70+
71+
Once the load balancer has determined the set of ownerships, the load balancer attempts to claim the "appropriate" number of partitions.
72+
73+
To do this, the processor performs the following operations:
74+
75+
- If the processor instance already has its fair share of partitions, exit.
76+
- If there are unowned partitions, claim one at random.
77+
- If there are no unowned partitions, pick one to "steal" at random.
78+
- Update the checkpoint store with any changes. If the partition we are attempting to claim is currently owned by another processor, that will result in an HTTP 412 error. If we receive this, update local state to remove the ownership claim.
79+
80+
#### Raw notes on load balancing
81+
82+
- Renew Local Ownership
83+
- Based on local state
84+
- Each that the processor thinks it owns, call to SetMetadata. May call Upload.
85+
- If no longer owned, results in HTTP 412, local state updated to remove ownership (seen in the logs; indicates stolen between load balancing cycles)
86+
87+
- List All Ownership
88+
- Calls ListBlobs with metadata trait set
89+
- If this fails, the load balancing cycle cannot continue; local state is preserved (seen in logs; processors will fight and Event Hubs will enforce single reader)
90+
91+
- Calculate Ownership
92+
- Update state for all ownership and all unowned partitions; expired ownership is unowned
93+
- Determine number of active processors by looking at ownership; an active processor must own at least one partition
94+
95+
- Claim Ownership
96+
- Determine if this instance has its fair share based on count of active processors
97+
- If fair share is owned, do nothing; assume unowned will be claimed by a processor without its fair share
98+
- If unowned partitions, pick one to claim at random
99+
- If no unowned partitions, pick one to steal at random
100+
- Update storage with any change, call to SetMetadata. May call Upload.
101+
- If claimed by another, results in HTTP 412, local state updated to remove ownership (seen in the logs; indicates stolen between load balancing cycles)
102+
103+
- Determine balance stability
104+
- If fair share of partitions are owned and no claim was needed, assume stable
105+
106+
- Ensure owned partitions are being processed
107+
- If no current processing task, initialize and start
108+
- If owned partition has a completed task, capture exceptions, initialize and restart
109+
110+
- Calculate next cycle time
111+
- If greedy strategy and not stable, run immediate
112+
- If elapsed time was more than the load balancing interval, run immediate
113+
- Delay for (load balancing interval - current cycle time), then run next
114+
115+
## Load Balancing Strategy
116+
117+
CALLS PER CYCLE
118+
- Event Hubs
119+
- Query partitions (1)
120+
- Create receiver, start reading (varies by claimed/faulted, max of owned partitions)
121+
122+
- Storage
123+
- List Blobs (1)
124+
- SetMetadata (varies by owned/claimed, max of partition count * 2)
125+
- Upload (varies by new partitions, max of partition count)

0 commit comments

Comments
 (0)