Skip to content

Commit 24209d5

Browse files
authored
IoTV2: Remove events that do not retry in leader's transfer buffer & fix concurrency bug of retryQueue & fix retry interval & refine log (apache#15684)
* remove events that do not retry * refine log & fix retry interval & fix concurrency issue of retryQueue
1 parent c6d5e29 commit 24209d5

File tree

7 files changed

+37
-22
lines changed

7 files changed

+37
-22
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,10 @@
7272
import java.util.Comparator;
7373
import java.util.Iterator;
7474
import java.util.Objects;
75-
import java.util.PriorityQueue;
7675
import java.util.Queue;
7776
import java.util.concurrent.BlockingQueue;
7877
import java.util.concurrent.LinkedBlockingDeque;
78+
import java.util.concurrent.PriorityBlockingQueue;
7979
import java.util.concurrent.ScheduledExecutorService;
8080
import java.util.concurrent.TimeUnit;
8181
import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,7 +101,7 @@ public class PipeConsensusAsyncConnector extends IoTDBConnector implements Conse
101101
private static final long PIPE_CONSENSUS_EVENT_ENQUEUE_TIMEOUT_IN_MS =
102102
IOTDB_CONFIG.getConnectionTimeoutInMS() / 6;
103103
private final Queue<EnrichedEvent> retryEventQueue =
104-
new PriorityQueue<>(
104+
new PriorityBlockingQueue<>(
105105
IOTDB_CONFIG.getIotConsensusV2PipelineSize(),
106106
Comparator.comparingLong(EnrichedEvent::getReplicateIndexForIoTV2));
107107
// We use enrichedEvent here to make use of EnrichedEvent.equalsInPipeConsensus
@@ -535,9 +535,10 @@ private void asyncTransferQueuedEventsIfNecessary() {
535535
? peekedEvent.getRetryInterval()
536536
: 0L;
537537
LOGGER.info(
538-
"PipeConsensus-ConsensusGroup-{}: retry with interval {} for {}",
538+
"PipeConsensus-ConsensusGroup-{}: retry with interval {} for index {} {}",
539539
consensusGroupId,
540540
retryInterval,
541+
peekedEvent.getReplicateIndexForIoTV2(),
541542
peekedEvent);
542543
// need to retry in background service, otherwise the retryInterval will block the sender
543544
// procedure.
@@ -616,7 +617,7 @@ private void retryTransfer(final PipeDeleteDataNodeEvent deleteDataNodeEvent) {
616617
* @param event event to retry
617618
*/
618619
@SuppressWarnings("java:S899")
619-
public void addFailureEventToRetryQueue(final EnrichedEvent event) {
620+
public synchronized void addFailureEventToRetryQueue(final EnrichedEvent event) {
620621
if (event.isReleased()) {
621622
return;
622623
}
@@ -630,11 +631,20 @@ public void addFailureEventToRetryQueue(final EnrichedEvent event) {
630631
return;
631632
}
632633

633-
retryEventQueue.offer(event);
634-
LOGGER.info(
635-
"PipeConsensus-ConsensusGroup-{}: Event {} transfer failed, will be added to retry queue.",
636-
consensusGroupId,
637-
event);
634+
boolean res = retryEventQueue.offer(event);
635+
if (res) {
636+
LOGGER.info(
637+
"PipeConsensus-ConsensusGroup-{}: Event {} replicate index {} transfer failed, will be added to retry queue.",
638+
consensusGroupId,
639+
event,
640+
event.getReplicateIndexForIoTV2());
641+
} else {
642+
LOGGER.warn(
643+
"PipeConsensus-ConsensusGroup-{}: Event {} replicate index {} transfer failed, added to retry queue failed, this event will be ignored.",
644+
consensusGroupId,
645+
event,
646+
event.getReplicateIndexForIoTV2());
647+
}
638648

639649
if (isClosed.get()) {
640650
event.clearReferenceCount(PipeConsensusAsyncConnector.class.getName());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusDeleteEventHandler.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,10 @@ public void onComplete(TPipeConsensusTransferResp response) {
8787
LOGGER.info(
8888
"DeleteNodeTransfer: no.{} event successfully processed!",
8989
event.getReplicateIndexForIoTV2());
90-
connector.removeEventFromBuffer(event);
9190
}
91+
// if code flow reach here, meaning the file will not be resent and will be ignored.
92+
// events that don't need to be retried will be removed from the buffer
93+
connector.removeEventFromBuffer(event);
9294

9395
long duration = System.nanoTime() - createTime;
9496
metric.recordConnectorWalTransferTimer(duration);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTabletInsertionEventHandler.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,9 +95,12 @@ public void onComplete(TPipeConsensusTransferResp response) {
9595
LOGGER.info(
9696
"InsertNodeTransfer: no.{} event successfully processed!",
9797
((EnrichedEvent) event).getReplicateIndexForIoTV2());
98-
connector.removeEventFromBuffer((EnrichedEvent) event);
9998
}
10099

100+
// if code flow reach here, meaning the file will not be resent and will be ignored.
101+
// events that don't need to be retried will be removed from the buffer
102+
connector.removeEventFromBuffer((EnrichedEvent) event);
103+
101104
long duration = System.nanoTime() - createTime;
102105
metric.recordConnectorWalTransferTimer(duration);
103106
} catch (Exception e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/handler/PipeConsensusTsFileInsertionEventHandler.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -210,9 +210,9 @@ public void onComplete(final TPipeConsensusTransferResp response) {
210210
tsFile.getName());
211211
}
212212

213-
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
214-
connector.removeEventFromBuffer(event);
215-
}
213+
// if code flow reach here, meaning the file will not be resent and will be ignored.
214+
// events that don't need to be retried will be removed from the buffer
215+
connector.removeEventFromBuffer(event);
216216
} catch (final Exception e) {
217217
onError(e);
218218
return;
@@ -295,10 +295,10 @@ public void onError(final Exception exception) {
295295

296296
if (RetryUtils.needRetryWithIncreasingInterval(exception)) {
297297
// just in case for overflow
298-
if (event.getRetryInterval() << 2 <= 0) {
298+
if (event.getRetryInterval() << 1 <= 0) {
299299
event.setRetryInterval(1000L * 20);
300300
} else {
301-
event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval() << 2));
301+
event.setRetryInterval(Math.min(1000L * 20, event.getRetryInterval() << 1));
302302
}
303303
}
304304

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static PipeRealtimeEvent createRealtimeEvent(
6161
&& PipeConsensusProcessor.isShouldReplicate(tsFileInsertionEvent)) {
6262
tsFileInsertionEvent.setReplicateIndexForIoTV2(
6363
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
64-
LOGGER.debug(
64+
LOGGER.info(
6565
"[Region{}]Set {} for event {}",
6666
dataRegionId,
6767
tsFileInsertionEvent.getReplicateIndexForIoTV2(),
@@ -103,7 +103,7 @@ public static PipeRealtimeEvent createRealtimeEvent(
103103
&& PipeConsensusProcessor.isShouldReplicate(insertionEvent)) {
104104
insertionEvent.setReplicateIndexForIoTV2(
105105
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
106-
LOGGER.debug(
106+
LOGGER.info(
107107
"[Region{}]Set {} for event {}",
108108
dataRegionId,
109109
insertionEvent.getReplicateIndexForIoTV2(),
@@ -130,7 +130,7 @@ public static PipeRealtimeEvent createRealtimeEvent(
130130
&& PipeConsensusProcessor.isShouldReplicate(deleteDataNodeEvent)) {
131131
deleteDataNodeEvent.setReplicateIndexForIoTV2(
132132
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
133-
LOGGER.debug(
133+
LOGGER.info(
134134
"[Region{}]Set {} for event {}",
135135
dataRegionId,
136136
deleteDataNodeEvent.getReplicateIndexForIoTV2(),

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -879,7 +879,7 @@ private Event supplyTsFileEvent(final TsFileResource resource) {
879879
event.setReplicateIndexForIoTV2(
880880
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
881881
resource.getDataRegionId()));
882-
LOGGER.debug(
882+
LOGGER.info(
883883
"[Region{}]Set {} for event {}",
884884
resource.getDataRegionId(),
885885
event.getReplicateIndexForIoTV2(),

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/event/EnrichedEvent.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -435,8 +435,8 @@ public long getRetryInterval() {
435435
return this.retryInterval;
436436
}
437437

438-
public long setRetryInterval(final long retryInterval) {
439-
return retryInterval;
438+
public void setRetryInterval(final long retryInterval) {
439+
this.retryInterval = retryInterval;
440440
}
441441

442442
public CommitterKey getCommitterKey() {

0 commit comments

Comments
 (0)