Skip to content

Commit 5bc4779

Browse files
authored
Pipe: Fix NullPointerException in concurrent event access (apache#16849)
* fix * fix * fix * fix * fix
1 parent db02437 commit 5bc4779

File tree

2 files changed

+53
-27
lines changed

2 files changed

+53
-27
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ protected boolean executeOnce() {
142142
throw e;
143143
} else {
144144
LOGGER.info(
145-
"{} in pipe transfer, ignored because the connector subtask is dropped.",
145+
"{} in pipe transfer, ignored because the connector subtask is dropped.{}",
146146
e.getClass().getSimpleName(),
147-
e);
147+
e.getMessage() != null ? " Message: " + e.getMessage() : "");
148148
clearReferenceCountAndReleaseLastEvent(event);
149149
}
150150
} catch (final Exception e) {
@@ -161,7 +161,8 @@ protected boolean executeOnce() {
161161
e);
162162
} else {
163163
LOGGER.info(
164-
"Exception in pipe transfer, ignored because the connector subtask is dropped.", e);
164+
"Exception in pipe transfer, ignored because the connector subtask is dropped.{}",
165+
e.getMessage() != null ? " Message: " + e.getMessage() : "");
165166
clearReferenceCountAndReleaseLastEvent(event);
166167
}
167168
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 49 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -157,16 +157,20 @@ public InsertNode getInsertNode() {
157157
}
158158

159159
public ByteBuffer getByteBuffer() throws WALPipeException {
160-
return insertNode.serializeToByteBuffer();
160+
final InsertNode node = insertNode;
161+
if (Objects.isNull(node)) {
162+
throw new PipeException("InsertNode has been released");
163+
}
164+
return node.serializeToByteBuffer();
161165
}
162166

163167
public String getDeviceId() {
164-
if (Objects.isNull(insertNode)) {
168+
final InsertNode node = insertNode;
169+
if (Objects.isNull(node)) {
165170
return null;
166171
}
167-
return Objects.nonNull(insertNode.getTargetPath())
168-
? insertNode.getTargetPath().getFullPath()
169-
: null;
172+
final PartialPath targetPath = node.getTargetPath();
173+
return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null;
170174
}
171175

172176
public long getExtractTime() {
@@ -244,10 +248,14 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
244248
final boolean skipIfNoPrivileges,
245249
final long startTime,
246250
final long endTime) {
251+
final InsertNode node = insertNode;
252+
if (Objects.isNull(node)) {
253+
throw new PipeException("InsertNode has been released");
254+
}
247255
return new PipeInsertNodeTabletInsertionEvent(
248256
getRawIsTableModelEvent(),
249257
getSourceDatabaseNameFromDataRegion(),
250-
insertNode,
258+
node,
251259
pipeName,
252260
creationTime,
253261
pipeTaskMeta,
@@ -263,25 +271,34 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
263271

264272
@Override
265273
public boolean isGeneratedByPipe() {
266-
return insertNode.isGeneratedByPipe();
274+
final InsertNode node = insertNode;
275+
if (Objects.isNull(node)) {
276+
throw new PipeException("InsertNode has been released");
277+
}
278+
return node.isGeneratedByPipe();
267279
}
268280

269281
@Override
270282
public void throwIfNoPrivilege() {
271283
if (skipIfNoPrivileges || !isTableModelEvent()) {
272284
return;
273285
}
274-
if (Objects.nonNull(insertNode.getTargetPath())) {
275-
checkTableName(
276-
DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName());
277-
} else if (insertNode instanceof InsertRowsNode) {
286+
final InsertNode node = insertNode;
287+
if (Objects.isNull(node)) {
288+
// Event is released, skip privilege check
289+
return;
290+
}
291+
final PartialPath targetPath = node.getTargetPath();
292+
if (Objects.nonNull(targetPath)) {
293+
checkTableName(DeviceIDFactory.getInstance().getDeviceID(targetPath).getTableName());
294+
} else if (node instanceof InsertRowsNode) {
278295
for (final String tableName :
279-
((InsertRowsNode) insertNode)
296+
((InsertRowsNode) node)
280297
.getInsertRowNodeList().stream()
281298
.map(
282-
node ->
299+
insertRowNode ->
283300
DeviceIDFactory.getInstance()
284-
.getDeviceID(node.getTargetPath())
301+
.getDeviceID(insertRowNode.getTargetPath())
285302
.getTableName())
286303
.collect(Collectors.toSet())) {
287304
checkTableName(tableName);
@@ -443,6 +460,9 @@ private List<TabletInsertionEventParser> initEventParsers() {
443460

444461
eventParsers = new ArrayList<>();
445462
final InsertNode node = getInsertNode();
463+
if (Objects.isNull(node)) {
464+
throw new PipeException("InsertNode has been released");
465+
}
446466
switch (node.getType()) {
447467
case INSERT_ROW:
448468
case INSERT_TABLET:
@@ -526,23 +546,25 @@ public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
526546

527547
@Override
528548
public String toString() {
549+
final InsertNode node = insertNode;
529550
return String.format(
530551
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, eventParsers=%s}",
531552
progressIndex,
532-
Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
533-
Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null,
553+
Objects.nonNull(node) ? node.isAligned() : null,
554+
Objects.nonNull(node) ? node.isGeneratedByPipe() : null,
534555
eventParsers)
535556
+ " - "
536557
+ super.toString();
537558
}
538559

539560
@Override
540561
public String coreReportMessage() {
562+
final InsertNode node = insertNode;
541563
return String.format(
542564
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}",
543565
progressIndex,
544-
Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
545-
Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null)
566+
Objects.nonNull(node) ? node.isAligned() : null,
567+
Objects.nonNull(node) ? node.isGeneratedByPipe() : null)
546568
+ " - "
547569
+ super.coreReportMessage();
548570
}
@@ -567,12 +589,15 @@ public PipeEventResource eventResourceBuilder() {
567589
// invoked, the event will soon be released.
568590
@Override
569591
public long ramBytesUsed() {
570-
return bytes > 0
571-
? bytes
572-
: (bytes =
573-
INSTANCE_SIZE
574-
+ (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
575-
+ (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0));
592+
if (bytes > 0) {
593+
return bytes;
594+
}
595+
final InsertNode node = insertNode;
596+
bytes =
597+
INSTANCE_SIZE
598+
+ (Objects.nonNull(node) ? InsertNodeMemoryEstimator.sizeOf(node) : 0)
599+
+ (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
600+
return bytes;
576601
}
577602

578603
private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource {

0 commit comments

Comments
 (0)