Skip to content

Commit f0f2c41

Browse files
artembilangaryrussell
authored andcommitted
GH-3446: Stream support in the MessageGroupStore
Fixes #3446 * For better resources utilization provide a `Stream<Message<?>>` API on the `MessageGroupStore`, `MessageGroup` and `MessageGroupQueue` * Use this API in the `DelayHandler` when it reschedules persisted messages
1 parent 51e240b commit f0f2c41

File tree

12 files changed

+157
-72
lines changed

12 files changed

+157
-72
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/DelayHandler.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.ConcurrentMap;
2727
import java.util.concurrent.atomic.AtomicBoolean;
2828
import java.util.concurrent.atomic.AtomicInteger;
29+
import java.util.stream.Stream;
2930

3031
import org.aopalliance.aop.Advice;
3132

@@ -321,7 +322,7 @@ protected boolean shouldCopyRequestHeaders() {
321322
}
322323

323324
/**
324-
* Checks if 'requestMessage' wasn't delayed before ({@link #releaseMessageAfterDelay}
325+
* Check if 'requestMessage' wasn't delayed before ({@link #releaseMessageAfterDelay}
325326
* and {@link DelayHandler.DelayedMessageWrapper}). Than determine 'delay' for
326327
* 'requestMessage' ({@link #determineDelayForMessage}) and if {@code delay > 0}
327328
* schedules 'releaseMessage' task after 'delay'.
@@ -562,9 +563,10 @@ public int getDelayedMessageCount() {
562563
@Override
563564
public synchronized void reschedulePersistedMessages() {
564565
MessageGroup messageGroup = this.messageStore.getMessageGroup(this.messageGroupId);
565-
for (final Message<?> message : messageGroup.getMessages()) {
566-
getTaskScheduler()
567-
.schedule(() -> {
566+
try (Stream<Message<?>> messageStream = messageGroup.streamMessages()) {
567+
TaskScheduler taskScheduler = getTaskScheduler();
568+
messageStream.forEach((message) ->
569+
taskScheduler.schedule(() -> {
568570
// This is fine to keep the reference to the message,
569571
// because the scheduled task is performed immediately.
570572
long delay = determineDelayForMessage(message);
@@ -574,12 +576,12 @@ public synchronized void reschedulePersistedMessages() {
574576
else {
575577
releaseMessage(message);
576578
}
577-
}, new Date());
579+
}, new Date()));
578580
}
579581
}
580582

581583
/**
582-
* Handles {@link ContextRefreshedEvent} to invoke
584+
* Handle {@link ContextRefreshedEvent} to invoke
583585
* {@link #reschedulePersistedMessages} as late as possible after application context
584586
* startup. Also it checks {@link #initialized} to ignore other
585587
* {@link ContextRefreshedEvent}s which may be published in the 'parent-child'

spring-integration-core/src/main/java/org/springframework/integration/store/AbstractKeyValueMessageStore.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -24,6 +24,7 @@
2424
import java.util.Set;
2525
import java.util.UUID;
2626
import java.util.stream.Collectors;
27+
import java.util.stream.Stream;
2728

2829
import org.springframework.jmx.export.annotation.ManagedAttribute;
2930
import org.springframework.messaging.Message;
@@ -160,7 +161,7 @@ public Message<?> removeMessage(UUID id) {
160161
@Override
161162
@ManagedAttribute
162163
public long getMessageCount() {
163-
Collection<?> messageIds = doListKeys(this.messagePrefix + "*");
164+
Collection<?> messageIds = doListKeys(this.messagePrefix + '*');
164165
return (messageIds != null) ? messageIds.size() : 0;
165166
}
166167

@@ -346,11 +347,19 @@ public Collection<Message<?>> getMessagesForGroup(Object groupId) {
346347
return messages;
347348
}
348349

350+
@Override
351+
public Stream<Message<?>> streamMessagesForGroup(Object groupId) {
352+
return getGroupMetadata(groupId)
353+
.getMessageIds()
354+
.stream()
355+
.map(this::getMessage);
356+
}
357+
349358
@Override
350359
@SuppressWarnings("unchecked")
351360
public Iterator<MessageGroup> iterator() {
352361
final Iterator<?> idIterator = normalizeKeys(
353-
(Collection<String>) doListKeys(this.groupPrefix + "*"))
362+
(Collection<String>) doListKeys(this.groupPrefix + '*'))
354363
.iterator();
355364
return new MessageGroupIterator(idIterator);
356365
}

spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroup.java

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -17,14 +17,15 @@
1717
package org.springframework.integration.store;
1818

1919
import java.util.Collection;
20+
import java.util.stream.Stream;
2021

2122
import org.springframework.messaging.Message;
2223

2324
/**
2425
* A group of messages that are correlated with each other and should be processed in the same context.
2526
* <p>
26-
* The message group allows implementations to be mutable, but this behavior is optional. Implementations should take
27-
* care to document their thread safety and mutability.
27+
* The message group allows implementations to be mutable, but this behavior is optional.
28+
* Implementations should take care to document their thread safety and mutability.
2829
*
2930
* @author Dave Syer
3031
* @author Oleg Zhurakousky
@@ -35,7 +36,6 @@ public interface MessageGroup {
3536

3637
/**
3738
* Query if the message can be added.
38-
*
3939
* @param message The message.
4040
* @return true if the message can be added.
4141
*/
@@ -57,12 +57,20 @@ public interface MessageGroup {
5757
boolean remove(Message<?> messageToRemove);
5858

5959
/**
60-
* Returns all available Messages from the group at the time of invocation
61-
*
60+
* Return all available Messages from the group at the time of invocation
6261
* @return The messages.
6362
*/
6463
Collection<Message<?>> getMessages();
6564

65+
/**
66+
* Return a stream for messages stored in this group.
67+
* @return the {@link Stream} for messages in this group.
68+
* @since 5.5
69+
*/
70+
default Stream<Message<?>> streamMessages() {
71+
return getMessages().stream();
72+
}
73+
6674
/**
6775
* @return the key that links these messages together
6876
*/

spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupQueue.java

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
2525
import java.util.concurrent.locks.Condition;
2626
import java.util.concurrent.locks.Lock;
2727
import java.util.concurrent.locks.ReentrantLock;
28+
import java.util.stream.Stream;
2829

2930
import org.apache.commons.logging.Log;
3031
import org.apache.commons.logging.LogFactory;
@@ -42,6 +43,7 @@
4243
* @author Oleg Zhurakousky
4344
* @author Gunnar Hillert
4445
* @author Gary Russell
46+
* @author Artem Bilan
4547
*
4648
* @since 2.0
4749
*
@@ -118,7 +120,7 @@ public void setPriority(boolean priority) {
118120

119121
@Override
120122
public Iterator<Message<?>> iterator() {
121-
return getMessages().iterator();
123+
return stream().iterator();
122124
}
123125

124126
/**
@@ -164,29 +166,24 @@ public int size() {
164166

165167
@Override
166168
public Message<?> peek() {
167-
Message<?> message = null;
168-
final Lock lock = this.storeLock;
169169
try {
170-
lock.lockInterruptibly();
171-
try {
172-
Collection<Message<?>> messages = getMessages();
173-
if (!messages.isEmpty()) {
174-
message = messages.iterator().next();
175-
}
170+
this.storeLock.lockInterruptibly();
171+
try (Stream<Message<?>> messageStream = stream()) {
172+
return messageStream.findFirst().orElse(null);
176173
}
177174
finally {
178-
lock.unlock();
175+
this.storeLock.unlock();
179176
}
180177
}
181178
catch (InterruptedException e) {
182179
Thread.currentThread().interrupt();
183180
}
184-
return message;
181+
return null;
185182
}
186183

187184
@Override
188185
public Message<?> poll(long timeout, TimeUnit unit) throws InterruptedException {
189-
Message<?> message = null;
186+
Message<?> message;
190187
long timeoutInNanos = unit.toNanos(timeout);
191188
final Lock lock = this.storeLock;
192189
lock.lockInterruptibly();
@@ -325,7 +322,7 @@ public int remainingCapacity() {
325322

326323
@Override
327324
public Message<?> take() throws InterruptedException {
328-
Message<?> message = null;
325+
Message<?> message;
329326
final Lock lock = this.storeLock;
330327
lock.lockInterruptibly();
331328

@@ -346,6 +343,11 @@ protected Collection<Message<?>> getMessages() {
346343
return this.messageGroupStore.getMessageGroup(this.groupId).getMessages();
347344
}
348345

346+
@Override
347+
public Stream<Message<?>> stream() {
348+
return this.messageGroupStore.getMessageGroup(this.groupId).streamMessages();
349+
}
350+
349351
/**
350352
* It is assumed that the 'storeLock' is being held by the caller, otherwise
351353
* IllegalMonitorStateException may be thrown

spring-integration-core/src/main/java/org/springframework/integration/store/MessageGroupStore.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@
1818

1919
import java.util.Collection;
2020
import java.util.Iterator;
21+
import java.util.stream.Stream;
2122

2223
import org.springframework.jmx.export.annotation.ManagedAttribute;
2324
import org.springframework.jmx.export.annotation.ManagedOperation;
@@ -142,6 +143,16 @@ public interface MessageGroupStore extends BasicMessageGroupStore {
142143
*/
143144
Collection<Message<?>> getMessagesForGroup(Object groupId);
144145

146+
/**
147+
* Return a stream for messages stored in the provided group.
148+
* @param groupId the group id to retrieve messages.
149+
* @return the {@link Stream} for messages in this group.
150+
* @since 5.5
151+
*/
152+
default Stream<Message<?>> streamMessagesForGroup(Object groupId) {
153+
return getMessagesForGroup(groupId).stream();
154+
}
155+
145156
/**
146157
* Invoked when a MessageGroupStore expires a group.
147158
*/

spring-integration-core/src/main/java/org/springframework/integration/store/PersistentMessageGroup.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,6 +20,8 @@
2020
import java.util.Collection;
2121
import java.util.Collections;
2222
import java.util.Iterator;
23+
import java.util.Spliterator;
24+
import java.util.stream.Stream;
2325

2426
import org.apache.commons.logging.Log;
2527
import org.apache.commons.logging.LogFactory;
@@ -29,6 +31,7 @@
2931

3032
/**
3133
* @author Artem Bilan
34+
*
3235
* @since 4.3
3336
*/
3437
class PersistentMessageGroup implements MessageGroup {
@@ -59,6 +62,16 @@ public Collection<Message<?>> getMessages() {
5962
return Collections.unmodifiableCollection(this.messages);
6063
}
6164

65+
/**
66+
* The resulting {@link Stream} must be closed after use,
67+
* it can be declared as a resource in a {@code try-with-resources} statement.
68+
* @return the stream of messages in this group.
69+
*/
70+
@Override
71+
public Stream<Message<?>> streamMessages() {
72+
return this.messageGroupStore.streamMessagesForGroup(this.original.getGroupId());
73+
}
74+
6275
@Override
6376
public Message<?> getOne() {
6477
if (this.oneMessage == null) {
@@ -224,6 +237,16 @@ public int size() {
224237
return PersistentMessageGroup.this.size();
225238
}
226239

240+
@Override
241+
public Spliterator<Message<?>> spliterator() {
242+
return streamMessages().spliterator();
243+
}
244+
245+
@Override
246+
public Stream<Message<?>> stream() {
247+
return streamMessages();
248+
}
249+
227250
}
228251

229252
}

spring-integration-core/src/test/java/org/springframework/integration/config/xml/DelayerParserTests.java

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2020 the original author or authors.
2+
* Copyright 2002-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,8 +22,7 @@
2222
import java.util.HashMap;
2323
import java.util.List;
2424

25-
import org.junit.Test;
26-
import org.junit.runner.RunWith;
25+
import org.junit.jupiter.api.Test;
2726

2827
import org.springframework.beans.DirectFieldAccessor;
2928
import org.springframework.beans.factory.annotation.Autowired;
@@ -35,8 +34,7 @@
3534
import org.springframework.integration.test.util.TestUtils;
3635
import org.springframework.messaging.Message;
3736
import org.springframework.messaging.MessageHandler;
38-
import org.springframework.test.context.ContextConfiguration;
39-
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
37+
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4038
import org.springframework.transaction.TransactionDefinition;
4139
import org.springframework.transaction.interceptor.MatchAlwaysTransactionAttributeSource;
4240
import org.springframework.transaction.interceptor.NameMatchTransactionAttributeSource;
@@ -51,8 +49,7 @@
5149
*
5250
* @since 1.0.3
5351
*/
54-
@RunWith(SpringJUnit4ClassRunner.class)
55-
@ContextConfiguration
52+
@SpringJUnitConfig
5653
public class DelayerParserTests {
5754

5855
@Autowired
@@ -70,7 +67,7 @@ public void defaultScheduler() {
7067
.isEqualTo("headers.foo");
7168
assertThat(TestUtils.getPropertyValue(delayHandler, "messagingTemplate.sendTimeout", Long.class))
7269
.isEqualTo(987L);
73-
assertThat(TestUtils.getPropertyValue(delayHandler, "taskScheduler")).isNull();
70+
assertThat(TestUtils.getPropertyValue(delayHandler, "taskScheduler")).isNotNull();
7471
}
7572

7673
@Test
@@ -110,7 +107,8 @@ public void transactionalSubElement() throws Exception {
110107
assertThat(adviceChain.size()).isEqualTo(1);
111108
Object advice = adviceChain.get(0);
112109
assertThat(advice instanceof TransactionInterceptor).isTrue();
113-
TransactionAttributeSource transactionAttributeSource = ((TransactionInterceptor) advice).getTransactionAttributeSource();
110+
TransactionAttributeSource transactionAttributeSource =
111+
((TransactionInterceptor) advice).getTransactionAttributeSource();
114112
assertThat(transactionAttributeSource instanceof MatchAlwaysTransactionAttributeSource).isTrue();
115113
Method method = MessageHandler.class.getMethod("handleMessage", Message.class);
116114
TransactionDefinition definition = transactionAttributeSource.getTransactionAttribute(method, null);
@@ -130,7 +128,8 @@ public void adviceChainSubElement() {
130128

131129
Object txAdvice = adviceChain.get(1);
132130
assertThat(txAdvice.getClass()).isEqualTo(TransactionInterceptor.class);
133-
TransactionAttributeSource transactionAttributeSource = ((TransactionInterceptor) txAdvice).getTransactionAttributeSource();
131+
TransactionAttributeSource transactionAttributeSource =
132+
((TransactionInterceptor) txAdvice).getTransactionAttributeSource();
134133
assertThat(transactionAttributeSource.getClass()).isEqualTo(NameMatchTransactionAttributeSource.class);
135134
HashMap<?, ?> nameMap = TestUtils.getPropertyValue(transactionAttributeSource, "nameMap", HashMap.class);
136135
assertThat(nameMap.toString()).isEqualTo("{*=PROPAGATION_REQUIRES_NEW,ISOLATION_DEFAULT,readOnly}");

0 commit comments

Comments
 (0)