Skip to content

Commit 4fca682

Browse files
Adding framework for perf testing, samples, README.md for JedisRedisCheckpointStore
* Adding framework for perf testing, samples, README.md for JedisRedisCheckpointStore
1 parent c84b21f commit 4fca682

File tree

12 files changed

+589
-14
lines changed

12 files changed

+589
-14
lines changed

eng/code-quality-reports/src/main/resources/revapi/revapi.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,6 +365,18 @@
365365
"new": "parameter <T> reactor.core.publisher.Mono<T> com.azure.spring.data.cosmos.core.ReactiveCosmosTemplate::insert(java.lang.String, ===T===, com.azure.cosmos.models.PartitionKey)",
366366
"parameterIndex": "1",
367367
"justification": "To support mono method chaining, without explicit typcast for upper bounded generics"
368+
},
369+
{
370+
"regex": true,
371+
"code": "java.class.externalClassExposedInAPI",
372+
"new": "(interface|class|enum) redis\\.clients\\.jedis\\..*",
373+
"justification": "To support the EventHubs JedisRedisCheckpointStore constructor"
374+
},
375+
{
376+
"regex": true,
377+
"code": "java.class.externalClassExposedInAPI",
378+
"new": "class org\\.json\\.JSON(Array|Exception|Object|Pointer|PointerException|Tokener)",
379+
"justification": "To support the EventHubs JedisRedisCheckpointStore constructor"
368380
}
369381
]
370382
}
Lines changed: 164 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,176 @@
1-
# Azure Event Hubs Checkpoint Store using Redis client library for Java
1+
# Azure Event Hubs Checkpoint Store client library for Java using the Jedis Client Library for Redis
2+
3+
Azure Event Hubs Checkpoint Store can be used for storing checkpoints while processing events from Azure Event Hubs.
4+
This package makes use of Redis as a persistent store for maintaining checkpoints and partition ownership information.
5+
The `JedisRedisCheckpointStore` provided in this package can be plugged in to `EventProcessor`.
6+
7+
[Source code][source_code]| [API reference documentation][api_documentation] | [Product
8+
documentation][event_hubs_product_docs] | [Samples][sample_examples]
29

310
## Getting started
411

