Skip to content

Commit 293d6eb

Browse files
authored
Load: Check whether TsFile is a tree model when DataBase is not specified (apache#15592)
1 parent 87a37c2 commit 293d6eb

File tree

4 files changed

+101
-65
lines changed

4 files changed

+101
-65
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import java.nio.file.SimpleFileVisitor;
3939
import java.nio.file.attribute.BasicFileAttributes;
4040
import java.util.Arrays;
41+
import java.util.Objects;
4142
import java.util.Set;
4243
import java.util.concurrent.CopyOnWriteArraySet;
4344
import java.util.concurrent.atomic.AtomicBoolean;
@@ -100,8 +101,9 @@ private void scan() throws IOException {
100101

101102
final boolean isGeneratedByPipe =
102103
listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
104+
final File listeningDirFile = new File(listeningDir);
103105
try (final Stream<File> fileStream =
104-
FileUtils.streamFiles(new File(listeningDir), true, (String[]) null)) {
106+
FileUtils.streamFiles(listeningDirFile, true, (String[]) null)) {
105107
try {
106108
fileStream
107109
.filter(file -> !activeLoadTsFileLoader.isFilePendingOrLoading(file))
@@ -114,7 +116,15 @@ private void scan() throws IOException {
114116
.filter(this::isTsFileCompleted)
115117
.limit(currentAllowedPendingSize)
116118
.forEach(
117-
file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, isGeneratedByPipe));
119+
file -> {
120+
final File parentFile = new File(file).getParentFile();
121+
activeLoadTsFileLoader.tryTriggerTsFileLoad(
122+
file,
123+
parentFile != null
124+
&& !Objects.equals(
125+
parentFile.getAbsoluteFile(), listeningDirFile.getAbsoluteFile()),
126+
isGeneratedByPipe);
127+
});
118128
} catch (final Exception e) {
119129
LOGGER.warn("Exception occurred during scanning dir: {}", listeningDir, e);
120130
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.commons.conf.CommonDescriptor;
2323

24-
import org.apache.tsfile.utils.Pair;
2524
import org.slf4j.Logger;
2625
import org.slf4j.LoggerFactory;
2726

@@ -44,41 +43,41 @@ public class ActiveLoadFailedMessageHandler {
4443
filePair ->
4544
LOGGER.info(
4645
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to memory constraints, will retry later.",
47-
filePair.getLeft(),
48-
filePair.getRight()));
46+
filePair.getFile(),
47+
filePair.isGeneratedByPipe()));
4948
// system is read only
5049
put(
5150
"read only",
5251
filePair ->
5352
LOGGER.info(
5453
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to the system is read only, will retry later.",
55-
filePair.getLeft(),
56-
filePair.getRight()));
54+
filePair.getFile(),
55+
filePair.isGeneratedByPipe()));
5756
// Timed out to wait for procedure return. The procedure is still running.
5857
put(
5958
"procedure return",
6059
filePair ->
6160
LOGGER.info(
6261
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to time out to wait for procedure return, will retry later.",
63-
filePair.getLeft(),
64-
filePair.getRight()));
62+
filePair.getFile(),
63+
filePair.isGeneratedByPipe()));
6564
// DataNode is not enough, please register more.
6665
put(
6766
"not enough",
6867
filePair ->
6968
LOGGER.info(
7069
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to the datanode is not enough, will retry later.",
71-
filePair.getLeft(),
72-
filePair.getRight()));
70+
filePair.getFile(),
71+
filePair.isGeneratedByPipe()));
7372
// Fail to connect to any config node. Please check status of ConfigNodes or logs of
7473
// connected DataNode.
7574
put(
7675
"any config node",
7776
filePair ->
7877
LOGGER.info(
7978
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to fail to connect to any config node, will retry later.",
80-
filePair.getLeft(),
81-
filePair.getRight()));
79+
filePair.getFile(),
80+
filePair.isGeneratedByPipe()));
8281
// Current query is time out, query start time is 1729653161797, ddl is
8382
// -3046040214706, current time is 1729653184210, please check your statement or
8483
// modify timeout parameter
@@ -87,26 +86,26 @@ public class ActiveLoadFailedMessageHandler {
8786
filePair ->
8887
LOGGER.info(
8988
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to current query is time out, will retry later.",
90-
filePair.getLeft(),
91-
filePair.getRight()));
89+
filePair.getFile(),
90+
filePair.isGeneratedByPipe()));
9291
}
9392
});
9493

