Skip to content

Commit 98fb02a

Browse files
Implement checkpoint store for azure storage blobs. (#2851)
Co-authored-by: Heath Stewart <[email protected]>
1 parent 00438a7 commit 98fb02a

File tree

23 files changed

+1421
-40
lines changed

23 files changed

+1421
-40
lines changed

.vscode/cspell.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"azsdk",
3333
"azurecli",
3434
"bugbug",
35+
"checkpointstore",
3536
"clippy",
3637
"contoso",
3738
"cplusplus",
@@ -210,4 +211,4 @@
210211
]
211212
}
212213
]
213-
}
214+
}

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ members = [
1313
"sdk/cosmos/azure_data_cosmos",
1414
"sdk/identity/azure_identity",
1515
"sdk/eventhubs/azure_messaging_eventhubs",
16+
"sdk/eventhubs/azure_messaging_eventhubs_checkpointstore_blob",
1617
"sdk/keyvault/azure_security_keyvault_certificates",
1718
"sdk/keyvault/azure_security_keyvault_keys",
1819
"sdk/keyvault/azure_security_keyvault_secrets",
@@ -54,14 +55,18 @@ path = "sdk/core/azure_core"
5455
version = "0.2.0"
5556
path = "sdk/core/azure_core_macros"
5657

57-
[workspace.dependencies.azure_core_opentelemetry]
58-
version = "0.2.0"
59-
path = "sdk/core/azure_core_opentelemetry"
60-
6158
[workspace.dependencies.azure_core_amqp]
6259
version = "0.7.0"
6360
path = "sdk/core/azure_core_amqp"
6461

62+
[workspace.dependencies.azure_messaging_eventhubs]
63+
version = "0.7.0"
64+
path = "sdk/eventhubs/azure_messaging_eventhubs"
65+
66+
[workspace.dependencies.azure_core_opentelemetry]
67+
# azure_core_opentelemetry should only ever be in dev-dependencies herein
68+
path = "sdk/core/azure_core_opentelemetry"
69+
6570
[workspace.dependencies.azure_core_test]
6671
# azure_core_test is not published and only ever a dev-dependency
6772
path = "sdk/core/azure_core_test"
@@ -74,6 +79,10 @@ path = "sdk/core/azure_core_test_macros"
7479
# azure_identity should only ever be in dev-dependencies herein
7580
path = "sdk/identity/azure_identity"
7681

82+
[workspace.dependencies.azure_storage_blob]
83+
path = "sdk/storage/azure_storage_blob"
84+
version = "0.5.0"
85+
7786
[workspace.dependencies]
7887
async-lock = "3.0"
7988
async-stream = { version = "0.3.5" }

sdk/core/azure_core_test/src/recording.rs

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,16 @@ impl Recording {
333333
return value;
334334
}
335335

336-
let mut variables = self.variables.write().map_err(write_lock_error).ok()?;
337-
variables.insert(key.into(), Value::from(value.as_ref(), options));
338-
value
336+
match value {
337+
None => None,
338+
Some(v) if v.is_empty() => None,
339+
Some(v) => {
340+
let v = Some(v);
341+
let mut variables = self.variables.write().map_err(write_lock_error).ok()?;
342+
variables.insert(key.into(), Value::from(v.as_ref(), options));
343+
v
344+
}
345+
}
339346
}
340347
}
341348

sdk/eventhubs/.dict.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,5 @@ consts
33
eventdata
44
mgmt
55
rustc
6+
myapp
7+
yourcontainername

sdk/eventhubs/assets.json

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"AssetsRepo": "Azure/azure-sdk-assets",
3+
"AssetsRepoPrefixPath": "rust",
4+
"TagPrefix": "rust/eventhubs",
5+
"Tag": "rust/eventhubs_8405b278af"
6+
}

sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ license.workspace = true
1111
repository.workspace = true
1212
homepage = "https://github.com/azure/azure-sdk-for-rust"
1313
documentation = "https://docs.rs/azure_messaging_eventhubs"
14-
1514
keywords = ["sdk", "azure", "messaging", "cloud", "eventhubs"]
1615
categories = ["api-bindings"]
1716

@@ -44,7 +43,6 @@ tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
4443