12+
### Prerequisites
13+
14+
- A [Java Development Kit (JDK)][jdk_link], version 8 or later.
15+
- [Maven][maven]
16+
- Microsoft Azure subscription
17+
- You can create a free account at: [https://azure.microsoft.com](https://azure.microsoft.com)
18+
- Azure Event Hubs instance
19+
- Step-by-step guide for [creating an Event Hub using the Azure Portal][event_hubs_create]
20+
- Azure Redis Cache or a suitable alternative Redis server
21+
- Step-by-step guide for [creating a Redis Cache using the Azure Portal][redis_cache]
22+
23+
### Include the package
24+
#### Include the BOM file
25+
26+
Please include the azure-sdk-bom to your project to take dependency on the General Availability (GA) version of the library. In the following snippet, replace the {bom_version_to_target} placeholder with the version number.
27+
To learn more about the BOM, see the [AZURE SDK BOM README](https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/boms/azure-sdk-bom/README.md).
28+
29+
```xml
30+
<dependencyManagement>
31+
<dependencies>
32+
<dependency>
33+
<groupId>com.azure</groupId>
34+
<artifactId>azure-sdk-bom</artifactId>
35+
<version>{bom_version_to_target}</version>
36+
<type>pom</type>
37+
<scope>import</scope>
38+
</dependency>
39+
</dependencies>
40+
</dependencyManagement>
41+
```
42+
and then include the direct dependency in the dependencies section without the version tag as shown below.
43+
44+
```xml
45+
<dependencies>
46+
<dependency>
47+
<groupId>com.azure</groupId>
48+
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
49+
</dependency>
50+
</dependencies>
51+
```
52+
53+
#### Include direct dependency
54+
If you want to take dependency on a particular version of the library that is not present in the BOM,
55+
add the direct dependency to your project as follows.
56+
57+
[//]: # ({x-version-update-start;com.azure:azure-messaging-eventhubs-checkpointstore-jedis;current})
58+
```xml
59+
<dependency>
60+
<groupId>com.azure</groupId>
61+
<artifactId>azure-messaging-eventhubs-checkpointstore-jedis</artifactId>
62+
<version>1.0.0-beta.1</version>
63+
</dependency>
64+
```
65+
[//]: # ({x-version-update-end})
66+
67+
### Authenticate the storage container client
68+
69+
In order to create an instance of `JedisCheckpointStore`, a `JedisPool` object must be created. To make this `JedisPool` object, a hostname String and a primary key String are required. These can be used as shown below to create a `JedisPool` object.
70+
71+
572
## Key concepts
673

74+
Key concepts are explained in detail [here][key_concepts].
75+
776
## Examples
77+
- [Create and run an instance of JedisRedisCheckpointStore][sample_jedis_client]
78+
- [Consume events from all Event Hub partitions][sample_event_processor]
79+
80+
### Create an instance of JedisPool with Azure Redis Cache
81+
82+
```java
83+
String hostname = "yourHostName.redis.cache.windows.net";
84+
85+
String password = "<PRIMARY KEY FOR AZURE REDIS CACHE>";
86+
87+
String name = "<NAME OF THE USER CLIENT>"; //this can also be a default value as the connection of Redis Cache is not dependent on this value
88+
89+
JedisPool jedisPool = new JedisPool(poolConfig, hostname, port, 1000, 1000, password, Protocol.DEFAULT_DATABASE, name, true, null, null, null);
90+
```
91+
92+
### Consume events using an Event Processor Client
93+
94+
To consume events for all partitions of an Event Hub, you'll create an
95+
[`EventProcessorClient`][source_eventprocessorclient] for a specific consumer group. When an Event Hub is created, it
96+
provides a default consumer group that can be used to get started.
97+
98+
The [`EventProcessorClient`][source_eventprocessorclient] will delegate processing of events to a callback function that you
99+
provide, allowing you to focus on the logic needed to provide value while the processor holds responsibility for
100+
managing the underlying consumer operations.
101+
102+
In our example, we will focus on building the [`EventProcessor`][source_eventprocessorclient], use the
103+
[`JedisRedisCheckpointStore`][source_jedisredischeckpointstore], and a simple callback function to process the events
104+
received from the Event Hubs, writes to console and updates the checkpoint in Blob storage after each event.
105+
106+
```java
107+
JedisPool jedisPool = new JedisPool(poolConfig, hostname, port, 1000, 1000, password, Protocol.DEFAULT_DATABASE, name, true, null, null, null);
108+
109+
EventProcessorClient eventProcessorClient = new EventProcessorClientBuilder()
110+
.consumerGroup("<< CONSUMER GROUP NAME >>")
111+
.connectionString("<< EVENT HUB CONNECTION STRING >>")
112+
.checkpointStore(new JedisRedisCheckpointStore(jedisPool))
113+
.processEvent(eventContext -> {
114+
System.out.println("Partition id = " + eventContext.getPartitionContext().getPartitionId() + " and "
115+
+ "sequence number of event = " + eventContext.getEventData().getSequenceNumber());
116+
})
117+
.processError(errorContext -> {
118+
System.out.println("Error occurred while processing events " + errorContext.getThrowable().getMessage());
119+
})
120+
.buildEventProcessorClient();
121+
122+
// This will start the processor. It will start processing events from all partitions.
123+
eventProcessorClient.start();
124+
125+
// (for demo purposes only - adding sleep to wait for receiving events)
126+
TimeUnit.SECONDS.sleep(5);
127+
128+
// When the user wishes to stop processing events, they can call `stop()`.
129+
eventProcessorClient.stop();
130+
```
8131

9132
## Troubleshooting
10133

134+
### Enable client logging
135+
136+
Azure SDK for Java offers a consistent logging story to help aid in troubleshooting application errors and expedite
137+
their resolution. The logs produced will capture the flow of an application before reaching the terminal state to help
138+
locate the root issue. View the [logging][logging] wiki for guidance about enabling logging.
139+
140+
### Default SSL library
141+
142+
All client libraries, by default, use the Tomcat-native Boring SSL library to enable native-level performance for SSL
143+
operations. The Boring SSL library is an uber jar containing native libraries for Linux / macOS / Windows, and provides
144+
better performance compared to the default SSL implementation within the JDK. For more information, including how to
145+
reduce the dependency size, refer to the [performance tuning][performance_tuning] section of the wiki.
146+
11147
## Next steps
12148

149+
Get started by exploring the samples [here][samples_readme].
150+
13151
## Contributing
152+
153+
If you would like to become an active contributor to this project please refer to our [Contribution
154+
Guidelines](https://github.com/Azure/azure-sdk-for-java/blob/main/CONTRIBUTING.md) for more information.
155+
156+
<!-- Links -->
157+
[api_documentation]: https://azure.github.io/azure-sdk-for-java
158+
[event_hubs_create]: https://docs.microsoft.com/azure/event-hubs/event-hubs-create
159+
[event_hubs_product_docs]: https://docs.microsoft.com/azure/event-hubs/
160+
[java_8_sdk_javadocs]: https://docs.oracle.com/javase/8/docs/api/java/util/logging/package-summary.html
161+
[jdk_link]: https://docs.microsoft.com/java/azure/jdk/?view=azure-java-stable
162+
[key_concepts]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md#key-concepts
163+
[logging]: https://github.com/Azure/azure-sdk-for-java/wiki/Logging-with-Azure-SDK
164+
[maven]: https://maven.apache.org/
165+
[performance_tuning]: https://github.com/Azure/azure-sdk-for-java/wiki/Performance-Tuning
166+
[redis_cache]: https://docs.microsoft.com/azure/azure-cache-for-redis/cache-configure
167+
[samples_readme]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
168+
[//]: # ([sample_jedis_client]: ttps://github.com/Azure/azure-sdk-for-java/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/jedis/JedisRedisCheckpointStoreSample.java)
169+
[//]: # ([sample_event_processor]: https://github.com/Azure/azure-sdk-for-java/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/jedis/EventProcessorJedisRedisCheckpointStoreSample.java)
170+
[//]: # ([sample_examples]: https://github.com/Azure/azure-sdk-for-java/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/samples/java/com/azure/messaging/eventhubs/checkpointstore/jedis)
171+
[sample_jedis_client]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
172+
[sample_event_processor]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
173+
[sample_examples]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
174+
[source_code]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
175+
[source_eventprocessorclient]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventProcessorClient.java
176+
[source_jedisredischeckpointstore]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/pom.xml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
<groupId>org.mockito</groupId>
7979
<artifactId>mockito-inline</artifactId>
8080
<version>4.5.1</version> <!-- {x-version-update;org.mockito:mockito-inline;external_dependency} -->
81+
<scope>test</scope>
8182
</dependency>
8283
</dependencies>
8384

sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/src/main/java/com/azure/messaging/eventhubs/checkpointstore/jedis/JedisRedisCheckpointStore.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,18 @@ public class JedisRedisCheckpointStore implements CheckpointStore {
3434
static final byte[] PARTITION_OWNERSHIP = "partitionOwnership".getBytes(StandardCharsets.UTF_8);
3535
private final JedisPool jedisPool;
3636

37-
JedisRedisCheckpointStore(JedisPool jedisPool) {
37+
/**
38+
* Constructor for JedisRedisCheckpointStore
39+
*
40+
* @param jedisPool a JedisPool object that creates a pool connected to the Azure Redis Cache
41+
* @throws IllegalArgumentException thrown when JedisPool object supplied is null
42+
*/
43+
public JedisRedisCheckpointStore(JedisPool jedisPool) throws IllegalArgumentException {
44+
if (jedisPool == null) {
45+
throw LOGGER.logExceptionAsError(Exceptions
46+
.propagate(new IllegalArgumentException(
47+
"JedisPool object supplied to constructor is null.")));
48+
}
3849
this.jedisPool = jedisPool;
3950
}
4051

@@ -48,33 +59,40 @@ public class JedisRedisCheckpointStore implements CheckpointStore {
4859
public Flux<PartitionOwnership> claimOwnership(List<PartitionOwnership> requestedPartitionOwnerships) {
4960

5061
return Flux.fromIterable(requestedPartitionOwnerships).handle(((partitionOwnership, sink) -> {
51-
5262
String partitionId = partitionOwnership.getPartitionId();
5363
byte[] key = keyBuilder(partitionOwnership.getFullyQualifiedNamespace(), partitionOwnership.getEventHubName(), partitionOwnership.getConsumerGroup(), partitionId);
5464

5565
try (Jedis jedis = jedisPool.getResource()) {
66+
5667
List<byte[]> keyInformation = jedis.hmget(key, PARTITION_OWNERSHIP);
5768
byte[] currentPartitionOwnership = keyInformation.get(0);
5869

5970
if (currentPartitionOwnership == null) {
6071
// if PARTITION_OWNERSHIP field does not exist for member we will get a null, and we must add the field
72+
Long lastModifiedTimeSeconds = Long.parseLong(jedis.time().get(0));
73+
partitionOwnership.setLastModifiedTime(lastModifiedTimeSeconds);
6174
jedis.hset(key, PARTITION_OWNERSHIP, DEFAULT_SERIALIZER.serializeToBytes(partitionOwnership));
75+
sink.next(partitionOwnership);
76+
sink.complete();
6277
} else {
6378
// otherwise we have to change the ownership and "watch" the transaction
6479
jedis.watch(key);
65-
80+
Long lastModifiedTimeSeconds = Long.parseLong(jedis.time().get(0)) - jedis.objectIdletime(key);
81+
partitionOwnership.setLastModifiedTime(lastModifiedTimeSeconds);
82+
partitionOwnership.setETag("default eTag");
6683
Transaction transaction = jedis.multi();
6784
transaction.hset(key, PARTITION_OWNERSHIP, DEFAULT_SERIALIZER.serializeToBytes(partitionOwnership));
6885
List<Object> executionResponse = transaction.exec();
6986

7087
if (executionResponse == null) {
7188
//This means that the transaction did not execute, which implies that another client has changed the ownership during this transaction
72-
sink.error(new RuntimeException("Ownership records were changed by another client"));
89+
LOGGER.verbose("Unable to claim partition with id: " + partitionId);
90+
} else {
91+
sink.next(partitionOwnership);
92+
sink.complete();
7393
}
7494
}
75-
jedisPool.returnResource(jedis);
7695
}
77-
sink.next(partitionOwnership);
7896
}));
7997
}
8098

@@ -96,7 +114,6 @@ public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String e
96114
Set<byte[]> members = jedis.smembers(prefix);
97115

98116
if (members.isEmpty()) {
99-
jedisPool.returnResource(jedis);
100117
return Flux.fromIterable(listStoredCheckpoints);
101118
}
102119
for (byte[] member : members) {
@@ -116,7 +133,6 @@ public Flux<Checkpoint> listCheckpoints(String fullyQualifiedNamespace, String e
116133
LOGGER.verbose("No checkpoint persists yet.");
117134
}
118135
}
119-
jedisPool.returnResource(jedis);
120136
return Flux.fromIterable(listStoredCheckpoints);
121137
}
122138
}
@@ -138,7 +154,6 @@ public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, St
138154
ArrayList<PartitionOwnership> listStoredOwnerships = new ArrayList<>();
139155

140156
if (members.isEmpty()) {
141-
jedisPool.returnResource(jedis);
142157
return Flux.fromIterable(listStoredOwnerships);
143158
}
144159
for (byte[] member : members) {
@@ -157,7 +172,6 @@ public Flux<PartitionOwnership> listOwnership(String fullyQualifiedNamespace, St
157172
listStoredOwnerships.add(partitionOwnership);
158173
}
159174
}
160-
jedisPool.returnResource(jedis);
161175
return Flux.fromIterable(listStoredOwnerships);
162176
}
163177
}
@@ -188,8 +202,6 @@ public Mono<Void> updateCheckpoint(Checkpoint checkpoint) {
188202
//Case 2: checkpoint already exists in Redis cache
189203
jedis.hset(key, CHECKPOINT, DEFAULT_SERIALIZER.serializeToBytes(checkpoint));
190204
}
191-
192-
jedisPool.returnResource(jedis);
193205
}
194206
});
195207
}
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
---
2+
page_type: sample
3+
languages:
4+
- java
5+
products:
6+
- azure
7+
- azure-event-hubs
8+
- azure-cache-redis
9+
urlFragment: eventhubs-checkpoint-store-jedis-samples
10+
---
11+
12+
# Azure Event Hubs Checkpoint Store client library samples for Java
13+
14+
Azure Event Hubs Checkpoint Store samples are a set of self-contained Java programs that demonstrate interacting
15+
with Azure Event Hubs Checkpoint Store using the client library.
16+
17+
## Key concepts
18+
Key concepts are explained in detail [here][sdk_readme_key_concepts].
19+
20+
## Getting started
21+
Please refer to the [Getting Started][sdk_readme_getting_started] section.
22+
23+
## Examples
24+
- [Create and run an instance of JedisRedisCheckpointStore][sample_jedis_client]
25+
- [Consume events from all Event Hub partitions][sample_event_processor]
26+
27+
## Troubleshooting
28+
See [Troubleshooting][sdk_readme_troubleshooting].
29+
30+
## Next steps
31+
See [Next steps][sdk_readme_next_steps].
32+
33+
## Contributing
34+
35+
If you would like to become an active contributor to this project please refer to our [Contribution
36+
Guidelines](https://github.com/Azure/azure-sdk-for-java/blob/main/CONTRIBUTING.md) for more information.
37+
38+
<!-- Links -->
39+
[sample_jedis_client]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
40+
[sample_event_processor]: https://github.com/Azure/azure-sdk-for-java/tree/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis
41+
[sdk_readme_getting_started]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/README.md#getting-started
42+
[sdk_readme_key_concepts]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-blob/README.md#key-concepts
43+
[sdk_readme_next_steps]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/README.md#next-steps
44+
[sdk_readme_troubleshooting]: https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/eventhubs/azure-messaging-eventhubs-checkpointstore-jedis/README.md#troubleshooting)

0 commit comments

Comments
 (0)