Skip to content

Commit d6d58ec

Browse files
committed
migrate web rabbitmq to azure service bus
1 parent f0fb5f9 commit d6d58ec

File tree

7 files changed

+106
-84
lines changed

7 files changed

+106
-84
lines changed

asset-manager/web/pom.xml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
<properties>
1313
<aws-sdk.version>2.25.13</aws-sdk.version>
14+
<version.spring.cloud.azure>5.19.0</version.spring.cloud.azure>
1415
</properties>
1516

1617
<artifactId>assets-manager-web</artifactId>
@@ -27,8 +28,12 @@
2728
<artifactId>spring-boot-starter-web</artifactId>
2829
</dependency>
2930
<dependency>
30-
<groupId>org.springframework.boot</groupId>
31-
<artifactId>spring-boot-starter-amqp</artifactId>
31+
<groupId>com.azure.spring</groupId>
32+
<artifactId>spring-cloud-azure-starter</artifactId>
33+
</dependency>
34+
<dependency>
35+
<groupId>com.azure.spring</groupId>
36+
<artifactId>spring-messaging-azure-servicebus</artifactId>
3237
</dependency>
3338
<dependency>
3439
<groupId>com.azure</groupId>
@@ -79,6 +84,13 @@
7984
<artifactId>azure-storage-blob-batch</artifactId>
8085
<version>12.25.0</version>
8186
</dependency>
87+
<dependency>
88+
<groupId>com.azure.spring</groupId>
89+
<artifactId>spring-cloud-azure-dependencies</artifactId>
90+
<version>${version.spring.cloud.azure}</version>
91+
<type>pom</type>
92+
<scope>import</scope>
93+
</dependency>
8294
</dependencies>
8395
</dependencyManagement>
8496

asset-manager/web/src/main/java/com/microsoft/migration/assets/AssetsManagerApplication.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
package com.microsoft.migration.assets;
22

3-
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
3+
import com.azure.spring.messaging.implementation.annotation.EnableAzureMessaging;
44
import org.springframework.boot.SpringApplication;
55
import org.springframework.boot.autoconfigure.SpringBootApplication;
66
import org.springframework.boot.context.ApplicationPidFileWriter;
77

