Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (c) 2010, 2026 BSI Business Systems Integration AG
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.scout.rt.mom.api;

import static org.mockito.Mockito.*;

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.eclipse.scout.rt.platform.job.IFuture;
import org.eclipse.scout.rt.platform.job.Jobs;
import org.junit.Test;

public class MomSubscriberTest {

@Test
public void testDispose() {
ISubscription subscriptionMock = mock(ISubscription.class);
AbstractMomSubscriber subscriber = new AbstractMomSubscriber() {
@Override
public void subscribe() {
registerSubscription(subscriptionMock);
}
};
subscriber.subscribe();
subscriber.dispose();
verify(subscriptionMock, times(1)).dispose();
}

@Test
public void testConcurrentSubscriptions() {
List<ISubscription> subscriptionMocks = Stream.generate(() -> mock(ISubscription.class))
.limit(10000)
.collect(Collectors.toList());

AbstractMomSubscriber subscriber = new AbstractMomSubscriber() {
@Override
public void subscribe() {
subscriptionMocks.stream().parallel().forEach(this::registerSubscription);
}
};

subscriber.subscribe();
subscriber.dispose();
for (ISubscription subscriptionMock : subscriptionMocks) {
verify(subscriptionMock, times(1)).dispose();
}
}

@Test
public void testConcurrentModification() {
List<ISubscription> subscriptionMocks = Stream.generate(() -> mock(ISubscription.class))
.limit(100_000)
.collect(Collectors.toList());

AbstractMomSubscriber subscriber = new AbstractMomSubscriber() {
@Override
public void subscribe() {
for (ISubscription subscriptionMock : subscriptionMocks) {
registerSubscription(subscriptionMock);
}
}
};

subscriber.subscribe(); // insert initial subscriptions
IFuture<Void> subscriptionFuture = Jobs.schedule(subscriber::subscribe, Jobs.newInput()); // insert more subscriptions async
subscriber.dispose(); // start disposing while new subscriptions are being added
subscriptionFuture.awaitDone();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2010, 2023 BSI Business Systems Integration AG
* Copyright (c) 2010, 2026 BSI Business Systems Integration AG
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -10,22 +10,21 @@
package org.eclipse.scout.rt.mom.api;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.eclipse.scout.rt.platform.ApplicationScoped;

/**
* Keeps a list of {@link ISubscription}s that are registered during {@link #subscribe()}. All registered subscriptions
* are disposed when {@link #dispose()} is called.
* <p>
* This class is not thread-safe.
*
* @since 6.1
*/
@ApplicationScoped
public abstract class AbstractMomSubscriber {

private final List<ISubscription> m_subscriptions = new ArrayList<>();
private final List<ISubscription> m_subscriptions = Collections.synchronizedList(new ArrayList<>());

public abstract void subscribe();

Expand All @@ -37,8 +36,10 @@ protected void registerSubscription(ISubscription subscription) {
}

public void dispose() {
for (ISubscription subscription : m_subscriptions) {
subscription.dispose();
synchronized (m_subscriptions) {
for (ISubscription subscription : m_subscriptions) {
subscription.dispose();
}
}
}
}