Skip to content

Commit f66794b

Browse files
authored
[To dev/1.3] feat: encode load attributes in active load directories (#16722) (#16758)
1 parent 72a3f45 commit f66794b

File tree

11 files changed

+527
-119
lines changed

11 files changed

+527
-119
lines changed

integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadTsFileIT.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -998,6 +998,47 @@ public void testLoadLocally() throws Exception {
998998
}
999999
}
10001000

1001+
@Test
1002+
public void testAsyncLoadLocally() throws Exception {
1003+
registerSchema();
1004+
1005+
final long writtenPoint1;
1006+
// device 0, device 1, sg 0
1007+
try (final TsFileGenerator generator =
1008+
new TsFileGenerator(new File(tmpDir, "1-0-0-0.tsfile"))) {
1009+
generator.registerTimeseries(
1010+
SchemaConfig.DEVICE_0, Collections.singletonList(SchemaConfig.MEASUREMENT_00));
1011+
generator.generateData(SchemaConfig.DEVICE_0, 1, PARTITION_INTERVAL / 10_000, false);
1012+
writtenPoint1 = generator.getTotalNumber();
1013+
}
1014+
1015+
try (final Connection connection = EnvFactory.getEnv().getConnection();
1016+
final Statement statement = connection.createStatement()) {
1017+
1018+
statement.execute(
1019+
String.format(
1020+
"load \"%s\" with ('async'='true','database-level'='2')", tmpDir.getAbsolutePath()));
1021+
1022+
for (int i = 0; i < 20; i++) {
1023+
try (final ResultSet resultSet =
1024+
statement.executeQuery("select count(*) from root.** group by level=1,2")) {
1025+
if (resultSet.next()) {
1026+
final long sg1Count = resultSet.getLong("count(root.sg.test_0.*.*)");
1027+
Assert.assertEquals(writtenPoint1, sg1Count);
1028+
} else {
1029+
Assert.fail("This ResultSet is empty.");
1030+
}
1031+
} catch (final Throwable e) {
1032+
if (i < 19) {
1033+
Thread.sleep(1000);
1034+
} else {
1035+
throw e;
1036+
}
1037+
}
1038+
}
1039+
}
1040+
}
1041+
10011042
@Test
10021043
@Ignore("Load with conversion is currently banned")
10031044
public void testLoadWithConvertOnTypeMismatch() throws Exception {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
8484
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
8585
import org.apache.iotdb.db.queryengine.plan.statement.pipe.PipeEnrichedStatement;
86+
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
8687
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
8788
import org.apache.iotdb.db.storageengine.rescon.disk.FolderManager;
8889
import org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyType;
@@ -475,7 +476,14 @@ protected TSStatus loadFileV2(
475476
}
476477

477478
private TSStatus loadTsFileAsync(final List<String> absolutePaths) throws IOException {
478-
if (!ActiveLoadUtil.loadFilesToActiveDir(null, absolutePaths, true)) {
479+
final Map<String, String> loadAttributes =
480+
ActiveLoadPathHelper.buildAttributes(
481+
null,
482+
shouldConvertDataTypeOnTypeMismatch,
483+
validateTsFile.get(),
484+
null,
485+
shouldMarkAsPipeRequest.get());
486+
if (!ActiveLoadUtil.loadFilesToActiveDir(loadAttributes, absolutePaths, true)) {
479487
throw new PipeException("Load active listening pipe dir is not set.");
480488
}
481489
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.FileTimeIndex;
6666
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
6767
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
68+
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadPathHelper;
6869
import org.apache.iotdb.db.storageengine.load.active.ActiveLoadUtil;
6970
import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
7071
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
@@ -240,7 +241,15 @@ public Analysis analyzeFileByFile(Analysis analysis) {
240241
private boolean doAsyncLoad(final Analysis analysis) {
241242
final long startTime = System.nanoTime();
242243
try {
243-
if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(tsFiles, null, isDeleteAfterLoad)) {
244+
final Map<String, String> activeLoadAttributes =
245+
ActiveLoadPathHelper.buildAttributes(
246+
databaseLevel,
247+
isConvertOnTypeMismatch,
248+
isVerifySchema,
249+
tabletConversionThresholdBytes,
250+
isGeneratedByPipe);
251+
if (ActiveLoadUtil.loadTsFileAsyncToActiveDir(
252+
tsFiles, activeLoadAttributes, isDeleteAfterLoad)) {
244253
analysis.setFinishQueryAfterAnalyze(true);
245254
analysis.setFailStatus(RpcUtils.getStatus(TSStatusCode.SUCCESS_STATUS));
246255
analysis.setStatement(loadTsFileStatement);

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,6 @@ public class LoadTsFileStatement extends Statement {
5151
private boolean isGeneratedByPipe = false;
5252
private boolean isAsyncLoad = false;
5353

54-
private Map<String, String> loadAttributes;
55-
5654
private List<File> tsFiles;
5755
private List<TsFileResource> resources;
5856
private List<Long> writePointCountList;
@@ -212,15 +210,14 @@ public long getWritePointCount(int resourceIndex) {
212210
}
213211

214212
public void setLoadAttributes(final Map<String, String> loadAttributes) {
215-
this.loadAttributes = loadAttributes;
216-
initAttributes();
213+
initAttributes(loadAttributes);
217214
}
218215

219216
public boolean isAsyncLoad() {
220217
return isAsyncLoad;
221218
}
222219

223-
private void initAttributes() {
220+
private void initAttributes(final Map<String, String> loadAttributes) {
224221
this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
225222
this.deleteAfterLoad = LoadTsFileConfigurator.parseOrGetDefaultOnSuccess(loadAttributes);
226223
this.convertOnTypeMismatch =
@@ -229,6 +226,9 @@ private void initAttributes() {
229226
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
230227
this.verifySchema = LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
231228
this.isAsyncLoad = LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
229+
if (LoadTsFileConfigurator.parseOrGetDefaultPipeGenerated(loadAttributes)) {
230+
markIsGeneratedByPipe();
231+
}
232232
}
233233

234234
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> isMiniTsFile) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadDirScanner.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import java.nio.file.SimpleFileVisitor;
4040
import java.nio.file.attribute.BasicFileAttributes;
4141
import java.util.Arrays;
42+
import java.util.Map;
4243
import java.util.Set;
4344
import java.util.concurrent.CopyOnWriteArraySet;
4445
import java.util.concurrent.atomic.AtomicBoolean;
@@ -101,8 +102,9 @@ private void scan() throws IOException {
101102

102103
final boolean isGeneratedByPipe =
103104
listeningDir.equals(IOTDB_CONFIG.getLoadActiveListeningPipeDir());
105+
final File listeningDirFile = new File(listeningDir);
104106
try (final Stream<File> fileStream =
105-
FileUtils.streamFiles(new File(listeningDir), true, (String[]) null)) {
107+
FileUtils.streamFiles(listeningDirFile, true, (String[]) null)) {
106108
try {
107109
fileStream
108110
.filter(file -> !activeLoadTsFileLoader.isFilePendingOrLoading(file))
@@ -115,7 +117,18 @@ private void scan() throws IOException {
115117
.filter(this::isTsFileCompleted)
116118
.limit(currentAllowedPendingSize)
117119
.forEach(
118-
file -> activeLoadTsFileLoader.tryTriggerTsFileLoad(file, isGeneratedByPipe));
120+
filePath -> {
121+
final File tsFile = new File(filePath);
122+
final Map<String, String> attributes =
123+
ActiveLoadPathHelper.parseAttributes(tsFile, listeningDirFile);
124+
125+
final File parentFile = tsFile.getParentFile();
126+
127+
activeLoadTsFileLoader.tryTriggerTsFileLoad(
128+
tsFile.getAbsolutePath(),
129+
listeningDirFile.getAbsolutePath(),
130+
isGeneratedByPipe);
131+
});
119132
} catch (UncheckedIOException e) {
120133
LOGGER.debug("The file has been deleted. Ignore this exception.");
121134
} catch (final Exception e) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadFailedMessageHandler.java

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121

2222
import org.apache.iotdb.commons.conf.CommonDescriptor;
2323

24-
import org.apache.tsfile.utils.Pair;
2524
import org.slf4j.Logger;
2625
import org.slf4j.LoggerFactory;
2726

@@ -41,72 +40,72 @@ public class ActiveLoadFailedMessageHandler {
4140
// system is memory constrains
4241
put(
4342
"memory",
44-
filePair ->
43+
entry ->
4544
LOGGER.info(
4645
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to memory constraints, will retry later.",
47-
filePair.getLeft(),
48-
filePair.getRight()));
46+
entry.getFile(),
47+
entry.isGeneratedByPipe()));
4948
// system is read only
5049
put(
5150
"read only",
52-
filePair ->
51+
entry ->
5352
LOGGER.info(
5453
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to the system is read only, will retry later.",
55-
filePair.getLeft(),
56-
filePair.getRight()));
54+
entry.getFile(),
55+
entry.isGeneratedByPipe()));
5756
// Timed out to wait for procedure return. The procedure is still running.
5857
put(
5958
"procedure return",
60-
filePair ->
59+
entry ->
6160
LOGGER.info(
6261
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to time out to wait for procedure return, will retry later.",
63-
filePair.getLeft(),
64-
filePair.getRight()));
62+
entry.getFile(),
63+
entry.isGeneratedByPipe()));
6564
// DataNode is not enough, please register more.
6665
put(
6766
"not enough",
68-
filePair ->
67+
entry ->
6968
LOGGER.info(
7069
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to the datanode is not enough, will retry later.",
71-
filePair.getLeft(),
72-
filePair.getRight()));
70+
entry.getFile(),
71+
entry.isGeneratedByPipe()));
7372
// Fail to connect to any config node. Please check status of ConfigNodes or logs of
7473
// connected DataNode.
7574
put(
7675
"any config node",
77-
filePair ->
76+
entry ->
7877
LOGGER.info(
7978
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to fail to connect to any config node, will retry later.",
80-
filePair.getLeft(),
81-
filePair.getRight()));
79+
entry.getFile(),
80+
entry.isGeneratedByPipe()));
8281
// Current query is time out, query start time is 1729653161797, ddl is
8382
// -3046040214706, current time is 1729653184210, please check your statement or
8483
// modify timeout parameter
8584
put(
8685
"query is time out",
87-
filePair ->
86+
entry ->
8887
LOGGER.info(
8988
"Rejecting auto load tsfile {} (isGeneratedByPipe = {}) due to current query is time out, will retry later.",
90-
filePair.getLeft(),
91-
filePair.getRight()));
89+
entry.getFile(),
90+
entry.isGeneratedByPipe()));
9291
}
9392
});
9493

9594
@FunctionalInterface
9695
private interface ExceptionMessageHandler {
97-
void handle(final Pair<String, Boolean> filePair);
96+
void handle(final ActiveLoadPendingQueue.ActiveLoadEntry activeLoadEntry);
9897
}
9998

10099
public static boolean isExceptionMessageShouldRetry(
101-
final Pair<String, Boolean> filePair, final String message) {
100+
final ActiveLoadPendingQueue.ActiveLoadEntry entry, final String message) {
102101
if (CommonDescriptor.getInstance().getConfig().isReadOnly()) {
103-
EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(filePair);
102+
EXCEPTION_MESSAGE_HANDLER_MAP.get("read only").handle(entry);
104103
return true;
105104
}
106105

107106
for (String key : EXCEPTION_MESSAGE_HANDLER_MAP.keySet()) {
108107
if (message != null && message.contains(key)) {
109-
EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(filePair);
108+
EXCEPTION_MESSAGE_HANDLER_MAP.get(key).handle(entry);
110109
return true;
111110
}
112111
}

0 commit comments

Comments
 (0)