88
@SpringBootApplication
9-
@EnableRabbit
9+
@EnableAzureMessaging
1010
public class AssetsManagerApplication {
1111
public static void main(String[] args) {
1212
SpringApplication application = new SpringApplication(AssetsManagerApplication.class);
Lines changed: 63 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,84 @@
11
package com.microsoft.migration.assets.config;
22

3-
import org.springframework.amqp.core.AcknowledgeMode;
4-
import org.springframework.amqp.core.Binding;
5-
import org.springframework.amqp.core.BindingBuilder;
6-
import org.springframework.amqp.core.DirectExchange;
7-
import org.springframework.amqp.core.Queue;
8-
import org.springframework.amqp.core.QueueBuilder;
9-
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
10-
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
11-
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
12-
import org.springframework.amqp.support.converter.MessageConverter;
13-
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
3+
import com.azure.core.credential.TokenCredential;
4+
import com.azure.core.exception.ResourceExistsException;
5+
import com.azure.core.exception.ResourceNotFoundException;
6+
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClient;
7+
import com.azure.messaging.servicebus.administration.ServiceBusAdministrationClientBuilder;
8+
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions;
9+
import com.azure.messaging.servicebus.administration.models.QueueProperties;
10+
import com.azure.spring.cloud.autoconfigure.implementation.servicebus.properties.AzureServiceBusProperties;
11+
import com.azure.spring.messaging.ConsumerIdentifier;
12+
import com.azure.spring.messaging.PropertiesSupplier;
13+
import com.azure.spring.messaging.servicebus.core.properties.ProcessorProperties;
1414
import org.springframework.context.annotation.Bean;
1515
import org.springframework.context.annotation.Configuration;
1616

17+
import java.time.Duration;
18+
1719
@Configuration
1820
public class RabbitConfig {
1921
public static final String IMAGE_PROCESSING_QUEUE = "image-processing";
20-
21-
// Dead letter queue configuration for the retry mechanism
22-
public static final String RETRY_EXCHANGE = "image-processing.retry";
23-
public static final String RETRY_QUEUE = "image-processing.retry";
24-
public static final String RETRY_ROUTING_KEY = "retry";
25-
public static final int RETRY_DELAY_MS = 60000; // 1 minute delay
26-
27-
@Bean
28-
public Queue imageProcessingQueue() {
29-
return QueueBuilder.durable(IMAGE_PROCESSING_QUEUE
30-
)
31-
.withArgument("x-dead-letter-exchange", RETRY_EXCHANGE)
32-
.withArgument("x-dead-letter-routing-key", RETRY_ROUTING_KEY)
33-
.build();
34-
}
35-
36-
@Bean
37-
public Queue retryQueue() {
38-
return QueueBuilder.durable(RETRY_QUEUE)
39-
.withArgument("x-dead-letter-exchange", "")
40-
.withArgument("x-dead-letter-routing-key", IMAGE_PROCESSING_QUEUE
41-
)
42-
.withArgument("x-message-ttl", RETRY_DELAY_MS)
43-
.build();
44-
}
22+
public static final String RETRY_QUEUE = "retry-queue";
23+
public static final Duration RETRY_QUEUE_TTL = Duration.ofMinutes(1);
4524

4625
@Bean
47-
public DirectExchange retryExchange() {
48-
return new DirectExchange(RETRY_EXCHANGE);
26+
public ServiceBusAdministrationClient adminClient(AzureServiceBusProperties properties, TokenCredential credential) {
27+
return new ServiceBusAdministrationClientBuilder()
28+
.credential(properties.getFullyQualifiedNamespace(), credential)
29+
.buildClient();
4930
}
5031

5132
@Bean
52-
public Binding retryBinding() {
53-
return BindingBuilder
54-
.bind(retryQueue())
55-
.to(retryExchange())
56-
.with(RETRY_ROUTING_KEY);
33+
public QueueProperties retryQueue(ServiceBusAdministrationClient adminClient) {
34+
try {
35+
return adminClient.getQueue(RETRY_QUEUE);
36+
} catch (ResourceNotFoundException e) {
37+
try {
38+
CreateQueueOptions options = new CreateQueueOptions()
39+
.setDefaultMessageTimeToLive(RETRY_QUEUE_TTL)
40+
.setDeadLetteringOnMessageExpiration(true);
41+
return adminClient.createQueue(RETRY_QUEUE, options);
42+
} catch (ResourceExistsException ex) {
43+
// Queue was created by another instance in the meantime
44+
return adminClient.getQueue(RETRY_QUEUE);
45+
}
46+
}
5747
}
5848

5949
@Bean
60-
public MessageConverter jsonMessageConverter() {
61-
return new Jackson2JsonMessageConverter();
50+
public QueueProperties imageProcessingQueue(ServiceBusAdministrationClient adminClient, QueueProperties retryQueue) {
51+
QueueProperties queue;
52+
try {
53+
queue = adminClient.getQueue(IMAGE_PROCESSING_QUEUE);
54+
} catch (ResourceNotFoundException e) {
55+
try {
56+
CreateQueueOptions options = new CreateQueueOptions()
57+
.setForwardDeadLetteredMessagesTo(RETRY_QUEUE);
58+
queue = adminClient.createQueue(IMAGE_PROCESSING_QUEUE, options);
59+
} catch (ResourceExistsException ex) {
60+
// Queue was created by another instance in the meantime
61+
queue = adminClient.getQueue(IMAGE_PROCESSING_QUEUE);
62+
}
63+
}
64+
65+
// Configure retry queue's DLQ forwarding now that image processing queue exists
66+
try {
67+
retryQueue.setForwardDeadLetteredMessagesTo(IMAGE_PROCESSING_QUEUE);
68+
adminClient.updateQueue(retryQueue);
69+
} catch (Exception ex) {
70+
// Ignore update errors since basic functionality will still work
71+
}
72+
73+
return queue;
6274
}
6375

6476
@Bean
65-
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
66-
ConnectionFactory connectionFactory,
67-
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
68-
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
69-
configurer.configure(factory, connectionFactory);
70-
factory.setMessageConverter(jsonMessageConverter());
71-
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
72-
factory.setDefaultRequeueRejected(false);
73-
return factory;
77+
public PropertiesSupplier<ConsumerIdentifier, ProcessorProperties> propertiesSupplier() {
78+
return identifier -> {
79+
ProcessorProperties processorProperties = new ProcessorProperties();
80+
processorProperties.setAutoComplete(false);
81+
return processorProperties;
82+
};
7483
}
7584
}

asset-manager/web/src/main/java/com/microsoft/migration/assets/service/AwsS3Service.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
import com.microsoft.migration.assets.model.S3StorageItem;
1212
import com.microsoft.migration.assets.repository.ImageMetadataRepository;
1313
import lombok.RequiredArgsConstructor;
14-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
14+
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
15+
import org.springframework.messaging.support.MessageBuilder;
1516
import org.springframework.beans.factory.annotation.Value;
1617
import org.springframework.context.annotation.Profile;
1718
import org.springframework.stereotype.Service;
@@ -32,7 +33,7 @@
3233
public class AwsS3Service implements StorageService {
3334

3435
private final BlobServiceClient blobServiceClient;
35-
private final RabbitTemplate rabbitTemplate;
36+
private final ServiceBusTemplate serviceBusTemplate;
3637
private final ImageMetadataRepository imageMetadataRepository;
3738

3839
@Value("${azure.storage.blob.container-name}")
@@ -77,7 +78,7 @@ public void uploadObject(MultipartFile file) throws IOException {
7778
getStorageType(),
7879
file.getSize()
7980
);
80-
rabbitTemplate.convertAndSend(IMAGE_PROCESSING_QUEUE, message);
81+
serviceBusTemplate.send(IMAGE_PROCESSING_QUEUE, MessageBuilder.withPayload(message).build());
8182

8283
// Create and save metadata to database
8384
ImageMetadata metadata = new ImageMetadata();
@@ -87,7 +88,7 @@ public void uploadObject(MultipartFile file) throws IOException {
8788
metadata.setSize(file.getSize());
8889
metadata.setS3Key(key);
8990
metadata.setS3Url(generateUrl(key));
90-
91+
9192
imageMetadataRepository.save(metadata);
9293
}
9394

asset-manager/web/src/main/java/com/microsoft/migration/assets/service/BackupMessageProcessor.java

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,16 @@
11
package com.microsoft.migration.assets.service;
22

33
import com.microsoft.migration.assets.model.ImageProcessingMessage;
4-
import com.rabbitmq.client.Channel;
4+
import com.azure.spring.messaging.servicebus.implementation.core.annotation.ServiceBusListener;
5+
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
6+
import com.azure.spring.messaging.servicebus.support.ServiceBusMessageHeaders;
57
import lombok.extern.slf4j.Slf4j;
6-
import org.springframework.amqp.rabbit.annotation.RabbitListener;
7-
import org.springframework.amqp.support.AmqpHeaders;
88
import org.springframework.messaging.handler.annotation.Header;
99
import org.springframework.context.annotation.Profile;
1010
import org.springframework.stereotype.Component;
1111

1212
import static com.microsoft.migration.assets.config.RabbitConfig.IMAGE_PROCESSING_QUEUE;
1313

14-
import java.io.IOException;
1514

1615
/**
1716
* A backup message processor that serves as a monitoring and logging service.
@@ -25,28 +24,28 @@ public class BackupMessageProcessor {
2524

2625
/**
2726
* Processes image messages from a backup queue for monitoring and resilience purposes.
28-
* Uses the same RabbitMQ API pattern as the worker module.
27+
* Uses the same Azure Service Bus API pattern as the worker module.
2928
*/
30-
@RabbitListener(queues = IMAGE_PROCESSING_QUEUE)
29+
@ServiceBusListener(destination = IMAGE_PROCESSING_QUEUE)
3130
public void processBackupMessage(final ImageProcessingMessage message,
32-
Channel channel,
33-
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
31+
@Header(ServiceBusMessageHeaders.RECEIVED_MESSAGE_CONTEXT) ServiceBusReceivedMessageContext context) {
32+
3433
try {
3534
log.info("[BACKUP] Monitoring message: {}", message.getKey());
3635
log.info("[BACKUP] Content type: {}, Storage: {}, Size: {}",
3736
message.getContentType(), message.getStorageType(), message.getSize());
3837

3938
// Acknowledge the message
40-
channel.basicAck(deliveryTag, false);
39+
context.complete();
4140
log.info("[BACKUP] Successfully processed message: {}", message.getKey());
4241
} catch (Exception e) {
4342
log.error("[BACKUP] Failed to process message: " + message.getKey(), e);
4443

4544
try {
46-
// Reject the message and requeue it
47-
channel.basicNack(deliveryTag, false, true);
45+
// Reject the message and dead letter it
46+
context.deadLetter();
4847
log.warn("[BACKUP] Message requeued: {}", message.getKey());
49-
} catch (IOException ackEx) {
48+
} catch (Exception ackEx) {
5049
log.error("[BACKUP] Error handling RabbitMQ acknowledgment: {}", message.getKey(), ackEx);
5150
}
5251
}

asset-manager/web/src/main/java/com/microsoft/migration/assets/service/LocalFileStorageService.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
import com.microsoft.migration.assets.model.S3StorageItem;
55
import org.slf4j.Logger;
66
import org.slf4j.LoggerFactory;
7-
import org.springframework.amqp.rabbit.core.RabbitTemplate;
7+
import com.azure.spring.messaging.servicebus.core.ServiceBusTemplate;
8+
import org.springframework.messaging.support.MessageBuilder;
89
import org.springframework.beans.factory.annotation.Value;
910
import org.springframework.context.annotation.Profile;
1011
import org.springframework.stereotype.Service;
@@ -27,15 +28,15 @@ public class LocalFileStorageService implements StorageService {
2728

2829
private static final Logger logger = LoggerFactory.getLogger(LocalFileStorageService.class);
2930

30-
private final RabbitTemplate rabbitTemplate;
31+
private final ServiceBusTemplate serviceBusTemplate;
3132

3233
@Value("${local.storage.directory:../storage}")
3334
private String storageDirectory;
3435

3536
private Path rootLocation;
3637

37-
public LocalFileStorageService(RabbitTemplate rabbitTemplate) {
38-
this.rabbitTemplate = rabbitTemplate;
38+
public LocalFileStorageService(ServiceBusTemplate serviceBusTemplate) {
39+
this.serviceBusTemplate = serviceBusTemplate;
3940
}
4041

4142
@PostConstruct
@@ -102,7 +103,7 @@ public void uploadObject(MultipartFile file) throws IOException {
102103
getStorageType(),
103104
file.getSize()
104105
);
105-
rabbitTemplate.convertAndSend(IMAGE_PROCESSING_QUEUE, message);
106+
serviceBusTemplate.send(IMAGE_PROCESSING_QUEUE, MessageBuilder.withPayload(message).build());
106107
}
107108

108109
@Override

asset-manager/web/src/main/resources/application.properties

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ azure.storage.blob.container-name=${AZURE_STORAGE_BLOB_CONTAINER_NAME}
88
spring.servlet.multipart.max-file-size=10MB
99
spring.servlet.multipart.max-request-size=10MB
1010

11-
# RabbitMQ Configuration
12-
spring.rabbitmq.host=localhost
13-
spring.rabbitmq.port=5672
14-
spring.rabbitmq.username=guest
15-
spring.rabbitmq.password=guest
11+
#Servicebus
12+
spring.cloud.azure.credential.managed-identity-enabled=true
13+
spring.cloud.azure.credential.client-id=${AZURE_CLIENT_ID}
14+
spring.cloud.azure.servicebus.namespace=${AZURE_SERVICEBUS_NAMESPACE}
15+
spring.cloud.azure.servicebus.entity-type=queue
1616

1717
# Database Configuration
1818
spring.datasource.url=jdbc:postgresql://localhost:5432/assets_manager

0 commit comments

Comments
 (0)