9594
@FunctionalInterface
9695
private interface ExceptionMessageHandler {
97-
void handle(final Pair<String, Boolean> filePair);
96+
void handle(final ActiveLoadPendingQueue.ActiveLoadEntry entry);
9897
}
9998

10099
public static boolean isExceptionMessageShouldRetry(
101-
final Pair<String, Boolean> filePair, final String message) {
100+
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String message) {
102101
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
103-
EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(filePair);
102+
EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(entry);
104103
return true;
105104
}
106105

107106
for (String key : EXCEPTION_MESSAGE_HANDLER_MAP.keySet()) {
108107
if (message != null && message.contains(key)) {
109-
EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(filePair);
108+
EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(entry);
110109
return true;
111110
}
112111
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadPendingQueue.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121

2222
import org.apache.iotdb.db.storageengine.load.metrics.ActiveLoadingFilesNumberMetricsSet;
2323

24-
import org.apache.tsfile.utils.Pair;
25-
2624
import java.util.HashSet;
2725
import java.util.Queue;
2826
import java.util.Set;
@@ -31,30 +29,31 @@
3129
public class ActiveLoadPendingQueue {
3230

3331
private final Set<String> pendingFileSet = new HashSet<>();
34-
private final Queue<Pair<String, Boolean>> pendingFileQueue = new ConcurrentLinkedQueue<>();
32+
private final Queue<ActiveLoadEntry> pendingFileQueue = new ConcurrentLinkedQueue<>();
3533

3634
private final Set<String> loadingFileSet = new HashSet<>();
3735

38-
public synchronized boolean enqueue(final String file, final boolean isGeneratedByPipe) {
36+
public synchronized boolean enqueue(
37+
final String file, final boolean isGeneratedByPipe, final boolean isTableModel) {
3938
if (!loadingFileSet.contains(file) && pendingFileSet.add(file)) {
40-
pendingFileQueue.offer(new Pair<>(file, isGeneratedByPipe));
39+
pendingFileQueue.offer(new ActiveLoadEntry(file, isGeneratedByPipe, isTableModel));
4140

4241
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(1);
4342
return true;
4443
}
4544
return false;
4645
}
4746

48-
public synchronized Pair<String, Boolean> dequeueFromPending() {
49-
final Pair<String, Boolean> pair = pendingFileQueue.poll();
50-
if (pair != null) {
51-
pendingFileSet.remove(pair.left);
52-
loadingFileSet.add(pair.left);
47+
public synchronized ActiveLoadEntry dequeueFromPending() {
48+
final ActiveLoadEntry entry = pendingFileQueue.poll();
49+
if (entry != null) {
50+
pendingFileSet.remove(entry.getFile());
51+
loadingFileSet.add(entry.getFile());
5352

5453
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseLoadingFileCounter(1);
5554
ActiveLoadingFilesNumberMetricsSet.getInstance().increaseQueuingFileCounter(-1);
5655
}
57-
return pair;
56+
return entry;
5857
}
5958

6059
public synchronized void removeFromLoading(final String file) {
@@ -74,4 +73,28 @@ public int size() {
7473
public boolean isEmpty() {
7574
return pendingFileQueue.isEmpty() && loadingFileSet.isEmpty();
7675
}
76+
77+
public static class ActiveLoadEntry {
78+
private final String file;
79+
private final boolean isGeneratedByPipe;
80+
private final boolean isTableModel;
81+
82+
public ActiveLoadEntry(String file, boolean isGeneratedByPipe, boolean isTableModel) {
83+
this.file = file;
84+
this.isGeneratedByPipe = isGeneratedByPipe;
85+
this.isTableModel = isTableModel;
86+
}
87+
88+
public String getFile() {
89+
return file;
90+
}
91+
92+
public boolean isGeneratedByPipe() {
93+
return isGeneratedByPipe;
94+
}
95+
96+
public boolean isTableModel() {
97+
return isTableModel;
98+
}
99+
}
77100
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java

Lines changed: 39 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
import org.apache.iotdb.rpc.TSStatusCode;
4444

4545
import org.apache.commons.io.FileUtils;
46-
import org.apache.tsfile.utils.Pair;
4746
import org.slf4j.Logger;
4847
import org.slf4j.LoggerFactory;
4948

@@ -84,12 +83,13 @@ public int getCurrentAllowedPendingSize() {
8483
return MAX_PENDING_SIZE - pendingQueue.size();
8584
}
8685

87-
public void tryTriggerTsFileLoad(String absolutePath, boolean isGeneratedByPipe) {
86+
public void tryTriggerTsFileLoad(
87+
String absolutePath, boolean isTabletMode, boolean isGeneratedByPipe) {
8888
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
8989
return;
9090
}
9191

92-
if (pendingQueue.enqueue(absolutePath, isGeneratedByPipe)) {
92+
if (pendingQueue.enqueue(absolutePath, isTabletMode, isGeneratedByPipe)) {
9393
initFailDirIfNecessary();
9494
adjustExecutorIfNecessary();
9595
}
@@ -165,44 +165,44 @@ private void tryLoadPendingTsFiles() {
165165

166166
try {
167167
while (true) {
168-
final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
169-
if (!filePair.isPresent()) {
168+
final Optional<ActiveLoadPendingQueue.ActiveLoadEntry> loadEntry = tryGetNextPendingFile();
169+
if (!loadEntry.isPresent()) {
170170
return;
171171
}
172172

173173
try {
174-
final TSStatus result = loadTsFile(filePair.get(), session);
174+
final TSStatus result = loadTsFile(loadEntry.get(), session);
175175
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
176176
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
177177
LOGGER.info(
178178
"Successfully auto load tsfile {} (isGeneratedByPipe = {})",
179-
filePair.get().getLeft(),
180-
filePair.get().getRight());
179+
loadEntry.get().getFile(),
180+
loadEntry.get().isGeneratedByPipe());
181181
} else {
182-
handleLoadFailure(filePair.get(), result);
182+
handleLoadFailure(loadEntry.get(), result);
183183
}
184184
} catch (final FileNotFoundException e) {
185-
handleFileNotFoundException(filePair.get());
185+
handleFileNotFoundException(loadEntry.get());
186186
} catch (final Exception e) {
187-
handleOtherException(filePair.get(), e);
187+
handleOtherException(loadEntry.get(), e);
188188
} finally {
189-
pendingQueue.removeFromLoading(filePair.get().getLeft());
189+
pendingQueue.removeFromLoading(loadEntry.get().getFile());
190190
}
191191
}
192192
} finally {
193193
SESSION_MANAGER.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
194194
}
195195
}
196196

197-
private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
197+
private Optional<ActiveLoadPendingQueue.ActiveLoadEntry> tryGetNextPendingFile() {
198198
final long maxRetryTimes =
199199
Math.max(1, IOTDB_CONFIG.getLoadActiveListeningCheckIntervalSeconds() << 1);
200200
long currentRetryTimes = 0;
201201

202202
while (true) {
203-
final Pair<String, Boolean> filePair = pendingQueue.dequeueFromPending();
204-
if (Objects.nonNull(filePair)) {
205-
return Optional.of(filePair);
203+
final ActiveLoadPendingQueue.ActiveLoadEntry entry = pendingQueue.dequeueFromPending();
204+
if (Objects.nonNull(entry)) {
205+
return Optional.of(entry);
206206
}
207207

208208
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(1));
@@ -213,17 +213,20 @@ private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
213213
}
214214
}
215215

216-
private TSStatus loadTsFile(final Pair<String, Boolean> filePair, final IClientSession session)
216+
private TSStatus loadTsFile(
217+
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final IClientSession session)
217218
throws FileNotFoundException {
218-
final LoadTsFileStatement statement = new LoadTsFileStatement(filePair.getLeft());
219+
final LoadTsFileStatement statement = new LoadTsFileStatement(entry.getFile());
219220
final List<File> files = statement.getTsFiles();
220221

221222
// It should be noted here that the instructions in this code block do not need to use the
222223
// DataBase, so the DataBase is assigned a value of null. If the DataBase is used later, an
223224
// exception will be thrown.
224225
final File parentFile;
225226
statement.setDatabase(
226-
files.isEmpty() || (parentFile = files.get(0).getParentFile()) == null
227+
files.isEmpty()
228+
|| !entry.isTableModel()
229+
|| (parentFile = files.get(0).getParentFile()) == null
227230
? null
228231
: parentFile.getName());
229232
statement.setDeleteAfterLoad(true);
@@ -232,7 +235,7 @@ private TSStatus loadTsFile(final Pair<String, Boolean> filePair, final IClientS
232235
statement.setAutoCreateDatabase(
233236
IoTDBDescriptor.getInstance().getConfig().isAutoCreateSchemaEnabled());
234237
return executeStatement(
235-
filePair.getRight() ? new PipeEnrichedStatement(statement) : statement, session);
238+
entry.isGeneratedByPipe() ? new PipeEnrichedStatement(statement) : statement, session);
236239
}
237240

238241
private TSStatus executeStatement(final Statement statement, final IClientSession session) {
@@ -254,34 +257,35 @@ private TSStatus executeStatement(final Statement statement, final IClientSessio
254257
}
255258
}
256259

257-
private void handleLoadFailure(final Pair<String, Boolean> filePair, final TSStatus status) {
258-
if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(
259-
filePair, status.getMessage())) {
260+
private void handleLoadFailure(
261+
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final TSStatus status) {
262+
if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, status.getMessage())) {
260263
LOGGER.warn(
261264
"Failed to auto load tsfile {} (isGeneratedByPipe = {}), status: {}. File will be moved to fail directory.",
262-
filePair.getLeft(),
263-
filePair.getRight(),
265+
entry.getFile(),
266+
entry.isGeneratedByPipe(),
264267
status);
265-
removeFileAndResourceAndModsToFailDir(filePair.getLeft());
268+
removeFileAndResourceAndModsToFailDir(entry.getFile());
266269
}
267270
}
268271

269-
private void handleFileNotFoundException(final Pair<String, Boolean> filePair) {
272+
private void handleFileNotFoundException(final ActiveLoadPendingQueue.ActiveLoadEntry entry) {
270273
LOGGER.warn(
271274
"Failed to auto load tsfile {} (isGeneratedByPipe = {}) due to file not found, will skip this file.",
272-
filePair.getLeft(),
273-
filePair.getRight());
274-
removeFileAndResourceAndModsToFailDir(filePair.getLeft());
275+
entry.getFile(),
276+
entry.isGeneratedByPipe());
277+
removeFileAndResourceAndModsToFailDir(entry.getFile());
275278
}
276279

277-
private void handleOtherException(final Pair<String, Boolean> filePair, final Exception e) {
278-
if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(filePair, e.getMessage())) {
280+
private void handleOtherException(
281+
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final Exception e) {
282+
if (!ActiveLoadFailedMessageHandler.isExceptionMessageShouldRetry(entry, e.getMessage())) {
279283
LOGGER.warn(
280284
"Failed to auto load tsfile {} (isGeneratedByPipe = {}) because of an unexpected exception. File will be moved to fail directory.",
281-
filePair.getLeft(),
282-
filePair.getRight(),
285+
entry.getFile(),
286+
entry.isGeneratedByPipe(),
283287
e);
284-
removeFileAndResourceAndModsToFailDir(filePair.getLeft());
288+
removeFileAndResourceAndModsToFailDir(entry.getFile());
285289
}
286290
}
287291

0 commit comments

Comments
 (0)