Skip to content

Commit 0fa9fa1

Browse files
authored
[IoTConsensusV2] Fix NPE when transfer tsfile mods #14840
1 parent b990b9e commit 0fa9fa1

File tree

2 files changed

+30
-29
lines changed

2 files changed

+30
-29
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ public class PipeConsensusReceiver {
105105
private final List<String> receiverBaseDirsName;
106106
// Used to buffer TsFile when transfer TsFile asynchronously.
107107
private final PipeConsensusTsFileWriterPool pipeConsensusTsFileWriterPool;
108-
private final AtomicReference<File> receiverFileDirWithIdSuffix = new AtomicReference<>();
108+
private final AtomicReference<File> receiverFileBaseDir = new AtomicReference<>();
109109
private final PipeConsensusReceiverMetrics pipeConsensusReceiverMetrics;
110110
private final FolderManager folderManager;
111111

@@ -137,8 +137,7 @@ public PipeConsensusReceiver(
137137
try {
138138
initiateTsFileBufferFolder();
139139
this.pipeConsensusTsFileWriterPool =
140-
new PipeConsensusTsFileWriterPool(
141-
consensusPipeName, receiverFileDirWithIdSuffix.get().getPath());
140+
new PipeConsensusTsFileWriterPool(consensusPipeName, receiverFileBaseDir.get().getPath());
142141
} catch (Exception e) {
143142
LOGGER.error("Fail to initiate file buffer folder, Error msg: {}", e.getMessage());
144143
throw new RuntimeException(e);
@@ -509,9 +508,11 @@ private TPipeConsensusTransferResp handleTransferFileSealWithMods(
509508
File writingFile = tsFileWriter.getWritingFile();
510509
RandomAccessFile writingFileWriter = tsFileWriter.getWritingFileWriter();
511510

511+
File currentWritingDirPath = new File(tsFileWriter.getLocalWritingDirPath());
512+
512513
final List<File> files =
513514
req.getFileNames().stream()
514-
.map(fileName -> new File(receiverFileDirWithIdSuffix.get(), fileName))
515+
.map(fileName -> new File(currentWritingDirPath, fileName))
515516
.collect(Collectors.toList());
516517
try {
517518
if (isWritingFileNonAvailable(tsFileWriter)) {
@@ -607,7 +608,7 @@ private TPipeConsensusTransferResp handleTransferFileSealWithMods(
607608
closeCurrentWritingFileWriter(tsFileWriter, false);
608609
// Clear the directory instead of only deleting the referenced files in seal request
609610
// to avoid previously undeleted file being redundant when transferring multi files
610-
IoTDBReceiverAgent.cleanPipeReceiverDir(receiverFileDirWithIdSuffix.get());
611+
IoTDBReceiverAgent.cleanPipeReceiverDir(currentWritingDirPath);
611612
}
612613
}
613614

@@ -812,13 +813,13 @@ private boolean isWritingFileOffsetNonCorrect(
812813
}
813814

814815
private void closeCurrentWritingFileWriter(
815-
PipeConsensusTsFileWriter tsFileWriter, boolean fsyncAfterClose) {
816+
PipeConsensusTsFileWriter tsFileWriter, boolean fsyncBeforeClose) {
816817
if (tsFileWriter.getWritingFileWriter() != null) {
817818
try {
818-
tsFileWriter.getWritingFileWriter().close();
819-
if (fsyncAfterClose) {
819+
if (fsyncBeforeClose) {
820820
tsFileWriter.getWritingFileWriter().getFD().sync();
821821
}
822+
tsFileWriter.getWritingFileWriter().close();
822823
LOGGER.info(
823824
"PipeConsensus-PipeName-{}: Current writing file writer {} was closed.",
824825
consensusPipeName,
@@ -909,17 +910,17 @@ private void updateWritingFileIfNeeded(
909910
// Make sure receiver file dir exists
910911
// This may be useless, because receiver file dir is created when receiver is initiated. just in
911912
// case.
912-
if (!receiverFileDirWithIdSuffix.get().exists()) {
913-
if (receiverFileDirWithIdSuffix.get().mkdirs()) {
913+
if (!receiverFileBaseDir.get().exists()) {
914+
if (receiverFileBaseDir.get().mkdirs()) {
914915
LOGGER.info(
915916
"PipeConsensus-PipeName-{}: Receiver file dir {} was created.",
916917
consensusPipeName,
917-
receiverFileDirWithIdSuffix.get().getPath());
918+
receiverFileBaseDir.get().getPath());
918919
} else {
919920
LOGGER.error(
920921
"PipeConsensus-PipeName-{}: Failed to create receiver file dir {}.",
921922
consensusPipeName,
922-
receiverFileDirWithIdSuffix.get().getPath());
923+
receiverFileBaseDir.get().getPath());
923924
}
924925
}
925926
// Every tsFileWriter has its own writing path.
@@ -939,23 +940,23 @@ private String getReceiverFileBaseDir() throws DiskSpaceInsufficientException {
939940

940941
private void initiateTsFileBufferFolder() throws DiskSpaceInsufficientException, IOException {
941942
// Clear the original receiver file dir if exists
942-
if (receiverFileDirWithIdSuffix.get() != null) {
943-
if (receiverFileDirWithIdSuffix.get().exists()) {
943+
if (receiverFileBaseDir.get() != null) {
944+
if (receiverFileBaseDir.get().exists()) {
944945
try {
945946
RetryUtils.retryOnException(
946947
() -> {
947-
FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
948+
FileUtils.deleteDirectory(receiverFileBaseDir.get());
948949
return null;
949950
});
950951
LOGGER.info(
951952
"PipeConsensus-PipeName-{}: Original receiver file dir {} was deleted successfully.",
952953
consensusPipeName,
953-
receiverFileDirWithIdSuffix.get().getPath());
954+
receiverFileBaseDir.get().getPath());
954955
} catch (IOException e) {
955956
LOGGER.warn(
956957
"PipeConsensus-PipeName-{}: Failed to delete original receiver file dir {}, because {}.",
957958
consensusPipeName,
958-
receiverFileDirWithIdSuffix.get().getPath(),
959+
receiverFileBaseDir.get().getPath(),
959960
e.getMessage(),
960961
e);
961962
}
@@ -964,10 +965,10 @@ private void initiateTsFileBufferFolder() throws DiskSpaceInsufficientException,
964965
LOGGER.debug(
965966
"PipeConsensus-PipeName-{}: Original receiver file dir {} is not existed. No need to delete.",
966967
consensusPipeName,
967-
receiverFileDirWithIdSuffix.get().getPath());
968+
receiverFileBaseDir.get().getPath());
968969
}
969970
}
970-
receiverFileDirWithIdSuffix.set(null);
971+
receiverFileBaseDir.set(null);
971972
} else {
972973
LOGGER.debug(
973974
"PipeConsensus-PipeName-{}: Current receiver file dir is null. No need to delete.",
@@ -1028,7 +1029,7 @@ private void initiateTsFileBufferFolder() throws DiskSpaceInsufficientException,
10281029
"PipeConsensus-PipeName-%s: Failed to create receiver file dir %s. May because authority or dir already exists etc.",
10291030
consensusPipeName, newReceiverDir.getPath()));
10301031
}
1031-
receiverFileDirWithIdSuffix.set(newReceiverDir);
1032+
this.receiverFileBaseDir.set(newReceiverDir);
10321033
}
10331034

10341035
public PipeConsensusRequestVersion getVersion() {
@@ -1040,34 +1041,34 @@ public synchronized void handleExit() {
10401041
pipeConsensusTsFileWriterPool.handleExit(consensusPipeName);
10411042

10421043
// Clear the original receiver file dir if exists
1043-
if (receiverFileDirWithIdSuffix.get() != null) {
1044-
if (receiverFileDirWithIdSuffix.get().exists()) {
1044+
if (receiverFileBaseDir.get() != null) {
1045+
if (receiverFileBaseDir.get().exists()) {
10451046
try {
10461047
RetryUtils.retryOnException(
10471048
() -> {
1048-
FileUtils.deleteDirectory(receiverFileDirWithIdSuffix.get());
1049+
FileUtils.deleteDirectory(receiverFileBaseDir.get());
10491050
return null;
10501051
});
10511052
LOGGER.info(
10521053
"PipeConsensus-PipeName-{}: Receiver exit: Original receiver file dir {} was deleted.",
10531054
consensusPipeName,
1054-
receiverFileDirWithIdSuffix.get().getPath());
1055+
receiverFileBaseDir.get().getPath());
10551056
} catch (IOException e) {
10561057
LOGGER.warn(
10571058
"PipeConsensus-PipeName-{}: Receiver exit: Delete original receiver file dir {} error.",
10581059
consensusPipeName,
1059-
receiverFileDirWithIdSuffix.get().getPath(),
1060+
receiverFileBaseDir.get().getPath(),
10601061
e);
10611062
}
10621063
} else {
10631064
if (LOGGER.isDebugEnabled()) {
10641065
LOGGER.debug(
10651066
"PipeConsensus-PipeName-{}: Receiver exit: Original receiver file dir {} does not exist. No need to delete.",
10661067
consensusPipeName,
1067-
receiverFileDirWithIdSuffix.get().getPath());
1068+
receiverFileBaseDir.get().getPath());
10681069
}
10691070
}
1070-
receiverFileDirWithIdSuffix.set(null);
1071+
receiverFileBaseDir.set(null);
10711072
} else {
10721073
if (LOGGER.isDebugEnabled()) {
10731074
LOGGER.debug(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/receiver/IoTDBFileReceiver.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,10 +407,10 @@ private boolean isFileExistedAndNameCorrect(final String fileName) {
407407
return writingFile != null && writingFile.exists() && writingFile.getName().equals(fileName);
408408
}
409409

410-
private void closeCurrentWritingFileWriter(final boolean fsyncAfterClose) {
410+
private void closeCurrentWritingFileWriter(final boolean fsyncBeforeClose) {
411411
if (writingFileWriter != null) {
412412
try {
413-
if (IS_FSYNC_ENABLED && fsyncAfterClose) {
413+
if (IS_FSYNC_ENABLED && fsyncBeforeClose) {
414414
writingFileWriter.getFD().sync();
415415
}
416416
writingFileWriter.close();

0 commit comments

Comments
 (0)