Skip to content

Commit f8e24dc

Browse files
authored
Feature/retry on disk failure (#15676)
* Skip faulty disk
* fix some issues
* Adjusted the log messages related to disk failures.
* Adjusted the log messages related to disk failures.
* Format code using Spotless
* Added FolderManager.getNextWithRetry(Consumer folderConsumer)
* fix ut failures
1 parent 65b18ff commit f8e24dc

File tree

21 files changed

+389
-179
lines changed

21 files changed

+389
-179
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1000,6 +1000,9 @@ protected String getReceiverFileBaseDir() {
10001000
return ConfigNodeDescriptor.getInstance().getConf().getPipeReceiverFileDir();
10011001
}
10021002

1003+
@Override
1004+
protected void markFileBaseDirStateAbnormal(String dir) {}
1005+
10031006
@Override
10041007
protected String getSenderHost() {
10051008
final IClientSession session = SESSION_MANAGER.getCurrSession();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/exception/load/LoadFileException.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,4 +31,8 @@ public LoadFileException(String message) {
3131
public LoadFileException(Exception exception) {
3232
super(exception, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
3333
}
34+
35+
public LoadFileException(String message, Exception exception) {
36+
super(message, exception, TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
37+
}
3438
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/util/builder/PipeTsFileBuilder.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,27 +81,35 @@ private File getNextBaseDir() throws DiskSpaceInsufficientException {
8181
}
8282
}
8383
}
84-
85-
final File baseDir =
86-
new File(FOLDER_MANAGER.get().getNextFolder(), Long.toString(currentBatchId.get()));
87-
if (baseDir.exists()) {
88-
FileUtils.deleteQuietly(baseDir);
89-
}
90-
if (!baseDir.exists() && !baseDir.mkdirs()) {
91-
LOGGER.warn(
92-
"Batch id = {}: Failed to create batch file dir {}.",
93-
currentBatchId.get(),
94-
baseDir.getPath());
84+
synchronized (FOLDER_MANAGER) {
85+
File baseDir =
86+
FOLDER_MANAGER
87+
.get()
88+
.getNextWithRetry(
89+
folder -> {
90+
File dir = new File(folder, Long.toString(currentBatchId.get()));
91+
FileUtils.deleteQuietly(dir);
92+
if (dir.mkdirs()) {
93+
LOGGER.info(
94+
"Batch id = {}: Create batch dir successfully, batch file dir = {}.",
95+
currentBatchId.get(),
96+
dir.getPath());
97+
return dir;
98+
}
99+
LOGGER.warn(
100+
"Batch id = {}: Failed to create batch file dir {}.",
101+
currentBatchId.get(),
102+
dir.getPath());
103+
return null;
104+
});
105+
if (baseDir != null) {
106+
return baseDir;
107+
}
95108
throw new PipeException(
96109
String.format(
97110
"Failed to create batch file dir %s. (Batch id = %s)",
98111
baseDir.getPath(), currentBatchId.get()));
99112
}
100-
LOGGER.info(
101-
"Batch id = {}: Create batch dir successfully, batch file dir = {}.",
102-
currentBatchId.get(),
103-
baseDir.getPath());
104-
return baseDir;
105113
}
106114

107115
public abstract void bufferTableModelTablet(String dataBase, Tablet tablet);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java

Lines changed: 37 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1002,11 +1002,6 @@ private void updateWritingFileIfNeeded(
10021002
tsFileWriter.getWritingFile().getPath());
10031003
}
10041004

1005-
private String getReceiverFileBaseDir() throws DiskSpaceInsufficientException {
1006-
// Get next receiver file base dir by folder manager
1007-
return Objects.isNull(folderManager) ? null : folderManager.getNextFolder();
1008-
}
1009-
10101005
private void initiateTsFileBufferFolder(List<String> receiverBaseDirsName) throws IOException {
10111006
// initiate receiverFileDirs
10121007
for (String receiverFileBaseDir : receiverBaseDirsName) {
@@ -1230,44 +1225,48 @@ public PipeConsensusTsFileWriter(int index, ConsensusPipeName consensusPipeName)
12301225
}
12311226

12321227
public void rollToNextWritingPath() throws IOException, DiskSpaceInsufficientException {
1233-
String receiverBasePath;
1234-
try {
1235-
receiverBasePath = getReceiverFileBaseDir();
1236-
} catch (Exception e) {
1237-
LOGGER.warn(
1238-
"Failed to init pipeConsensus receiver file folder manager because all disks of folders are full.",
1239-
e);
1240-
throw e;
1241-
}
1242-
if (Objects.isNull(receiverBasePath)) {
1243-
LOGGER.warn(
1244-
"PipeConsensus-PipeName-{}: Failed to get pipeConsensus receiver file base directory, because folderManager is null. May because the disk is full.",
1245-
consensusPipeName.toString());
1246-
throw new DiskSpaceInsufficientException(receiveDirs);
1228+
if (folderManager == null) {
1229+
throw new IOException(
1230+
String.format(
1231+
"PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d receiver file dir",
1232+
consensusPipeName, index));
12471233
}
1234+
this.localWritingDir =
1235+
folderManager.getNextWithRetry(
1236+
receiverBasePath -> {
1237+
if (receiverBasePath == null) {
1238+
LOGGER.warn(
1239+
"PipeConsensus-PipeName-{}: Failed to get base directory", consensusPipeName);
1240+
return null;
1241+
}
1242+
File writingDir = new File(receiverBasePath + File.separator + index);
1243+
deleteFileOrDirectoryIfExists(
1244+
writingDir,
1245+
String.format(
1246+
"TsFileWriter-%s roll to new dir and delete last writing dir", index));
12481247

1249-
String localWritingDirPath = receiverBasePath + File.separator + index;
1250-
this.localWritingDir = new File(localWritingDirPath);
1251-
// Remove exists dir
1252-
deleteFileOrDirectoryIfExists(
1253-
this.localWritingDir,
1254-
String.format("TsFileWriter-%s roll to new dir and delete last writing dir", index));
1255-
if (!this.localWritingDir.mkdirs()) {
1256-
LOGGER.warn(
1257-
"PipeConsensus-PipeName-{}: Failed to create receiver tsFileWriter-{} file dir {}. May because authority or dir already exists etc.",
1258-
consensusPipeName,
1259-
index,
1260-
this.localWritingDir.getPath());
1248+
if (writingDir.mkdirs()) {
1249+
LOGGER.info(
1250+
"PipeConsensus-PipeName-{}: tsfileWriter-{} roll to writing path {}",
1251+
consensusPipeName,
1252+
index,
1253+
writingDir.getPath());
1254+
return writingDir;
1255+
}
1256+
LOGGER.warn(
1257+
"PipeConsensus-PipeName-{}: Failed to create receiver tsFileWriter-{} file dir {}",
1258+
consensusPipeName,
1259+
index,
1260+
writingDir.getPath());
1261+
return null;
1262+
});
1263+
1264+
if (this.localWritingDir == null) {
12611265
throw new IOException(
12621266
String.format(
1263-
"PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d receiver file dir %s. May because authority or dir already exists etc.",
1264-
consensusPipeName, index, this.localWritingDir.getPath()));
1267+
"PipeConsensus-PipeName-%s: Failed to create tsFileWriter-%d receiver file dir",
1268+
consensusPipeName, index));
12651269
}
1266-
LOGGER.info(
1267-
"PipeConsensus-PipeName-{}: tsfileWriter-{} roll to writing path {}",
1268-
consensusPipeName,
1269-
index,
1270-
localWritingDirPath);
12711270
}
12721271

12731272
public File getLocalWritingDir() {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -508,6 +508,11 @@ protected String getReceiverFileBaseDir() throws DiskSpaceInsufficientException
508508
return Objects.isNull(folderManager) ? null : folderManager.getNextFolder();
509509
}
510510

511+
@Override
512+
protected void markFileBaseDirStateAbnormal(String dir) {
513+
folderManager.updateFolderState(dir, FolderManager.FolderState.ABNORMAL);
514+
}
515+
511516
@Override
512517
protected String getSenderHost() {
513518
final IClientSession session = SESSION_MANAGER.getCurrSession();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/snapshot/SnapshotLoader.java

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -288,28 +288,57 @@ private void createLinksFromSnapshotToSourceDir(
288288
Map<String, String> fileTarget = new HashMap<>();
289289
for (File file : files) {
290290
String fileKey = file.getName().split("\\.")[0];
291-
String dataDir;
292-
if (fileTarget.containsKey(fileKey)) {
293-
dataDir = fileTarget.get(fileKey);
294-
} else {
295-
dataDir = folderManager.getNextFolder();
296-
fileTarget.put(fileKey, dataDir);
297-
}
298-
File targetFile =
299-
new File(dataDir + File.separator + targetSuffix + File.separator + file.getName());
300-
if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
291+
String dataDir = fileTarget.get(fileKey);
292+
293+
try {
294+
folderManager.getNextWithRetry(
295+
currentDataDir -> {
296+
String effectiveDir = (dataDir != null) ? dataDir : currentDataDir;
297+
File targetFile =
298+
new File(
299+
effectiveDir
300+
+ File.separator
301+
+ targetSuffix
302+
+ File.separator
303+
+ file.getName());
304+
305+
try {
306+
if (!targetFile.getParentFile().exists() && !targetFile.getParentFile().mkdirs()) {
307+
throw new IOException(
308+
String.format(
309+
"Cannot create directory %s",
310+
targetFile.getParentFile().getAbsolutePath()));
311+
}
312+
313+
try {
314+
Files.createLink(targetFile.toPath(), file.toPath());
315+
LOGGER.debug("Created hard link from {} to {}", file, targetFile);
316+
return targetFile;
317+
} catch (IOException e) {
318+
LOGGER.info(
319+
"Cannot create link from {} to {}, fallback to copy", file, targetFile);
320+
}
321+
322+
Files.copy(file.toPath(), targetFile.toPath());
323+
fileTarget.put(fileKey, effectiveDir);
324+
return targetFile;
325+
} catch (Exception e) {
326+
LOGGER.warn(
327+
"Failed to process file {} in dir {}: {}",
328+
file.getName(),
329+
effectiveDir,
330+
e.getMessage(),
331+
e);
332+
throw e;
333+
}
334+
});
335+
} catch (Exception e) {
301336
throw new IOException(
302337
String.format(
303-
"Cannot create directory %s", targetFile.getParentFile().getAbsolutePath()));
338+
"Failed to process file after retries. Source: %s, Target suffix: %s",
339+
file.getAbsolutePath(), targetSuffix),
340+
e);
304341
}
305-
try {
306-
Files.createLink(targetFile.toPath(), file.toPath());
307-
continue;
308-
} catch (IOException e) {
309-
LOGGER.info("Cannot create link from {} to {}, try to copy it", file, targetFile);
310-
}
311-
312-
Files.copy(file.toPath(), targetFile.toPath());
313342
}
314343
}
315344

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/generator/TsFileNameGenerator.java

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,14 @@
2424
import org.apache.iotdb.db.exception.DiskSpaceInsufficientException;
2525
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
2626
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
27+
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
2728
import org.apache.iotdb.db.storageengine.rescon.disk.TierManager;
2829

2930
import org.apache.tsfile.common.constant.TsFileConstant;
3031
import org.apache.tsfile.fileSystem.FSFactoryProducer;
3132
import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3235

3336
import java.io.File;
3437
import java.io.IOException;
@@ -43,6 +46,7 @@
4346
public class TsFileNameGenerator {
4447

4548
private static FSFactory fsFactory = FSFactoryProducer.getFSFactory();
49+
private static final Logger LOGGER = LoggerFactory.getLogger(TsFileNameGenerator.class);
4650

4751
public static String generateNewTsFilePath(
4852
String tsFileDir,
@@ -65,7 +69,7 @@ public static String generateNewTsFilePathWithMkdir(
6569
long version,
6670
int innerSpaceCompactionCount,
6771
int crossSpaceCompactionCount)
68-
throws DiskSpaceInsufficientException {
72+
throws DiskSpaceInsufficientException, IOException {
6973
return generateNewTsFilePathWithMkdir(
7074
sequence,
7175
logicalStorageGroup,
@@ -90,33 +94,36 @@ public static String generateNewTsFilePathWithMkdir(
9094
int crossSpaceCompactionCount,
9195
int tierLevel,
9296
String customSuffix)
93-
throws DiskSpaceInsufficientException {
94-
String tsFileDir =
95-
generateTsFileDir(
96-
sequence, logicalStorageGroup, virtualStorageGroup, timePartitionId, tierLevel);
97-
fsFactory.getFile(tsFileDir).mkdirs();
98-
return tsFileDir
99-
+ File.separator
100-
+ generateNewTsFileName(
101-
time, version, innerSpaceCompactionCount, crossSpaceCompactionCount, customSuffix);
102-
}
103-
104-
public static String generateTsFileDir(
105-
boolean sequence,
106-
String logicalStorageGroup,
107-
String virtualStorageGroup,
108-
long timePartitionId,
109-
int tierLevel)
110-
throws DiskSpaceInsufficientException {
111-
TierManager tierManager = TierManager.getInstance();
112-
String baseDir = tierManager.getNextFolderForTsFile(tierLevel, sequence);
113-
return baseDir
114-
+ File.separator
115-
+ logicalStorageGroup
116-
+ File.separator
117-
+ virtualStorageGroup
118-
+ File.separator
119-
+ timePartitionId;
97+
throws DiskSpaceInsufficientException, IOException {
98+
FolderManager folderManager = TierManager.getInstance().getFolderManager(tierLevel, sequence);
99+
try {
100+
return folderManager.getNextWithRetry(
101+
baseDir -> {
102+
String tsFileDir =
103+
baseDir
104+
+ File.separator
105+
+ logicalStorageGroup
106+
+ File.separator
107+
+ virtualStorageGroup
108+
+ File.separator
109+
+ timePartitionId;
110+
fsFactory.getFile(tsFileDir).mkdirs();
111+
return tsFileDir
112+
+ File.separator
113+
+ generateNewTsFileName(
114+
time,
115+
version,
116+
innerSpaceCompactionCount,
117+
crossSpaceCompactionCount,
118+
customSuffix);
119+
});
120+
} catch (DiskSpaceInsufficientException e) {
121+
LOGGER.error("All disks are full, cannot create tsfile directory", e);
122+
throw new IOException("Disk space insufficient", e);
123+
} catch (Exception e) {
124+
LOGGER.warn("Failed to create tsfile directory after retries", e);
125+
throw new IOException("Failed to create directory after retries", e);
126+
}
120127
}
121128

122129
public static String generateNewTsFileName(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/allocation/AbstractNodeAllocationStrategy.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -56,17 +56,18 @@ protected AbstractNodeAllocationStrategy() {
5656
}
5757

5858
protected IWALNode createWALNode(String identifier) {
59-
String folder;
60-
// get wal folder
6159
try {
62-
folder = folderManager.getNextFolder();
60+
return folderManager.getNextWithRetry(
61+
folder -> new WALNode(identifier, folder + File.separator + identifier));
6362
} catch (DiskSpaceInsufficientException e) {
6463
logger.error("Fail to create wal node because all disks of wal folders are full.", e);
6564
return WALFakeNode.getFailureInstance(e);
65+
} catch (Exception e) {
66+
logger.warn("Failed to create WAL node after retries for identifier: " + identifier, e);
67+
return WALFakeNode.getFailureInstance(
68+
new IOException(
69+
"Failed to create WAL node after retries for identifier: " + identifier, e));
6670
}
67-
folder = folder + File.separator + identifier;
68-
// create new wal node
69-
return createWALNode(identifier, folder);
7071
}
7172

7273
protected IWALNode createWALNode(String identifier, String folder) {

0 commit comments

Comments
 (0)