|
30 | 30 | import java.nio.charset.Charset;
|
31 | 31 | import java.nio.file.Path;
|
32 | 32 | import java.util.*;
|
| 33 | +import java.util.concurrent.CopyOnWriteArrayList; |
| 34 | +import java.util.concurrent.atomic.AtomicReference; |
33 | 35 | import java.util.function.Predicate;
|
34 | 36 | import java.util.stream.Collectors;
|
35 | 37 | import java.util.stream.Stream;
|
|
50 | 52 | import com.evolvedbinary.j8fu.tuple.Tuple2;
|
51 | 53 | import com.ibm.icu.text.Collator;
|
52 | 54 | import net.jcip.annotations.Immutable;
|
| 55 | +import net.jcip.annotations.ThreadSafe; |
53 | 56 | import org.apache.logging.log4j.LogManager;
|
54 | 57 | import org.apache.logging.log4j.Logger;
|
55 | 58 | import org.exist.Database;
|
@@ -3121,38 +3124,57 @@ public void setSource(final Source source) {
|
3121 | 3124 | this.source = source;
|
3122 | 3125 | }
|
3123 | 3126 |
|
| 3127 | + /** |
| 3128 | + * NOTE: the {@link #unsubscribe()} method can be called |
| 3129 | + * from {@link org.exist.storage.NotificationService#unsubscribe(UpdateListener)} |
| 3130 | + * by another thread, so this class needs to be thread-safe. |
| 3131 | + */ |
| 3132 | + @ThreadSafe |
3124 | 3133 | private static class ContextUpdateListener implements UpdateListener {
|
3125 |
| - private final List<UpdateListener> listeners = new ArrayList<>(); |
| 3134 | + /* |
| 3135 | + * We use Concurrent safe data structures here, so that we don't have |
| 3136 | + * to block any calling threads. |
| 3137 | + * |
| 3138 | + * The AtomicReference enables us to quickly clear the listeners |
| 3139 | + * in #unsubscribe() and maintain happens-before integrity whilst |
| 3140 | + * unsubcribing them. The CopyOnWriteArrayList allows |
| 3141 | + * us to add listeners whilst iterating over a snapshot |
| 3142 | + * of existing iterators in other methods. |
| 3143 | + */ |
| 3144 | + private final AtomicReference<List<UpdateListener>> listeners = new AtomicReference<>(new CopyOnWriteArrayList<>()); |
3126 | 3145 |
|
3127 | 3146 | private void addListener(final UpdateListener listener) {
|
3128 |
| - listeners.add(listener); |
| 3147 | + listeners.get().add(listener); |
3129 | 3148 | }
|
3130 | 3149 |
|
3131 | 3150 | @Override
|
3132 | 3151 | public void documentUpdated(final DocumentImpl document, final int event) {
|
3133 |
| - listeners.forEach(listener -> listener.documentUpdated(document, event)); |
| 3152 | + listeners.get().forEach(listener -> listener.documentUpdated(document, event)); |
3134 | 3153 | }
|
3135 | 3154 |
|
3136 | 3155 | @Override
|
3137 | 3156 | public void unsubscribe() {
|
3138 |
| - listeners.forEach(UpdateListener::unsubscribe); |
3139 |
| - listeners.clear(); |
| 3157 | + List<UpdateListener> prev = listeners.get(); |
| 3158 | + while (!listeners.compareAndSet(prev, new CopyOnWriteArrayList<>())) { |
| 3159 | + prev = listeners.get(); |
| 3160 | + } |
| 3161 | + |
| 3162 | + prev.forEach(UpdateListener::unsubscribe); |
3140 | 3163 | }
|
3141 | 3164 |
|
3142 | 3165 | @Override
|
3143 | 3166 | public void nodeMoved(final NodeId oldNodeId, final NodeHandle newNode) {
|
3144 |
| - listeners.forEach(listener -> listener.nodeMoved(oldNodeId, newNode)); |
| 3167 | + listeners.get().forEach(listener -> listener.nodeMoved(oldNodeId, newNode)); |
3145 | 3168 | }
|
3146 | 3169 |
|
3147 | 3170 | @Override
|
3148 | 3171 | public void debug() {
|
3149 | 3172 | if (LOG.isDebugEnabled()) {
|
3150 |
| - LOG.debug(String.format("XQueryContext: %s document update listeners", listeners.size())); |
| 3173 | + LOG.debug(String.format("XQueryContext: %s document update listeners", listeners.get().size())); |
3151 | 3174 | }
|
3152 | 3175 |
|
3153 |
| - listeners.forEach(UpdateListener::debug); |
| 3176 | + listeners.get().forEach(UpdateListener::debug); |
3154 | 3177 | }
|
3155 |
| - |
3156 | 3178 | }
|
3157 | 3179 |
|
3158 | 3180 | private final List<CleanupTask> cleanupTasks = new ArrayList<>();
|
|
0 commit comments