1717package org .springframework .rabbit .stream .producer ;
1818
1919import java .util .concurrent .CompletableFuture ;
20- import java .util .concurrent .locks .Lock ;
21- import java .util .concurrent .locks .ReentrantLock ;
20+ import java .util .concurrent .atomic .AtomicBoolean ;
2221import java .util .function .Function ;
2322
2423import org .springframework .amqp .core .Message ;
5554 *
5655 * @author Gary Russell
5756 * @author Christian Tzolov
57+ * @author Jeonggi Kim
5858 * @since 2.4
5959 *
6060 */
6161public class RabbitStreamTemplate implements RabbitStreamOperations , ApplicationContextAware , BeanNameAware {
6262
6363 protected final LogAccessor logger = new LogAccessor (getClass ()); // NOSONAR
6464
65- private final Lock lock = new ReentrantLock ();
66-
6765 private ApplicationContext applicationContext ;
6866
6967 private final Environment environment ;
@@ -74,9 +72,9 @@ public class RabbitStreamTemplate implements RabbitStreamOperations, Application
7472
7573 private MessageConverter messageConverter = new SimpleMessageConverter ();
7674
77- private StreamMessageConverter streamConverter = new DefaultStreamMessageConverter ();
75+ private StreamMessageConverter streamMessageConverter = new DefaultStreamMessageConverter ();
7876
79- private boolean streamConverterSet ;
77+ private final AtomicBoolean producerInitialized = new AtomicBoolean ( false ) ;
8078
8179 private Producer producer ;
8280
@@ -107,29 +105,23 @@ public RabbitStreamTemplate(Environment environment, String streamName) {
107105
108106
109107 private Producer createOrGetProducer () {
110- this .lock .lock ();
111- try {
112- if (this .producer == null ) {
113- ProducerBuilder builder = this .environment .producerBuilder ();
114- if (this .superStreamRouting == null ) {
115- builder .stream (this .streamName );
116- }
117- else {
118- builder .superStream (this .streamName )
119- .routing (this .superStreamRouting );
120- }
121- this .producerCustomizer .accept (this .beanName , builder );
122- this .producer = builder .build ();
123- if (!this .streamConverterSet ) {
124- ((DefaultStreamMessageConverter ) this .streamConverter ).setBuilderSupplier (
125- () -> this .producer .messageBuilder ());
126- }
108+ if (this .producerInitialized .compareAndSet (false , true )) {
109+ ProducerBuilder builder = this .environment .producerBuilder ();
110+ if (this .superStreamRouting == null ) {
111+ builder .stream (this .streamName );
112+ }
113+ else {
114+ builder .superStream (this .streamName )
115+ .routing (this .superStreamRouting );
116+ }
117+ this .producerCustomizer .accept (this .beanName , builder );
118+ this .producer = builder .build ();
119+ if (this .streamMessageConverter instanceof DefaultStreamMessageConverter ) {
120+ ((DefaultStreamMessageConverter ) this .streamMessageConverter ).setBuilderSupplier (
121+ () -> this .producer .messageBuilder ());
127122 }
128- return this .producer ;
129- }
130- finally {
131- this .lock .unlock ();
132123 }
124+ return this .producer ;
133125 }
134126
135127 @ Override
@@ -139,13 +131,8 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
139131
140132 @ Override
141133 public void setBeanName (String name ) {
142- this .lock .lock ();
143- try {
144- this .beanName = name ;
145- }
146- finally {
147- this .lock .unlock ();
148- }
134+ throwIfProducerAlreadyInitialized ();
135+ this .beanName = name ;
149136 }
150137
151138 /**
@@ -154,13 +141,8 @@ public void setBeanName(String name) {
154141 * @since 3.0
155142 */
156143 public void setSuperStreamRouting (Function <com .rabbitmq .stream .Message , String > superStreamRouting ) {
157- this .lock .lock ();
158- try {
159- this .superStreamRouting = superStreamRouting ;
160- }
161- finally {
162- this .lock .unlock ();
163- }
144+ throwIfProducerAlreadyInitialized ();
145+ this .superStreamRouting = superStreamRouting ;
164146 }
165147
166148
@@ -176,18 +158,12 @@ public void setMessageConverter(MessageConverter messageConverter) {
176158 /**
177159 * Set a converter to convert from {@link Message} to {@link com.rabbitmq.stream.Message}
178160 * for {@link #send(Message)} and {@link #convertAndSend(Object)} methods.
179- * @param streamConverter the converter.
161+ * @param streamMessageConverter the converter.
180162 */
181- public void setStreamConverter (StreamMessageConverter streamConverter ) {
182- Assert .notNull (streamConverter , "'streamConverter' cannot be null" );
183- this .lock .lock ();
184- try {
185- this .streamConverter = streamConverter ;
186- this .streamConverterSet = true ;
187- }
188- finally {
189- this .lock .unlock ();
190- }
163+ public void setStreamConverter (StreamMessageConverter streamMessageConverter ) {
164+ Assert .notNull (streamMessageConverter , "'streamMessageConverter' cannot be null" );
165+ throwIfProducerAlreadyInitialized ();
166+ this .streamMessageConverter = streamMessageConverter ;
191167 }
192168
193169 /**
@@ -196,12 +172,13 @@ public void setStreamConverter(StreamMessageConverter streamConverter) {
196172 */
197173 public void setProducerCustomizer (ProducerCustomizer producerCustomizer ) {
198174 Assert .notNull (producerCustomizer , "'producerCustomizer' cannot be null" );
199- this .lock .lock ();
200- try {
201- this .producerCustomizer = producerCustomizer ;
202- }
203- finally {
204- this .lock .unlock ();
175+ throwIfProducerAlreadyInitialized ();
176+ this .producerCustomizer = producerCustomizer ;
177+ }
178+
179+ private void throwIfProducerAlreadyInitialized () {
180+ if (producerInitialized .get ()) {
181+ throw new IllegalStateException ("producer is already initialized" );
205182 }
206183 }
207184
@@ -223,14 +200,14 @@ public MessageConverter messageConverter() {
223200
224201 @ Override
225202 public StreamMessageConverter streamMessageConverter () {
226- return this .streamConverter ;
203+ return this .streamMessageConverter ;
227204 }
228205
229206
230207 @ Override
231208 public CompletableFuture <Boolean > send (Message message ) {
232209 CompletableFuture <Boolean > future = new CompletableFuture <>();
233- observeSend (this .streamConverter .fromMessage (message ), future );
210+ observeSend (this .streamMessageConverter .fromMessage (message ), future );
234211 return future ;
235212 }
236213
@@ -339,15 +316,9 @@ private ConfirmationHandler handleConfirm(CompletableFuture<Boolean> future, Obs
339316 */
340317 @ Override
341318 public void close () {
342- this .lock .lock ();
343- try {
344- if (this .producer != null ) {
345- this .producer .close ();
346- this .producer = null ;
347- }
348- }
349- finally {
350- this .lock .unlock ();
319+ Producer producer = this .producer ;
320+ if (this .producerInitialized .compareAndSet (true , false )) {
321+ producer .close ();
351322 }
352323 }
353324
0 commit comments