Skip to content

Commit 26aef7d

Browse files
Add service bus partition key sample. (#24)
1 parent 5a34a54 commit 26aef7d

File tree

6 files changed

+230
-5
lines changed

6 files changed

+230
-5
lines changed

servicebus/azure-spring-cloud-stream-binder-servicebus-queue/servicebus-queue-binder/README.md

Lines changed: 91 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -164,23 +164,110 @@ spring:
164164

165165

166166
## Examples
167+
### How to configure the partition key
168+
**Example: Manually set the partition key of the message through application.yml**
167169

168-
1. Run the `mvn spring-boot:run` in the root of the code sample to get the app running.
170+
This example demonstrates how to manually set the partition key for the message in the application.
169171

170-
1. Send a POST request
172+
**Way 1:**
173+
This example requires that `spring.cloud.stream.default.producer.partitionKeyExpression` be set `"'partitionKey-' + headers[<message-header-key>]"`.
174+
```yaml
175+
spring:
176+
cloud:
177+
azure:
178+
servicebus:
179+
connection-string: [servicebus-namespace-connection-string]
180+
stream:
181+
default:
182+
producer:
183+
partitionKeyExpression: "'partitionKey-' + headers[<message-header-key>]"
184+
```
185+
```java
186+
@PostMapping("/messages")
187+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
188+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
189+
many.emitNext(MessageBuilder.withPayload(message)
190+
.setHeader("<message-header-key>", "Customize partirion key")
191+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
192+
return ResponseEntity.ok("Sent!");
193+
}
194+
```
195+
196+
> **NOTE:** When using `application.yml` to configure the partition key, its priority will be the lowest.
197+
> It will take effect only when the `ServiceBusMessageHeaders.SESSION_ID`, `ServiceBusMessageHeaders.PARTITION_KEY`, `AzureHeaders.PARTITION_KEY` are not configured.
198+
199+
**Way 2:**
200+
Manually add the partition Key in the message header by code.
201+
202+
*Recommended:* Use `ServiceBusMessageHeaders.PARTITION_KEY` as the key of the header.
203+
```java
204+
@PostMapping("/messages")
205+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
206+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
207+
many.emitNext(MessageBuilder.withPayload(message)
208+
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partirion key")
209+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
210+
return ResponseEntity.ok("Sent!");
211+
}
212+
```
213+
214+
*Not recommended but currently supported:* `AzureHeaders.PARTITION_KEY` as the key of the header.
215+
```java
216+
@PostMapping("/messages")
217+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
218+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
219+
many.emitNext(MessageBuilder.withPayload(message)
220+
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partirion key")
221+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
222+
return ResponseEntity.ok("Sent!");
223+
}
224+
```
225+
> **NOTE:** When both `ServiceBusMessageHeaders.PARTITION_KEY` and `AzureHeaders.PARTITION_KEY` are set in the message headers,
226+
> `ServiceBusMessageHeaders.PARTITION_KEY` is preferred.
227+
228+
### How to configure session id
229+
**Example: Set the session id for the message**
230+
231+
This example demonstrates how to manually set the session id of a message in the application.
232+
233+
```java
234+
@PostMapping("/messages")
235+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
236+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
237+
many.emitNext(MessageBuilder.withPayload(message)
238+
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session id")
239+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
240+
return ResponseEntity.ok("Sent!");
241+
}
242+
```
243+
244+
> **NOTE:** When the `ServiceBusMessageHeaders.SESSION_ID` is set in the message headers, and a different `ServiceBusMessageHeaders.PARTITION_KEY` (or `AzureHeaders.PARTITION_KEY`) header is also set,
245+
> the value of the session id will eventually be used to overwrite the value of the partition key.
246+
247+
### Run sample
248+
1. Run the `mvn spring-boot:run` in the root of the code sample to get the app running.
249+
250+
2. Send a POST request
171251

172252
$ curl -X POST http://localhost:8080/messages?message=hello
173253

174254
or when the app runs on App Service or VM
175255

176256
$ curl -d -X POST https://[your-app-URL]/messages?message=hello
177257

178-
1. Verify in your app’s logs that a similar message was posted:
258+
3. Verify in your app’s logs that a similar message was posted:
179259

180260
New message received: 'hello'
181261
Message 'hello' successfully checkpointed
182262

183-
1. Delete the resources on [Azure Portal][azure-portal] to avoid unexpected charges.
263+
4. Send a POST request
264+
265+
$ curl -X POST http://localhost:8080/setSessionId?message=hello
266+
267+
5. It will be visible on Azure Portal that sent messages are configured with session id and partition key.
268+
![Azure portal displays the session id.](docs/image-set-session-id.png )
269+
270+
6. Delete the resources on [Azure Portal][azure-portal] to avoid unexpected charges.
184271

185272
## Enhancement
186273
### Set Service Bus message headers
54 KB
Loading

servicebus/azure-spring-cloud-stream-binder-servicebus-queue/servicebus-queue-binder/src/main/java/com/azure/spring/sample/servicebus/queue/binder/ServiceProducerController.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.spring.sample.servicebus.queue.binder;
55

6+
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders;
67
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
89
import org.springframework.beans.factory.annotation.Autowired;
@@ -32,6 +33,30 @@ public ResponseEntity<String> sendMessage(@RequestParam String message) {
3233
return ResponseEntity.ok("Sent!");
3334
}
3435

36+
/**
37+
* Set the session id scene.
38+
*/
39+
@PostMapping("/setSessionId")
40+
public ResponseEntity<String> setSessionId(@RequestParam String message) {
41+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
42+
many.emitNext(MessageBuilder.withPayload(message)
43+
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "<custom-session-id>")
44+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
45+
return ResponseEntity.ok("Sent!");
46+
}
47+
48+
/**
49+
* Set the ServiceBusMessageHeaders partition key scene.
50+
*/
51+
@PostMapping("/setServiceBusMessageHeadersPartitionKey")
52+
public ResponseEntity<String> setServiceBusMessageHeadersPartitionKey(@RequestParam String message) {
53+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
54+
many.emitNext(MessageBuilder.withPayload(message)
55+
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "<custom-partition-key>")
56+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
57+
return ResponseEntity.ok("Sent!");
58+
}
59+
3560
@GetMapping("/")
3661
public String welcome() {
3762
return "welcome";

servicebus/azure-spring-cloud-stream-binder-servicebus-topic/servicebus-topic-binder/README.md

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,87 @@ spring:
165165

166166

167167
## Examples
168+
### How to configure the partition key
169+
**Example: Manually set the partition key of the message through application.yml**
170+
171+
This example demonstrates how to manually set the partition key for the message in the application.
172+
173+
**Way 1:**
174+
This example requires that `spring.cloud.stream.default.producer.partitionKeyExpression` be set `"'partitionKey-' + headers[<message-header-key>]"`.
175+
```yaml
176+
spring:
177+
cloud:
178+
azure:
179+
servicebus:
180+
connection-string: [servicebus-namespace-connection-string]
181+
stream:
182+
default:
183+
producer:
184+
partitionKeyExpression: "'partitionKey-' + headers[<message-header-key>]"
185+
```
186+
```java
187+
@PostMapping("/messages")
188+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
189+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
190+
many.emitNext(MessageBuilder.withPayload(message)
191+
.setHeader("<message-header-key>", "Customize partirion key")
192+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
193+
return ResponseEntity.ok("Sent!");
194+
}
195+
```
196+
197+
> **NOTE:** When using `application.yml` to configure the partition key, its priority will be the lowest.
198+
> It will take effect only when the `ServiceBusMessageHeaders.SESSION_ID`, `ServiceBusMessageHeaders.PARTITION_KEY`, `AzureHeaders.PARTITION_KEY` are not configured.
199+
200+
**Way 2:**
201+
Manually add the partition Key in the message header by code.
202+
203+
*Recommended:* Use `ServiceBusMessageHeaders.PARTITION_KEY` as the key of the header.
204+
```java
205+
@PostMapping("/messages")
206+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
207+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
208+
many.emitNext(MessageBuilder.withPayload(message)
209+
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "Customize partirion key")
210+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
211+
return ResponseEntity.ok("Sent!");
212+
}
213+
```
214+
215+
*Not recommended but currently supported:* `AzureHeaders.PARTITION_KEY` as the key of the header.
216+
```java
217+
@PostMapping("/messages")
218+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
219+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
220+
many.emitNext(MessageBuilder.withPayload(message)
221+
.setHeader(AzureHeaders.PARTITION_KEY, "Customize partirion key")
222+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
223+
return ResponseEntity.ok("Sent!");
224+
}
225+
```
226+
> **NOTE:** When both `ServiceBusMessageHeaders.PARTITION_KEY` and `AzureHeaders.PARTITION_KEY` are set in the message headers,
227+
> `ServiceBusMessageHeaders.PARTITION_KEY` is preferred.
228+
229+
### How to configure session id
230+
**Example: Set the session id for the message**
231+
232+
This example demonstrates how to manually set the session id of a message in the application.
233+
234+
```java
235+
@PostMapping("/messages")
236+
public ResponseEntity<String> sendMessage(@RequestParam String message) {
237+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
238+
many.emitNext(MessageBuilder.withPayload(message)
239+
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "Customize session id")
240+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
241+
return ResponseEntity.ok("Sent!");
242+
}
243+
```
244+
245+
> **NOTE:** When the `ServiceBusMessageHeaders.SESSION_ID` is set in the message headers, and a different `ServiceBusMessageHeaders.PARTITION_KEY` (or `AzureHeaders.PARTITION_KEY`) header is also set,
246+
> the value of the session id will eventually be used to overwrite the value of the partition key.
247+
248+
### Run sample
168249

169250
1. Run the `mvn spring-boot:run` in the root of the code sample to get the app running.
170251

@@ -181,7 +262,14 @@ spring:
181262
New message received: 'hello'
182263
Message 'hello' successfully checkpointed
183264

184-
1. Delete the resources on [Azure Portal][azure-portal] to avoid unexpected charges.
265+
4. Send a POST request
266+
267+
$ curl -X POST http://localhost:8080/setSessionId?message=hello
268+
269+
5. It will be visible on Azure Portal that sent messages are configured with session id and partition key.
270+
![Azure portal displays the session id.](docs/image-set-session-id.png )
271+
272+
6. Delete the resources on [Azure Portal][azure-portal] to avoid unexpected charges.
185273

186274
## Enhancement
187275
### Set Service Bus message headers
58.1 KB
Loading

servicebus/azure-spring-cloud-stream-binder-servicebus-topic/servicebus-topic-binder/src/main/java/com/azure/spring/sample/servicebus/topic/binder/ServiceProducerController.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
package com.azure.spring.sample.servicebus.topic.binder;
55

6+
import com.azure.spring.integration.servicebus.converter.ServiceBusMessageHeaders;
67
import org.slf4j.Logger;
78
import org.slf4j.LoggerFactory;
89
import org.springframework.beans.factory.annotation.Autowired;
@@ -32,6 +33,30 @@ public ResponseEntity<String> sendMessage(@RequestParam String message) {
3233
return ResponseEntity.ok("Sent!");
3334
}
3435

36+
/**
37+
* Set the session id scene.
38+
*/
39+
@PostMapping("/setSessionId")
40+
public ResponseEntity<String> setSessionId(@RequestParam String message) {
41+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
42+
many.emitNext(MessageBuilder.withPayload(message)
43+
.setHeader(ServiceBusMessageHeaders.SESSION_ID, "<custom-session-id>")
44+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
45+
return ResponseEntity.ok("Sent!");
46+
}
47+
48+
/**
49+
* Set the ServiceBusMessageHeaders partition key scene.
50+
*/
51+
@PostMapping("/setServiceBusMessageHeadersPartitionKey")
52+
public ResponseEntity<String> setServiceBusMessageHeadersPartitionKey(@RequestParam String message) {
53+
LOGGER.info("Going to add message {} to Sinks.Many.", message);
54+
many.emitNext(MessageBuilder.withPayload(message)
55+
.setHeader(ServiceBusMessageHeaders.PARTITION_KEY, "<custom-partition-key>")
56+
.build(), Sinks.EmitFailureHandler.FAIL_FAST);
57+
return ResponseEntity.ok("Sent!");
58+
}
59+
3560
@GetMapping("/")
3661
public String welcome() {
3762
return "welcome";

0 commit comments

Comments
 (0)