Skip to content

Commit cbe859d

Browse files
committed
remove() refactored as suggested on jbarrett's review
1 parent 43194d0 commit cbe859d

File tree

2 files changed

+20
-45
lines changed

2 files changed

+20
-45
lines changed

geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderQueue.java

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -284,10 +284,10 @@ public synchronized void remove() throws CacheException {
284284
if (peekedIds.isEmpty()) {
285285
return;
286286
}
287-
boolean wasEmpty = lastDispatchedKey == lastDestroyedKey;
288-
Long key = peekedIds.remove();
289-
updateHeadKey(key);
290-
lastDispatchedKey = key;
287+
final boolean wasEmpty = lastDispatchedKey == lastDestroyedKey;
288+
final Long key = peekedIds.remove();
289+
290+
preProcessRemovedKey(key);
291291

292292
removeIndex(key);
293293
// Remove the entry at that key with a callback arg signifying it is
@@ -307,6 +307,8 @@ public synchronized void remove() throws CacheException {
307307
}
308308
}
309309

310+
postProcessRemovedKey();
311+
310312
if (wasEmpty) {
311313
synchronized (this) {
312314
notifyAll();
@@ -320,6 +322,14 @@ public synchronized void remove() throws CacheException {
320322
}
321323
}
322324

325+
protected void preProcessRemovedKey(final Long key) {
326+
updateHeadKey(key);
327+
lastDispatchedKey = key;
328+
}
329+
330+
protected void postProcessRemovedKey() {}
331+
332+
323333
/**
324334
* This method removes batchSize entries from the queue. It will only remove entries that were
325335
* previously peeked.

geode-wan-txgrouping/src/main/java/org/apache/geode/cache/wan/internal/txgrouping/serial/TxGroupingSerialGatewaySenderQueue.java

Lines changed: 6 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,7 @@
2727
import java.util.function.Predicate;
2828

2929
import org.apache.geode.annotations.VisibleForTesting;
30-
import org.apache.geode.cache.CacheException;
3130
import org.apache.geode.cache.CacheListener;
32-
import org.apache.geode.cache.EntryNotFoundException;
3331
import org.apache.geode.cache.TransactionId;
3432
import org.apache.geode.cache.asyncqueue.AsyncEvent;
3533
import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
@@ -198,37 +196,17 @@ protected List<KeyAndEventPair> getElementsMatching(Predicate<GatewaySenderEvent
198196
}
199197

200198
@Override
201-
public synchronized void remove() throws CacheException {
202-
if (peekedIds.isEmpty()) {
203-
return;
204-
}
205-
boolean wasEmpty = lastDispatchedKey == lastDestroyedKey;
206-
Long key = peekedIds.remove();
199+
protected void preProcessRemovedKey(Long key) {
207200
boolean isExtraPeekedId = extraPeekedIds.contains(key);
208201
if (!isExtraPeekedId) {
209-
updateHeadKey(key);
210-
lastDispatchedKey = key;
202+
super.preProcessRemovedKey(key);
211203
} else {
212204
extraPeekedIdsRemovedButPreviousIdNotRemoved.add(key);
213205
}
214-
removeIndex(key);
215-
// Remove the entry at that key with a callback arg signifying it is
216-
// a WAN queue so that AbstractRegionEntry.destroy can get the value
217-
// even if it has been evicted to disk. In the normal case, the
218-
// AbstractRegionEntry.destroy only gets the value in the VM.
219-
try {
220-
this.region.localDestroy(key, WAN_QUEUE_TOKEN);
221-
this.stats.decQueueSize();
222-
} catch (EntryNotFoundException ok) {
223-
// this is acceptable because the conflation can remove entries
224-
// out from underneath us.
225-
if (logger.isDebugEnabled()) {
226-
logger.debug(
227-
"{}: Did not destroy entry at {} it was not there. It should have been removed by conflation.",
228-
this, key);
229-
}
230-
}
206+
}
231207

208+
@Override
209+
protected void postProcessRemovedKey() {
232210
// For those extraPeekedIds removed that are consecutive to lastDispatchedKey:
233211
// - Update lastDispatchedKey with them so that they are removed
234212
// by the batch removal thread.
@@ -238,20 +216,7 @@ public synchronized void remove() throws CacheException {
238216
while (extraPeekedIdsRemovedButPreviousIdNotRemoved.contains(tmpKey = inc(tmpKey))) {
239217
extraPeekedIdsRemovedButPreviousIdNotRemoved.remove(tmpKey);
240218
extraPeekedIds.remove(tmpKey);
241-
updateHeadKey(tmpKey);
242-
lastDispatchedKey = tmpKey;
243-
}
244-
245-
if (wasEmpty) {
246-
synchronized (this) {
247-
notifyAll();
248-
}
249-
}
250-
251-
if (logger.isDebugEnabled()) {
252-
logger.debug(
253-
"{}: Destroyed entry at key {} setting the lastDispatched Key to {}. The last destroyed entry was {}",
254-
this, key, this.lastDispatchedKey, this.lastDestroyedKey);
219+
super.preProcessRemovedKey(tmpKey);
255220
}
256221
}
257222

0 commit comments

Comments
 (0)