Skip to content

Commit efa0f1b

Browse files
Make AbstractMomSubscriber thread-safe
The list of subscriptions should be thread-safe, so that registrations can be done in parallel. 411785
1 parent 0badaaf commit efa0f1b

File tree

2 files changed

+85
-6
lines changed

2 files changed

+85
-6
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright (c) 2010, 2026 BSI Business Systems Integration AG
3+
*
4+
* This program and the accompanying materials are made
5+
* available under the terms of the Eclipse Public License 2.0
6+
* which is available at https://www.eclipse.org/legal/epl-2.0/
7+
*
8+
* SPDX-License-Identifier: EPL-2.0
9+
*/
10+
package org.eclipse.scout.rt.mom.api;
11+
12+
import static org.mockito.Mockito.*;
13+
14+
import java.util.List;
15+
import java.util.stream.Collectors;
16+
import java.util.stream.Stream;
17+
18+
import org.eclipse.scout.rt.platform.job.IFuture;
19+
import org.eclipse.scout.rt.platform.job.Jobs;
20+
import org.junit.Test;
21+
22+
public class MomSubscriberTest {
23+
24+
@Test
25+
public void testDispose() {
26+
ISubscription subscriptionMock = mock(ISubscription.class);
27+
AbstractMomSubscriber subscriber = new AbstractMomSubscriber() {
28+
@Override
29+
public void subscribe() {
30+
registerSubscription(subscriptionMock);
31+
}
32+
};
33+
subscriber.subscribe();
34+
subscriber.dispose();
35+
verify(subscriptionMock, times(1)).dispose();
36+
}
37+
38+
@Test
39+
public void testConcurrentSubscriptions() {
40+
List<ISubscription> subscriptionMocks = Stream.generate(() -> mock(ISubscription.class))
41+
.limit(10000)
42+
.collect(Collectors.toList());
43+
44+
AbstractMomSubscriber subscriber = new AbstractMomSubscriber() {
45+
@Override
46+
public void subscribe() {
47+
subscriptionMocks.stream().parallel().forEach(this::registerSubscription);
48+
}
49+
};
50+
51+
subscriber.subscribe();
52+
subscriber.dispose();
53+
for (ISubscription subscriptionMock : subscriptionMocks) {
54+
verify(subscriptionMock, times(1)).dispose();
55+
}
56+
}
57+
58+
@Test
59+
public void testConcurrentModification() {
60+
List<ISubscription> subscriptionMocks = Stream.generate(() -> mock(ISubscription.class))
61+
.limit(100_000)
62+
.collect(Collectors.toList());
63+
64+
AbstractMomSubscriber subscriber = new AbstractMomSubscriber() {
65+
@Override
66+
public void subscribe() {
67+
for (ISubscription subscriptionMock : subscriptionMocks) {
68+
registerSubscription(subscriptionMock);
69+
}
70+
}
71+
};
72+
73+
subscriber.subscribe(); // insert initial subscriptions
74+
IFuture<Void> subscriptionFuture = Jobs.schedule(subscriber::subscribe, Jobs.newInput()); // insert more subscriptions async
75+
subscriber.dispose(); // start disposing while new subscriptions are being added
76+
subscriptionFuture.awaitDone();
77+
}
78+
}

org.eclipse.scout.rt.mom.api/src/main/java/org/eclipse/scout/rt/mom/api/AbstractMomSubscriber.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright (c) 2010, 2023 BSI Business Systems Integration AG
2+
* Copyright (c) 2010, 2026 BSI Business Systems Integration AG
33
*
44
* This program and the accompanying materials are made
55
* available under the terms of the Eclipse Public License 2.0
@@ -10,22 +10,21 @@
1010
package org.eclipse.scout.rt.mom.api;
1111

1212
import java.util.ArrayList;
13+
import java.util.Collections;
1314
import java.util.List;
1415

1516
import org.eclipse.scout.rt.platform.ApplicationScoped;
1617

1718
/**
1819
* Keeps a list of {@link ISubscription}s that are registered during {@link #subscribe()}. All registered subscriptions
1920
* are disposed when {@link #dispose()} is called.
20-
* <p>
21-
* This class is not thread-safe.
2221
*
2322
* @since 6.1
2423
*/
2524
@ApplicationScoped
2625
public abstract class AbstractMomSubscriber {
2726

28-
private final List<ISubscription> m_subscriptions = new ArrayList<>();
27+
private final List<ISubscription> m_subscriptions = Collections.synchronizedList(new ArrayList<>());
2928

3029
public abstract void subscribe();
3130

@@ -37,8 +36,10 @@ protected void registerSubscription(ISubscription subscription) {
3736
}
3837

3938
public void dispose() {
40-
for (ISubscription subscription : m_subscriptions) {
41-
subscription.dispose();
39+
synchronized (m_subscriptions) {
40+
for (ISubscription subscription : m_subscriptions) {
41+
subscription.dispose();
42+
}
4243
}
4344
}
4445
}

0 commit comments

Comments
 (0)