Skip to content

Commit 7806f84

Browse files
Pipe: Allow STATEMENT_STATUS_VISITOR and STATEMENT_EXCEPTION_VISITOR for data conversion statements on data sync receiver side (apache#14220)
Co-authored-by: Steve Yurong Su <[email protected]>
1 parent b34b714 commit 7806f84

File tree

3 files changed

+31
-8
lines changed

3 files changed

+31
-8
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,9 +138,9 @@ public class IoTDBDataNodeReceiver extends IoTDBFileReceiver {
138138

139139
public static final PipePlanToStatementVisitor PLAN_TO_STATEMENT_VISITOR =
140140
new PipePlanToStatementVisitor();
141-
private static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
141+
public static final PipeStatementTSStatusVisitor STATEMENT_STATUS_VISITOR =
142142
new PipeStatementTSStatusVisitor();
143-
private static final PipeStatementExceptionVisitor STATEMENT_EXCEPTION_VISITOR =
143+
public static final PipeStatementExceptionVisitor STATEMENT_EXCEPTION_VISITOR =
144144
new PipeStatementExceptionVisitor();
145145
private static final PipeStatementPatternParseVisitor STATEMENT_PATTERN_PARSE_VISITOR =
146146
new PipeStatementPatternParseVisitor();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementDataTypeConvertExecutionVisitor.java

Lines changed: 27 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
2424
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
2525
import org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
26+
import org.apache.iotdb.db.pipe.receiver.protocol.thrift.IoTDBDataNodeReceiver;
2627
import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertRowStatement;
2728
import org.apache.iotdb.db.pipe.receiver.transform.statement.PipeConvertedInsertTabletStatement;
2829
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
@@ -105,15 +106,36 @@ file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
105106
PipeTransferTabletRawReq.toTPipeTransferRawReq(
106107
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
107108
.constructStatement());
108-
TSStatus result = statementExecutor.execute(statement);
109109

110-
// Retry once if the write process is rejected
111-
if (result.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
112-
result = statementExecutor.execute(statement);
110+
TSStatus result;
111+
try {
112+
result =
113+
IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.visitStatement(
114+
statement, statementExecutor.execute(statement));
115+
116+
// Retry max 5 times if the write process is rejected
117+
for (int i = 0;
118+
i < 5
119+
&& result.getCode()
120+
== TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION
121+
.getStatusCode();
122+
i++) {
123+
Thread.sleep(100L * (i + 1));
124+
result =
125+
IoTDBDataNodeReceiver.STATEMENT_STATUS_VISITOR.visitStatement(
126+
statement, statementExecutor.execute(statement));
127+
}
128+
} catch (final Exception e) {
129+
if (e instanceof InterruptedException) {
130+
Thread.currentThread().interrupt();
131+
}
132+
result = statement.accept(IoTDBDataNodeReceiver.STATEMENT_EXCEPTION_VISITOR, e);
113133
}
114134

115135
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
116-
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode())) {
136+
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
137+
|| result.getCode()
138+
== TSStatusCode.PIPE_RECEIVER_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
117139
return Optional.empty();
118140
}
119141
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/visitor/PipeStatementTSStatusVisitor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public TSStatus visitInsertMultiTablets(
9797

9898
private TSStatus visitInsertBase(
9999
final InsertBaseStatement insertBaseStatement, final TSStatus context) {
100-
if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()) {
100+
if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()
101+
|| context.getCode() == TSStatusCode.WRITE_PROCESS_REJECT.getStatusCode()) {
101102
return new TSStatus(
102103
TSStatusCode.PIPE_RECEIVER_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode())
103104
.setMessage(context.getMessage());

0 commit comments

Comments
 (0)