Skip to content

Commit d5c1834

Browse files
authored
Pipe: Fix NullPointerException in concurrent event access (#16849) (#16872)
* fix * fix * fix * fix * fix (cherry picked from commit 5bc4779)
1 parent 80d5aea commit d5c1834

File tree

2 files changed

+29
-8
lines changed

2 files changed

+29
-8
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ protected boolean executeOnce() {
142142
throw e;
143143
} else {
144144
LOGGER.info(
145-
"{} in pipe transfer, ignored because the connector subtask is dropped.",
145+
"{} in pipe transfer, ignored because the connector subtask is dropped.{}",
146146
e.getClass().getSimpleName(),
147-
e);
147+
e.getMessage() != null ? " Message: " + e.getMessage() : "");
148148
clearReferenceCountAndReleaseLastEvent(event);
149149
}
150150
} catch (final Exception e) {
@@ -161,7 +161,8 @@ protected boolean executeOnce() {
161161
e);
162162
} else {
163163
LOGGER.info(
164-
"Exception in pipe transfer, ignored because the connector subtask is dropped.", e);
164+
"Exception in pipe transfer, ignored because the connector subtask is dropped.{}",
165+
e.getMessage() != null ? " Message: " + e.getMessage() : "");
165166
clearReferenceCountAndReleaseLastEvent(event);
166167
}
167168
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,13 +106,20 @@ public InsertNode getInsertNode() {
106106
}
107107

108108
public ByteBuffer getByteBuffer() throws WALPipeException {
109-
return insertNode.serializeToByteBuffer();
109+
final InsertNode node = insertNode;
110+
if (Objects.isNull(node)) {
111+
throw new PipeException("InsertNode has been released");
112+
}
113+
return node.serializeToByteBuffer();
110114
}
111115

112116
public String getDeviceId() {
113-
return Objects.nonNull(insertNode.getDevicePath())
114-
? insertNode.getDevicePath().getFullPath()
115-
: null;
117+
final InsertNode node = insertNode;
118+
if (Objects.isNull(node)) {
119+
return null;
120+
}
121+
final PartialPath targetPath = node.getDevicePath();
122+
return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null;
116123
}
117124

118125
public long getExtractTime() {
@@ -185,13 +192,21 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
185192
final PipePattern pattern,
186193
final long startTime,
187194
final long endTime) {
195+
final InsertNode node = insertNode;
196+
if (Objects.isNull(node)) {
197+
throw new PipeException("InsertNode has been released");
198+
}
188199
return new PipeInsertNodeTabletInsertionEvent(
189200
insertNode, pipeName, creationTime, pipeTaskMeta, pattern, startTime, endTime);
190201
}
191202

192203
@Override
193204
public boolean isGeneratedByPipe() {
194-
return insertNode.isGeneratedByPipe();
205+
final InsertNode node = insertNode;
206+
if (Objects.isNull(node)) {
207+
throw new PipeException("InsertNode has been released");
208+
}
209+
return node.isGeneratedByPipe();
195210
}
196211

197212
@Override
@@ -328,6 +343,9 @@ private List<TabletInsertionDataContainer> initDataContainers() {
328343

329344
dataContainers = new ArrayList<>();
330345
final InsertNode node = getInsertNode();
346+
if (Objects.isNull(node)) {
347+
throw new PipeException("InsertNode has been released");
348+
}
331349
switch (node.getType()) {
332350
case INSERT_ROW:
333351
case INSERT_TABLET:
@@ -393,6 +411,7 @@ public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
393411

394412
@Override
395413
public String toString() {
414+
final InsertNode insertNode = this.insertNode;
396415
return String.format(
397416
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, dataContainers=%s}",
398417
progressIndex,
@@ -405,6 +424,7 @@ public String toString() {
405424

406425
@Override
407426
public String coreReportMessage() {
427+
final InsertNode insertNode = this.insertNode;
408428
return String.format(
409429
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}",
410430
progressIndex,

0 commit comments

Comments
 (0)