Skip to content

Commit 705f73d

Browse files
committed
f
2 parents 3d6ff40 + 5bc4779 commit 705f73d

File tree

2 files changed

+52
-26
lines changed

2 files changed

+52
-26
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/sink/PipeSinkSubtask.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ protected boolean executeOnce() {
142142
throw e;
143143
} else {
144144
LOGGER.info(
145-
"{} in pipe transfer, ignored because the connector subtask is dropped.",
145+
"{} in pipe transfer, ignored because the connector subtask is dropped.{}",
146146
e.getClass().getSimpleName(),
147-
e);
147+
e.getMessage() != null ? " Message: " + e.getMessage() : "");
148148
clearReferenceCountAndReleaseLastEvent(event);
149149
}
150150
} catch (final Exception e) {
@@ -161,7 +161,8 @@ protected boolean executeOnce() {
161161
e);
162162
} else {
163163
LOGGER.info(
164-
"Exception in pipe transfer, ignored because the connector subtask is dropped.", e);
164+
"Exception in pipe transfer, ignored because the connector subtask is dropped.{}",
165+
e.getMessage() != null ? " Message: " + e.getMessage() : "");
165166
clearReferenceCountAndReleaseLastEvent(event);
166167
}
167168
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tablet/PipeInsertNodeTabletInsertionEvent.java

Lines changed: 48 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -163,16 +163,20 @@ public InsertNode getInsertNode() {
163163
}
164164

165165
public ByteBuffer getByteBuffer() throws WALPipeException {
166-
return insertNode.serializeToByteBuffer();
166+
final InsertNode node = insertNode;
167+
if (Objects.isNull(node)) {
168+
throw new PipeException("InsertNode has been released");
169+
}
170+
return node.serializeToByteBuffer();
167171
}
168172