4544
[features]
4645
in_memory_checkpoint_store = []
47-
blob_checkpoint_store = []
4846

4947
[[bench]]
5048
name = "benchmarks"

sdk/eventhubs/azure_messaging_eventhubs/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
# Azure Event Hubs client library for Rust
44

5-
[Azure Event Hubs](https://azure.microsoft.com/services/event-hubs/) is a big data streaming platform and event ingestion service from Microsoft. For more information about Event Hubs see: [link](https://learn.microsoft.com/azure/event-hubs/event-hubs-about).
5+
[Azure Event Hubs](https://azure.microsoft.com/services/event-hubs/) is a big data streaming platform and event ingestion service from Microsoft. For more information about Event Hubs see [this link](https://learn.microsoft.com/azure/event-hubs/event-hubs-about).
66

77
The Azure Event Hubs client library allows you to send single events or batches of events to an event hub and consume events from an event hub.
88

@@ -20,7 +20,7 @@ cargo add azure_messaging_eventhubs
2020

2121
### Prerequisites
2222

23-
* A Rust Compiler. See [here](https://www.rust-lang.org/tools/install) for installation instructions.
23+
* A Rust Compiler. See [the rust compiler installation instructions](https://www.rust-lang.org/tools/install).
2424
* An [Azure subscription]
2525
* The [Azure CLI]
2626
* An [Event Hub namespace](https://learn.microsoft.com/azure/event-hubs/).
@@ -105,7 +105,7 @@ Consuming events is done using an `EventReceiver`, which can be opened from the
105105
The Processor is useful when you want to have the partition assignment be dynamically chosen, and balanced with other Processor instances.
106106
-->
107107

108-
More information about Event Hubs features and terminology can be found here: [link](https://learn.microsoft.com/azure/event-hubs/event-hubs-features)
108+
More information about Event Hubs features and terminology can be found at the [Event Hubs features documentation]](<https://learn.microsoft.com/azure/event-hubs/event-hubs-features>)
109109

110110
## Examples
111111

sdk/eventhubs/azure_messaging_eventhubs/src/common/authorizer.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -648,10 +648,9 @@ mod tests {
648648
// Verify that the token get count has increased, indicating a single refresh was attempted - we refreshed token_refresh_1 but not token_refresh_2.
649649
let final_count = mock_credential.get_token_get_count();
650650
debug!("After sleeping the first time, token count: {final_count}");
651-
assert_eq!(
652-
final_count, 3,
653-
"Expected first get token count to be 3, but got {}",
654-
final_count
651+
assert!(
652+
final_count >= 3,
653+
"Expected first get token count to be 3, but got {final_count}"
655654
);
656655

657656
info!("First token expiration get count: {}", final_count);
@@ -664,8 +663,8 @@ mod tests {
664663
// Verify that the token get count has increased, indicating a single refresh was attempted - we refreshed token_refresh_2.
665664
let final_count = mock_credential.get_token_get_count();
666665
debug!("Getting second token count: {final_count}");
667-
assert_eq!(
668-
final_count, 4,
666+
assert!(
667+
final_count >= 4,
669668
"Expected second get token count to be 4, but got {final_count}"
670669
);
671670
info!("Second token expiration get count: {}", final_count);

sdk/eventhubs/azure_messaging_eventhubs/src/event_processor/load_balancer.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,7 @@ pub(crate) mod tests {
378378
event_processor::Ownership, in_memory_checkpoint_store::InMemoryCheckpointStore,
379379
models::ConsumerClientDetails, CheckpointStore,
380380
};
381-
use azure_core::Result;
381+
use azure_core::{time::OffsetDateTime, Result};
382382
use azure_core_test::{recorded, TestContext};
383383
use tracing::info;
384384

@@ -665,7 +665,7 @@ pub(crate) mod tests {
665665
.etag
666666
.clone();
667667
let mut ownership = ownership.clone();
668-
ownership.last_modified_time = Some(SystemTime::now() - Duration::seconds(3600));
668+
ownership.last_modified_time = Some(OffsetDateTime::now_utc() - Duration::seconds(3600));
669669
ownership.etag = etag;
670670
checkpoint_store.claim_ownership(&[ownership]).await?;
671671
Ok(())

0 commit comments

Comments
 (0)