Skip to content

Commit c28e50f

Browse files
Pipe: Add retry when TsFile parsing failed to avoid race among processor threads (follow up #15624) (#15644)
* Pipe: Add retry when TsFile parsing failed to avoid race among processor threads * refactor * refactor * refactor
1 parent b4419bb commit c28e50f

File tree

7 files changed

+135
-68
lines changed

7 files changed

+135
-68
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java

Lines changed: 2 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
package org.apache.iotdb.db.pipe.agent.task.connection;
2121

22-
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
2322
import org.apache.iotdb.commons.pipe.agent.task.connection.UnboundedBlockingPendingQueue;
2423
import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
2524
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
@@ -36,13 +35,11 @@
3635
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.DeleteDataNode;
3736
import org.apache.iotdb.pipe.api.collector.EventCollector;
3837
import org.apache.iotdb.pipe.api.event.Event;
39-
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
4038
import org.apache.iotdb.pipe.api.exception.PipeException;
4139

4240
import org.slf4j.Logger;
4341
import org.slf4j.LoggerFactory;
4442

45-
import java.util.Iterator;
4643
import java.util.concurrent.atomic.AtomicInteger;
4744

4845
public class PipeEventCollector implements EventCollector {
@@ -144,32 +141,8 @@ private void parseAndCollectEvent(final PipeTsFileInsertionEvent sourceEvent) th
144141
}
145142

146143
try {
147-
final Iterable<TabletInsertionEvent> iterable = sourceEvent.toTabletInsertionEvents();
148-
final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
149-
while (iterator.hasNext()) {
150-
final TabletInsertionEvent parsedEvent = iterator.next();
151-
int retryCount = 0;
152-
while (true) {
153-
try {
154-
collectParsedRawTableEvent((PipeRawTabletInsertionEvent) parsedEvent);
155-
break;
156-
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
157-
if (retryCount++ % 100 == 0) {
158-
LOGGER.warn(
159-
"parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.",
160-
sourceEvent.getTsFile(),
161-
retryCount,
162-
e);
163-
} else if (LOGGER.isDebugEnabled()) {
164-
LOGGER.debug(
165-
"parseAndCollectEvent: failed to allocate memory for parsing TsFile {}, retry count is {}, will keep retrying.",
166-
sourceEvent.getTsFile(),
167-
retryCount,
168-
e);
169-
}
170-
}
171-
}
172-
}
144+
sourceEvent.consumeTabletInsertionEventsWithRetry(
145+
this::collectParsedRawTableEvent, "PipeEventCollector::parseAndCollectEvent");
173146
} finally {
174147
sourceEvent.close();
175148
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,18 @@ protected boolean executeOnce() throws Exception {
149149
&& ((PipeTsFileInsertionEvent) event).shouldParse4Privilege()) {
150150
try (final PipeTsFileInsertionEvent tsFileInsertionEvent =
151151
(PipeTsFileInsertionEvent) event) {
152-
for (final TabletInsertionEvent tabletInsertionEvent :
153-
tsFileInsertionEvent.toTabletInsertionEvents()) {
154-
pipeProcessor.process(tabletInsertionEvent, outputEventCollector);
152+
final AtomicReference<Exception> ex = new AtomicReference<>();
153+
tsFileInsertionEvent.consumeTabletInsertionEventsWithRetry(
154+
event1 -> {
155+
try {
156+
pipeProcessor.process(event1, outputEventCollector);
157+
} catch (Exception e) {
158+
ex.set(e);
159+
}
160+
},
161+
"PipeProcessorSubtask::executeOnce");
162+
if (ex.get() != null) {
163+
throw ex.get();
155164
}
156165
}
157166
} else {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/websocket/WebSocketConnector.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,14 @@ public void transfer(TsFileInsertionEvent tsFileInsertionEvent) throws Exception
141141
}
142142

143143
try {
144-
for (TabletInsertionEvent event : tsFileInsertionEvent.toTabletInsertionEvents()) {
145-
// Skip report if any tablet events is added
146-
((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
147-
transfer(event);
148-
}
144+
((PipeTsFileInsertionEvent) tsFileInsertionEvent)
145+
.consumeTabletInsertionEventsWithRetry(
146+
event -> {
147+
// Skip report if any tablet events is added
148+
((PipeTsFileInsertionEvent) tsFileInsertionEvent).skipReportOnCommit();
149+
transfer(event);
150+
},
151+
"WebSocketConnector::transfer");
149152
} finally {
150153
tsFileInsertionEvent.close();
151154
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java

Lines changed: 56 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
2323
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
2424
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
25+
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
2526
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeTaskMeta;
2627
import org.apache.iotdb.commons.pipe.config.PipeConfig;
2728
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
@@ -55,11 +56,13 @@
5556
import java.io.File;
5657
import java.io.IOException;
5758
import java.util.Collections;
59+
import java.util.Iterator;
5860
import java.util.Map;
5961
import java.util.Objects;
6062
import java.util.Set;
6163
import java.util.concurrent.atomic.AtomicBoolean;
6264
import java.util.concurrent.atomic.AtomicInteger;
65+
import java.util.concurrent.atomic.AtomicLong;
6366
import java.util.concurrent.atomic.AtomicReference;
6467

6568
import static org.apache.tsfile.common.constant.TsFileConstant.PATH_ROOT;
@@ -562,6 +565,49 @@ public boolean isTableModelEvent() {
562565

563566
/////////////////////////// TsFileInsertionEvent ///////////////////////////
564567

568+
@FunctionalInterface
569+
public interface TabletInsertionEventConsumer {
570+
void consume(final PipeRawTabletInsertionEvent event);
571+
}
572+
573+
public void consumeTabletInsertionEventsWithRetry(
574+
final TabletInsertionEventConsumer consumer, final String callerName) throws PipeException {
575+
final Iterable<TabletInsertionEvent> iterable = toTabletInsertionEvents();
576+
final Iterator<TabletInsertionEvent> iterator = iterable.iterator();
577+
int tabletEventCount = 0;
578+
while (iterator.hasNext()) {
579+
final TabletInsertionEvent parsedEvent = iterator.next();
580+
tabletEventCount++;
581+
int retryCount = 0;
582+
while (true) {
583+
// If failed due do insufficient memory, retry until success to avoid race among multiple
584+
// processor threads
585+
try {
586+
consumer.consume((PipeRawTabletInsertionEvent) parsedEvent);
587+
break;
588+
} catch (final PipeRuntimeOutOfMemoryCriticalException e) {
589+
if (retryCount++ % 100 == 0) {
590+
LOGGER.warn(
591+
"{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.",
592+
callerName,
593+
getTsFile(),
594+
tabletEventCount,
595+
retryCount,
596+
e);
597+
} else if (LOGGER.isDebugEnabled()) {
598+
LOGGER.debug(
599+
"{}: failed to allocate memory for parsing TsFile {}, tablet event no. {}, retry count is {}, will keep retrying.",
600+
callerName,
601+
getTsFile(),
602+
tabletEventCount,
603+
retryCount,
604+
e);
605+
}
606+
}
607+
}
608+
}
609+
}
610+
565611
@Override
566612
public Iterable<TabletInsertionEvent> toTabletInsertionEvents() throws PipeException {
567613
// 20 - 40 seconds for waiting
@@ -685,18 +731,19 @@ private TsFileInsertionEventParser initEventParser() {
685731
}
686732

687733
public long count(final boolean skipReportOnCommit) throws IOException {
688-
long count = 0;
734+
AtomicLong count = new AtomicLong();
689735

690736
if (shouldParseTime()) {
691737
try {
692-
for (final TabletInsertionEvent event : toTabletInsertionEvents()) {
693-
final PipeRawTabletInsertionEvent rawEvent = ((PipeRawTabletInsertionEvent) event);
694-
count += rawEvent.count();
695-
if (skipReportOnCommit) {
696-
rawEvent.skipReportOnCommit();
697-
}
698-
}
699-
return count;
738+
consumeTabletInsertionEventsWithRetry(
739+
event -> {
740+
count.addAndGet(event.count());
741+
if (skipReportOnCommit) {
742+
event.skipReportOnCommit();
743+
}
744+
},
745+
"PipeTsFileInsertionEvent::count");
746+
return count.get();
700747
} finally {
701748
close();
702749
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -524,9 +524,26 @@ public void process(
524524
final TsFileInsertionEvent tsFileInsertionEvent, final EventCollector eventCollector)
525525
throws Exception {
526526
try {
527-
for (final TabletInsertionEvent tabletInsertionEvent :
528-
tsFileInsertionEvent.toTabletInsertionEvents()) {
529-
process(tabletInsertionEvent, eventCollector);
527+
if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
528+
final AtomicReference<Exception> ex = new AtomicReference<>();
529+
((PipeTsFileInsertionEvent) tsFileInsertionEvent)
530+
.consumeTabletInsertionEventsWithRetry(
531+
event -> {
532+
try {
533+
process(event, eventCollector);
534+
} catch (Exception e) {
535+
ex.set(e);
536+
}
537+
},
538+
"AggregateProcessor::process");
539+
if (ex.get() != null) {
540+
throw ex.get();
541+
}
542+
} else {
543+
for (final TabletInsertionEvent tabletInsertionEvent :
544+
tsFileInsertionEvent.toTabletInsertionEvents()) {
545+
process(tabletInsertionEvent, eventCollector);
546+
}
530547
}
531548
} finally {
532549
tsFileInsertionEvent.close();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/downsampling/DownSamplingProcessor.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
2424
import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
2525
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
26+
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
2627
import org.apache.iotdb.db.storageengine.StorageEngine;
2728
import org.apache.iotdb.pipe.api.PipeProcessor;
2829
import org.apache.iotdb.pipe.api.access.Row;
@@ -45,7 +46,6 @@
4546
import static org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant.PROCESSOR_DOWN_SAMPLING_SPLIT_FILE_KEY;
4647

4748
public abstract class DownSamplingProcessor implements PipeProcessor {
48-
4949
protected long memoryLimitInBytes;
5050

5151
protected boolean shouldSplitFile;
@@ -149,9 +149,26 @@ public void process(TsFileInsertionEvent tsFileInsertionEvent, EventCollector ev
149149
throws Exception {
150150
if (shouldSplitFile) {
151151
try {
152-
for (final TabletInsertionEvent tabletInsertionEvent :
153-
tsFileInsertionEvent.toTabletInsertionEvents()) {
154-
process(tabletInsertionEvent, eventCollector);
152+
if (tsFileInsertionEvent instanceof PipeTsFileInsertionEvent) {
153+
final AtomicReference<Exception> ex = new AtomicReference<>();
154+
((PipeTsFileInsertionEvent) tsFileInsertionEvent)
155+
.consumeTabletInsertionEventsWithRetry(
156+
event -> {
157+
try {
158+
process(event, eventCollector);
159+
} catch (Exception e) {
160+
ex.set(e);
161+
}
162+
},
163+
"DownSamplingProcessor::process");
164+
if (ex.get() != null) {
165+
throw ex.get();
166+
}
167+
} else {
168+
for (final TabletInsertionEvent tabletInsertionEvent :
169+
tsFileInsertionEvent.toTabletInsertionEvents()) {
170+
process(tabletInsertionEvent, eventCollector);
171+
}
155172
}
156173
} finally {
157174
tsFileInsertionEvent.close();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/subscription/event/batch/SubscriptionPipeTsFileEventBatch.java

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
2323
import org.apache.iotdb.db.pipe.connector.payload.evolvable.batch.PipeTabletEventTsFileBatch;
24-
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
2524
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
2625
import org.apache.iotdb.db.subscription.broker.SubscriptionPrefetchingTsFileQueue;
2726
import org.apache.iotdb.db.subscription.event.SubscriptionEvent;
@@ -96,20 +95,22 @@ protected void onTabletInsertionEvent(final TabletInsertionEvent event) {
9695
protected void onTsFileInsertionEvent(final TsFileInsertionEvent event) {
9796
// TODO: parse tsfile event on the fly like SubscriptionPipeTabletEventBatch
9897
try {
99-
for (final TabletInsertionEvent parsedEvent : event.toTabletInsertionEvents()) {
100-
if (!((PipeRawTabletInsertionEvent) parsedEvent)
101-
.increaseReferenceCount(this.getClass().getName())) {
102-
LOGGER.warn(
103-
"SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.",
104-
((PipeRawTabletInsertionEvent) parsedEvent).coreReportMessage());
105-
} else {
106-
try {
107-
batch.onEvent(parsedEvent);
108-
} catch (final Exception ignored) {
109-
// no exceptions will be thrown
110-
}
111-
}
112-
}
98+
((PipeTsFileInsertionEvent) event)
99+
.consumeTabletInsertionEventsWithRetry(
100+
event1 -> {
101+
if (!event1.increaseReferenceCount(this.getClass().getName())) {
102+
LOGGER.warn(
103+
"SubscriptionPipeTsFileEventBatch: Failed to increase the reference count of event {}, skipping it.",
104+
event1.coreReportMessage());
105+
} else {
106+
try {
107+
batch.onEvent(event1);
108+
} catch (final Exception ignored) {
109+
// no exceptions will be thrown
110+
}
111+
}
112+
},
113+
"SubscriptionPipeTsFileEventBatch::onTsFileInsertionEvent");
113114
} finally {
114115
try {
115116
event.close();

0 commit comments

Comments
 (0)