169173
public String getDeviceId() {
170-
if (Objects.isNull(insertNode)) {
174+
final InsertNode node = insertNode;
175+
if (Objects.isNull(node)) {
171176
return null;
172177
}
173-
return Objects.nonNull(insertNode.getTargetPath())
174-
? insertNode.getTargetPath().getFullPath()
175-
: null;
178+
final PartialPath targetPath = node.getTargetPath();
179+
return Objects.nonNull(targetPath) ? targetPath.getFullPath() : null;
176180
}
177181

178182
public long getExtractTime() {
@@ -250,10 +254,14 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
250254
final boolean skipIfNoPrivileges,
251255
final long startTime,
252256
final long endTime) {
257+
final InsertNode node = insertNode;
258+
if (Objects.isNull(node)) {
259+
throw new PipeException("InsertNode has been released");
260+
}
253261
return new PipeInsertNodeTabletInsertionEvent(
254262
getRawIsTableModelEvent(),
255263
getSourceDatabaseNameFromDataRegion(),
256-
insertNode,
264+
node,
257265
pipeName,
258266
creationTime,
259267
pipeTaskMeta,
@@ -269,28 +277,37 @@ public PipeInsertNodeTabletInsertionEvent shallowCopySelfAndBindPipeTaskMetaForP
269277

270278
@Override
271279
public boolean isGeneratedByPipe() {
272-
return insertNode.isGeneratedByPipe();
280+
final InsertNode node = insertNode;
281+
if (Objects.isNull(node)) {
282+
throw new PipeException("InsertNode has been released");
283+
}
284+
return node.isGeneratedByPipe();
273285
}
274286

275287
@Override
276288
public void throwIfNoPrivilege() throws Exception {
277289
if (skipIfNoPrivileges) {
278290
return;
279291
}
280-
if (Objects.nonNull(insertNode.getTargetPath())) {
292+
final InsertNode node = insertNode;
293+
if (Objects.isNull(node)) {
294+
// Event is released, skip privilege check
295+
return;
296+
}
297+
if (Objects.nonNull(node.getTargetPath())) {
281298
if (isTableModelEvent()) {
282299
checkTableName(
283-
DeviceIDFactory.getInstance().getDeviceID(insertNode.getTargetPath()).getTableName());
300+
DeviceIDFactory.getInstance().getDeviceID(node.getTargetPath()).getTableName());
284301
} else {
285-
checkTreePattern(insertNode.getDeviceID(), insertNode.getMeasurements());
302+
checkTreePattern(node.getDeviceID(), node.getMeasurements());
286303
}
287304
} else if (insertNode instanceof InsertRowsNode) {
288-
for (final InsertNode node : ((InsertRowsNode) insertNode).getInsertRowNodeList()) {
305+
for (final InsertNode subNode : ((InsertRowsNode) node).getInsertRowNodeList()) {
289306
if (isTableModelEvent()) {
290307
checkTableName(
291-
DeviceIDFactory.getInstance().getDeviceID(node.getTargetPath()).getTableName());
308+
DeviceIDFactory.getInstance().getDeviceID(subNode.getTargetPath()).getTableName());
292309
} else {
293-
checkTreePattern(node.getDeviceID(), node.getMeasurements());
310+
checkTreePattern(subNode.getDeviceID(), subNode.getMeasurements());
294311
}
295312
}
296313
}
@@ -473,6 +490,9 @@ private List<TabletInsertionEventParser> initEventParsers() {
473490

474491
eventParsers = new ArrayList<>();
475492
final InsertNode node = getInsertNode();
493+
if (Objects.isNull(node)) {
494+
throw new PipeException("InsertNode has been released");
495+
}
476496
switch (node.getType()) {
477497
case INSERT_ROW:
478498
case INSERT_TABLET:
@@ -569,23 +589,25 @@ public List<PipeRawTabletInsertionEvent> toRawTabletInsertionEvents() {
569589

570590
@Override
571591
public String toString() {
592+
final InsertNode node = insertNode;
572593
return String.format(
573594
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s, eventParsers=%s}",
574595
progressIndex,
575-
Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
576-
Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null,
596+
Objects.nonNull(node) ? node.isAligned() : null,
597+
Objects.nonNull(node) ? node.isGeneratedByPipe() : null,
577598
eventParsers)
578599
+ " - "
579600
+ super.toString();
580601
}
581602

582603
@Override
583604
public String coreReportMessage() {
605+
final InsertNode node = insertNode;
584606
return String.format(
585607
"PipeInsertNodeTabletInsertionEvent{progressIndex=%s, isAligned=%s, isGeneratedByPipe=%s}",
586608
progressIndex,
587-
Objects.nonNull(insertNode) ? insertNode.isAligned() : null,
588-
Objects.nonNull(insertNode) ? insertNode.isGeneratedByPipe() : null)
609+
Objects.nonNull(node) ? node.isAligned() : null,
610+
Objects.nonNull(node) ? node.isGeneratedByPipe() : null)
589611
+ " - "
590612
+ super.coreReportMessage();
591613
}
@@ -610,12 +632,15 @@ public PipeEventResource eventResourceBuilder() {
610632
// invoked, the event will soon be released.
611633
@Override
612634
public long ramBytesUsed() {
613-
return bytes > 0
614-
? bytes
615-
: (bytes =
616-
INSTANCE_SIZE
617-
+ (Objects.nonNull(insertNode) ? InsertNodeMemoryEstimator.sizeOf(insertNode) : 0)
618-
+ (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0));
635+
if (bytes > 0) {
636+
return bytes;
637+
}
638+
final InsertNode node = insertNode;
639+
bytes =
640+
INSTANCE_SIZE
641+
+ (Objects.nonNull(node) ? InsertNodeMemoryEstimator.sizeOf(node) : 0)
642+
+ (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
643+
return bytes;
619644
}
620645

621646
private static class PipeInsertNodeTabletInsertionEventResource extends PipeEventResource {

0 commit comments

Comments
 (0)