34
34
import io .awspring .cloud .sqs .support .observation .SqsListenerObservation ;
35
35
import io .awspring .cloud .sqs .support .observation .SqsTemplateObservation ;
36
36
import io .micrometer .observation .ObservationRegistry ;
37
+ import java .util .concurrent .Executors ;
37
38
import org .springframework .beans .factory .ObjectProvider ;
38
39
import org .springframework .boot .autoconfigure .AutoConfiguration ;
39
40
import org .springframework .boot .autoconfigure .AutoConfigureAfter ;
52
53
import software .amazon .awssdk .services .sqs .batchmanager .SqsAsyncBatchManager ;
53
54
import software .amazon .awssdk .services .sqs .model .Message ;
54
55
55
- import java .util .concurrent .Executors ;
56
-
57
56
/**
58
57
* {@link EnableAutoConfiguration Auto-configuration} for SQS integration.
59
58
*
@@ -84,10 +83,10 @@ public SqsAsyncClient sqsAsyncClient(AwsClientBuilderConfigurer awsClientBuilder
84
83
ObjectProvider <AwsConnectionDetails > connectionDetails ,
85
84
ObjectProvider <SqsAsyncClientCustomizer > sqsAsyncClientCustomizers ,
86
85
ObjectProvider <AwsAsyncClientCustomizer > awsAsyncClientCustomizers ) {
87
- return awsClientBuilderConfigurer .configureAsyncClient (SqsAsyncClient .builder (), this .sqsProperties ,
88
- connectionDetails .getIfAvailable (), configurer .getIfAvailable (),
89
- sqsAsyncClientCustomizers .orderedStream (), awsAsyncClientCustomizers .orderedStream ()).build ();
90
- }
86
+ return awsClientBuilderConfigurer .configureAsyncClient (SqsAsyncClient .builder (), this .sqsProperties ,
87
+ connectionDetails .getIfAvailable (), configurer .getIfAvailable (),
88
+ sqsAsyncClientCustomizers .orderedStream (), awsAsyncClientCustomizers .orderedStream ()).build ();
89
+ }
91
90
92
91
@ ConditionalOnProperty (name = "spring.cloud.aws.sqs.batch.enabled" , havingValue = "true" )
93
92
@ Bean
@@ -98,11 +97,9 @@ public SqsAsyncClient batchSqsAsyncClient(SqsAsyncClient sqsAsyncClient) {
98
97
}
99
98
100
99
private SqsAsyncBatchManager createBatchManager (SqsAsyncClient sqsAsyncClient ) {
101
- return SqsAsyncBatchManager .builder ()
102
- .client (sqsAsyncClient )
103
- .scheduledExecutor (Executors .newScheduledThreadPool (5 ))
104
- .overrideConfiguration (this ::configurationProperties )
105
- .build ();
100
+ return SqsAsyncBatchManager .builder ().client (sqsAsyncClient )
101
+ .scheduledExecutor (Executors .newScheduledThreadPool (5 ))
102
+ .overrideConfiguration (this ::configurationProperties ).build ();
106
103
}
107
104
108
105
private void configurationProperties (BatchOverrideConfiguration .Builder options ) {
@@ -112,7 +109,7 @@ private void configurationProperties(BatchOverrideConfiguration.Builder options)
112
109
mapper .from (this .sqsProperties .getBatch ().getWaitTimeSeconds ()).to (options ::receiveMessageMinWaitDuration );
113
110
mapper .from (this .sqsProperties .getBatch ().getVisibilityTimeout ()).to (options ::receiveMessageVisibilityTimeout );
114
111
mapper .from (this .sqsProperties .getBatch ().getSystemAttributeNames ())
115
- .to (options ::receiveMessageSystemAttributeNames );
112
+ .to (options ::receiveMessageSystemAttributeNames );
116
113
mapper .from (this .sqsProperties .getBatch ().getAttributeNames ()).to (options ::receiveMessageAttributeNames );
117
114
}
118
115
@@ -122,15 +119,15 @@ public SqsTemplate sqsTemplate(SqsAsyncClient sqsAsyncClient, ObjectProvider<Obj
122
119
ObjectProvider <ObservationRegistry > observationRegistryProvider ,
123
120
ObjectProvider <SqsTemplateObservation .Convention > observationConventionProvider ,
124
121
MessagingMessageConverter <Message > messageConverter ) {
125
- SqsTemplateBuilder builder = SqsTemplate .builder ().sqsAsyncClient (sqsAsyncClient )
126
- .messageConverter (messageConverter );
122
+ SqsTemplateBuilder builder = SqsTemplate .builder ().sqsAsyncClient (sqsAsyncClient )
123
+ .messageConverter (messageConverter );
127
124
objectMapperProvider .ifAvailable (om -> setMapperToConverter (messageConverter , om ));
128
- if (this .sqsProperties .isObservationEnabled ()) {
129
- observationRegistryProvider
130
- .ifAvailable (registry -> builder .configure (options -> options .observationRegistry (registry )));
131
- observationConventionProvider
132
- .ifAvailable (convention -> builder .configure (options -> options .observationConvention (convention )));
133
- }
125
+ if (this .sqsProperties .isObservationEnabled ()) {
126
+ observationRegistryProvider
127
+ .ifAvailable (registry -> builder .configure (options -> options .observationRegistry (registry )));
128
+ observationConventionProvider
129
+ .ifAvailable (convention -> builder .configure (options -> options .observationConvention (convention )));
130
+ }
134
131
if (sqsProperties .getQueueNotFoundStrategy () != null ) {
135
132
builder .configure ((options ) -> options .queueNotFoundStrategy (sqsProperties .getQueueNotFoundStrategy ()));
136
133
}
@@ -156,12 +153,12 @@ public SqsMessageListenerContainerFactory<Object> defaultSqsListenerContainerFac
156
153
interceptors .forEach (factory ::addMessageInterceptor );
157
154
asyncInterceptors .forEach (factory ::addMessageInterceptor );
158
155
objectMapperProvider .ifAvailable (om -> setMapperToConverter (messagingMessageConverter , om ));
159
- if (this .sqsProperties .isObservationEnabled ()) {
160
- observationRegistry
161
- .ifAvailable (registry -> factory .configure (options -> options .observationRegistry (registry )));
162
- observationConventionProvider
163
- .ifAvailable (convention -> factory .configure (options -> options .observationConvention (convention )));
164
- }
156
+ if (this .sqsProperties .isObservationEnabled ()) {
157
+ observationRegistry
158
+ .ifAvailable (registry -> factory .configure (options -> options .observationRegistry (registry )));
159
+ observationConventionProvider
160
+ .ifAvailable (convention -> factory .configure (options -> options .observationConvention (convention )));
161
+ }
165
162
factory .configure (options -> options .messageConverter (messagingMessageConverter ));
166
163
return factory ;
167
164
}
0 commit comments