Skip to content

Commit 72624f4

Browse files
authored
Merge pull request #2210 from adamretter/hotfix/context-update-listener-concurrency
Fix a concurrency issue in ContextUpdateListener
2 parents 5b410da + 508ab9d commit 72624f4

File tree

2 files changed

+85
-62
lines changed

2 files changed

+85
-62
lines changed

src/org/exist/storage/NotificationService.java

Lines changed: 54 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -21,80 +21,81 @@
2121
*/
2222
package org.exist.storage;
2323

24+
import net.jcip.annotations.ThreadSafe;
2425
import org.apache.logging.log4j.LogManager;
2526
import org.apache.logging.log4j.Logger;
2627
import org.exist.dom.persistent.DocumentImpl;
2728
import org.exist.numbering.NodeId;
2829

2930
import java.util.IdentityHashMap;
31+
import java.util.Map;
32+
3033
import org.exist.dom.persistent.IStoredNode;
3134

3235
/**
3336
* Global notification service for document updates. Other classes
3437
* can subscribe to this service to be notified of document modifications,
3538
* removals or additions.
36-
*
37-
* @author wolf
3839
*
40+
* @author wolf
3941
*/
40-
public class NotificationService extends IdentityHashMap<UpdateListener, Object> implements BrokerPoolService {
42+
@ThreadSafe
43+
public class NotificationService implements BrokerPoolService {
44+
45+
private static final long serialVersionUID = -3629584664969740903L;
46+
private static final Logger LOG = LogManager.getLogger(NotificationService.class);
47+
48+
private final Map<UpdateListener, Object> listeners = new IdentityHashMap<>();
49+
50+
public NotificationService() {
51+
super();
52+
}
4153

42-
private static final long serialVersionUID = -3629584664969740903L;
54+
/**
55+
* Subscribe an {@link UpdateListener} to receive notifications.
56+
*
57+
* @param listener
58+
*/
59+
public synchronized void subscribe(final UpdateListener listener) {
60+
listeners.put(listener, new Object());
61+
}
4362

44-
private final static Logger LOG = LogManager.getLogger(NotificationService.class);
45-
46-
public NotificationService() {
47-
super();
48-
}
49-
50-
/**
51-
* Subscribe an {@link UpdateListener} to receive notifications.
52-
*
53-
* @param listener
54-
*/
55-
public synchronized void subscribe(UpdateListener listener) {
56-
put(listener, new Object());
57-
}
58-
59-
/**
60-
* Unsubscribe an {@link UpdateListener}.
61-
*
62-
* @param listener
63-
*/
64-
public synchronized void unsubscribe(UpdateListener listener) {
65-
final Object i = remove(listener);
66-
if (i == null)
67-
{throw new RuntimeException(hashCode() + " listener not found: " + listener.hashCode());}
63+
/**
64+
* Unsubscribe an {@link UpdateListener}.
65+
*
66+
* @param listener
67+
*/
68+
public synchronized void unsubscribe(final UpdateListener listener) {
69+
final Object i = listeners.remove(listener);
70+
if (i == null) {
71+
throw new RuntimeException(hashCode() + " listener not found: " + listener.hashCode());
72+
}
6873
listener.unsubscribe();
6974
}
7075

71-
/**
72-
* Notify all subscribers that a document has been updated/removed or
73-
* a new document has been added.
74-
*
75-
* @param document
76-
* @param event
77-
*/
78-
public synchronized void notifyUpdate(DocumentImpl document, int event) {
79-
for (final UpdateListener listener : keySet()) {
80-
listener.documentUpdated(document, event);
81-
}
82-
}
76+
/**
77+
* Notify all subscribers that a document has been updated/removed or
78+
* a new document has been added.
79+
*
80+
* @param document
81+
* @param event
82+
*/
83+
public synchronized void notifyUpdate(final DocumentImpl document, final int event) {
84+
listeners.keySet().forEach(listener -> listener.documentUpdated(document, event));
85+
}
8386

8487
/**
85-
* Notify all subscribers that a node has been moved. Nodes may be moved during a
88+
* Notify all subscribers that a node has been moved. Nodes may be moved during a
8689
* defragmentation run.
87-
*/
88-
public synchronized void notifyMove(NodeId oldNodeId, IStoredNode newNode) {
89-
for (final UpdateListener listener : keySet()) {
90-
listener.nodeMoved(oldNodeId, newNode);
91-
}
92-
}
90+
*/
91+
public synchronized void notifyMove(final NodeId oldNodeId, final IStoredNode newNode) {
92+
listeners.keySet().forEach(listener -> listener.nodeMoved(oldNodeId, newNode));
93+
}
9394

94-
public void debug() {
95-
LOG.debug("Registered UpdateListeners:");
96-
for (final UpdateListener listener : keySet()) {
97-
listener.debug();
98-
}
99-
}
95+
public synchronized void debug() {
96+
if (LOG.isDebugEnabled()) {
97+
LOG.debug("Registered UpdateListeners:");
98+
}
99+
listeners.keySet().forEach(UpdateListener::debug);
100+
}
100101
}

src/org/exist/xquery/XQueryContext.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import java.nio.charset.Charset;
3131
import java.nio.file.Path;
3232
import java.util.*;
33+
import java.util.concurrent.CopyOnWriteArrayList;
34+
import java.util.concurrent.atomic.AtomicReference;
3335
import java.util.function.Predicate;
3436
import java.util.stream.Collectors;
3537
import java.util.stream.Stream;
@@ -50,6 +52,7 @@
5052
import com.evolvedbinary.j8fu.tuple.Tuple2;
5153
import com.ibm.icu.text.Collator;
5254
import net.jcip.annotations.Immutable;
55+
import net.jcip.annotations.ThreadSafe;
5356
import org.apache.logging.log4j.LogManager;
5457
import org.apache.logging.log4j.Logger;
5558
import org.exist.Database;
@@ -3121,38 +3124,57 @@ public void setSource(final Source source) {
31213124
this.source = source;
31223125
}
31233126

3127+
/**
3128+
* NOTE: the {@link #unsubscribe()} method can be called
3129+
* from {@link org.exist.storage.NotificationService#unsubscribe(UpdateListener)}
3130+
* by another thread, so this class needs to be thread-safe.
3131+
*/
3132+
@ThreadSafe
31243133
private static class ContextUpdateListener implements UpdateListener {
3125-
private final List<UpdateListener> listeners = new ArrayList<>();
3134+
/*
3135+
* We use Concurrent safe data structures here, so that we don't have
3136+
* to block any calling threads.
3137+
*
3138+
* The AtomicReference enables us to quickly clear the listeners
3139+
* in #unsubscribe() and maintain happens-before integrity whilst
3140+
* unsubcribing them. The CopyOnWriteArrayList allows
3141+
* us to add listeners whilst iterating over a snapshot
3142+
* of existing iterators in other methods.
3143+
*/
3144+
private final AtomicReference<List<UpdateListener>> listeners = new AtomicReference<>(new CopyOnWriteArrayList<>());
31263145

31273146
private void addListener(final UpdateListener listener) {
3128-
listeners.add(listener);
3147+
listeners.get().add(listener);
31293148
}
31303149

31313150
@Override
31323151
public void documentUpdated(final DocumentImpl document, final int event) {
3133-
listeners.forEach(listener -> listener.documentUpdated(document, event));
3152+
listeners.get().forEach(listener -> listener.documentUpdated(document, event));
31343153
}
31353154

31363155
@Override
31373156
public void unsubscribe() {
3138-
listeners.forEach(UpdateListener::unsubscribe);
3139-
listeners.clear();
3157+
List<UpdateListener> prev = listeners.get();
3158+
while (!listeners.compareAndSet(prev, new CopyOnWriteArrayList<>())) {
3159+
prev = listeners.get();
3160+
}
3161+
3162+
prev.forEach(UpdateListener::unsubscribe);
31403163
}
31413164

31423165
@Override
31433166
public void nodeMoved(final NodeId oldNodeId, final NodeHandle newNode) {
3144-
listeners.forEach(listener -> listener.nodeMoved(oldNodeId, newNode));
3167+
listeners.get().forEach(listener -> listener.nodeMoved(oldNodeId, newNode));
31453168
}
31463169

31473170
@Override
31483171
public void debug() {
31493172
if (LOG.isDebugEnabled()) {
3150-
LOG.debug(String.format("XQueryContext: %s document update listeners", listeners.size()));
3173+
LOG.debug(String.format("XQueryContext: %s document update listeners", listeners.get().size()));
31513174
}
31523175

3153-
listeners.forEach(UpdateListener::debug);
3176+
listeners.get().forEach(UpdateListener::debug);
31543177
}
3155-
31563178
}
31573179

31583180
private final List<CleanupTask> cleanupTasks = new ArrayList<>();

0 commit comments

Comments
 (0)