Skip to content

Commit 7e4e01c

Browse files
Load & Pipe: Support Active Load Table Model TsFiles & Support Async Load in SQL & Support Async Load Strategy in Pipe (apache#15208)
Co-authored-by: Steve Yurong Su <[email protected]>
1 parent 8889255 commit 7e4e01c

File tree

7 files changed

+239
-44
lines changed

7 files changed

+239
-44
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -567,27 +567,31 @@ protected TSStatus loadFileV2(
567567

568568
private TSStatus loadTsFileAsync(final String dataBaseName, final List<String> absolutePaths)
569569
throws IOException {
570-
if (Objects.nonNull(dataBaseName)) {
571-
throw new PipeException(
572-
"Async load tsfile does not support table model tsfile. Given database name: "
573-
+ dataBaseName);
574-
}
575-
576570
final String loadActiveListeningPipeDir = IOTDB_CONFIG.getLoadActiveListeningPipeDir();
577571
if (Objects.isNull(loadActiveListeningPipeDir)) {
578572
throw new PipeException("Load active listening pipe dir is not set.");
579573
}
580574

575+
if (Objects.nonNull(dataBaseName)) {
576+
final File targetDir = new File(loadActiveListeningPipeDir, dataBaseName);
577+
return this.loadTsFileAsyncToTargetDir(targetDir, absolutePaths);
578+
}
579+
580+
return loadTsFileAsyncToTargetDir(new File(loadActiveListeningPipeDir), absolutePaths);
581+
}
582+
583+
private TSStatus loadTsFileAsyncToTargetDir(
584+
final File targetDir, final List<String> absolutePaths) throws IOException {
581585
for (final String absolutePath : absolutePaths) {
582586
if (absolutePath == null) {
583587
continue;
584588
}
585589
final File sourceFile = new File(absolutePath);
586590
if (!Objects.equals(
587-
loadActiveListeningPipeDir, sourceFile.getParentFile().getAbsolutePath())) {
591+
targetDir.getAbsolutePath(), sourceFile.getParentFile().getAbsolutePath())) {
588592
RetryUtils.retryOnException(
589593
() -> {
590-
FileUtils.moveFileWithMD5Check(sourceFile, new File(loadActiveListeningPipeDir));
594+
FileUtils.moveFileWithMD5Check(sourceFile, targetDir);
591595
return null;
592596
});
593597
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.auth.AuthException;
2424
import org.apache.iotdb.commons.conf.CommonDescriptor;
25+
import org.apache.iotdb.commons.utils.RetryUtils;
2526
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
2627
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2728
import org.apache.iotdb.db.exception.load.LoadAnalyzeException;
@@ -80,6 +81,8 @@
8081
import java.util.Objects;
8182
import java.util.Optional;
8283

84+
import static org.apache.iotdb.commons.utils.FileUtils.copyFileWithMD5Check;
85+
import static org.apache.iotdb.commons.utils.FileUtils.moveFileWithMD5Check;
8386
import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.DATABASE_NOT_SPECIFIED;
8487
import static org.apache.iotdb.db.queryengine.plan.execution.config.TableConfigTaskVisitor.validateDatabaseName;
8588
import static org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet.ANALYSIS;
@@ -117,6 +120,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
117120
// User specified configs
118121
private final int databaseLevel;
119122
private String databaseForTableData;
123+
private final boolean isAsyncLoad;
120124
private final boolean isVerifySchema;
121125
private final boolean isAutoCreateDatabase;
122126
private final boolean isDeleteAfterLoad;
@@ -143,6 +147,7 @@ public LoadTsFileAnalyzer(
143147

144148
this.databaseLevel = loadTsFileStatement.getDatabaseLevel();
145149
this.databaseForTableData = loadTsFileStatement.getDatabase();
150+
this.isAsyncLoad = loadTsFileStatement.isAsyncLoad();
146151
this.isVerifySchema = loadTsFileStatement.isVerifySchema();
147152
this.isAutoCreateDatabase = loadTsFileStatement.isAutoCreateDatabase();
148153
this.isDeleteAfterLoad = loadTsFileStatement.isDeleteAfterLoad();
@@ -166,6 +171,7 @@ public LoadTsFileAnalyzer(
166171

167172
this.databaseLevel = loadTsFileTableStatement.getDatabaseLevel();
168173
this.databaseForTableData = loadTsFileTableStatement.getDatabase();
174+
this.isAsyncLoad = loadTsFileTableStatement.isAsyncLoad();
169175
this.isVerifySchema = loadTsFileTableStatement.isVerifySchema();
170176
this.isAutoCreateDatabase = loadTsFileTableStatement.isAutoCreateDatabase();
171177
this.isDeleteAfterLoad = loadTsFileTableStatement.isDeleteAfterLoad();
@@ -199,6 +205,10 @@ public IAnalysis analyzeFileByFile(IAnalysis analysis) {
199205
return analysis;
200206
}
201207

208+
if (isAsyncLoad && doAsyncLoad(analysis)) {
209+
return analysis;
210+
}
211+
202212
try {
203213
if (!doAnalyzeFileByFile(analysis)) {
204214
return analysis;
@@ -268,6 +278,72 @@ private boolean checkBeforeAnalyzeFileByFile(IAnalysis analysis) {
268278
return true;
269279
}
270280

281+
private boolean doAsyncLoad(final IAnalysis analysis) {
282+
final String[] loadActiveListeningDirs =
283+
IoTDBDescriptor.getInstance().getConfig().getLoadActiveListeningDirs();
284+
String targetFilePath = null;
285+
for (int i = 0, size = loadActiveListeningDirs == null ? 0 : loadActiveListeningDirs.length;
286+
i < size;
287+
i++) {
288+
if (loadActiveListeningDirs[i] != null) {
289+
targetFilePath = loadActiveListeningDirs[i];
290+
break;
291+
}
292+
}
293+
if (targetFilePath == null) {
294+
LOGGER.warn("Load active listening dir is not set. Will try sync load instead.");
295+
return false;
296+
}
297+
298+
try {
299+
if (Objects.nonNull(databaseForTableData)) {
300+
loadTsFilesAsyncToTargetDir(new File(targetFilePath, databaseForTableData), tsFiles);
301+
} else {
302+
loadTsFilesAsyncToTargetDir(new File(targetFilePath), tsFiles);
303+
}
304+
} catch (Exception e) {
305+
LOGGER.warn(
306+
"Failed to async load tsfiles {} to target dir {}. Will try sync load instead.",
307+
tsFiles,
308+
targetFilePath,
309+
e);
310+
return false;
311+
}
312+
313+
analysis.setFinishQueryAfterAnalyze(true);
314+
setRealStatement(analysis);
315+
return true;
316+
}
317+
318+
private void loadTsFilesAsyncToTargetDir(final File targetDir, final List<File> files)
319+
throws IOException {
320+
for (final File file : files) {
321+
if (file == null) {
322+
continue;
323+
}
324+
325+
loadTsFileAsyncToTargetDir(targetDir, file);
326+
loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + ".resource"));
327+
loadTsFileAsyncToTargetDir(targetDir, new File(file.getAbsolutePath() + ".mods"));
328+
}
329+
}
330+
331+
private void loadTsFileAsyncToTargetDir(final File targetDir, final File file)
332+
throws IOException {
333+
if (!file.exists()) {
334+
return;
335+
}
336+
RetryUtils.retryOnException(
337+
() -> {
338+
if (isDeleteAfterLoad) {
339+
moveFileWithMD5Check(file, targetDir);
340+
} else {
341+
copyFileWithMD5Check(file, targetDir);
342+
}
343+
return null;
344+
});
345+
}
346+
271347
private boolean doAnalyzeFileByFile(IAnalysis analysis) {
272348
// analyze tsfile metadata file by file
273349
for (int i = 0, tsfileNum = tsFiles.size(); i < tsfileNum; i++) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/sql/ast/LoadTsFile.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class LoadTsFile extends Statement {
4646
private long tabletConversionThresholdBytes = -1;
4747
private boolean autoCreateDatabase = true;
4848
private boolean verify = true;
49+
private boolean isAsyncLoad = false;
4950

5051
private boolean isGeneratedByPipe = false;
5152

@@ -138,6 +139,10 @@ public LoadTsFile setDatabase(String database) {
138139
return this;
139140
}
140141

142+
public boolean isAsyncLoad() {
143+
return isAsyncLoad;
144+
}
145+
141146
public void markIsGeneratedByPipe() {
142147
isGeneratedByPipe = true;
143148
}
@@ -183,6 +188,7 @@ private void initAttributes() {
183188
this.tabletConversionThresholdBytes =
184189
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
185190
this.verify = LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
191+
this.isAsyncLoad = LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
186192
}
187193

188194
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> isMiniTsFile) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/crud/LoadTsFileStatement.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.util.List;
4444
import java.util.Map;
4545

46+
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.ASYNC_LOAD_KEY;
4647
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.CONVERT_ON_TYPE_MISMATCH_KEY;
4748
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_LEVEL_KEY;
4849
import static org.apache.iotdb.db.storageengine.load.config.LoadTsFileConfigurator.DATABASE_NAME_KEY;
@@ -62,6 +63,7 @@ public class LoadTsFileStatement extends Statement {
6263
private long tabletConversionThresholdBytes = -1;
6364
private boolean autoCreateDatabase = true;
6465
private boolean isGeneratedByPipe = false;
66+
private boolean isAsyncLoad = false;
6567

6668
private Map<String, String> loadAttributes;
6769

@@ -249,6 +251,10 @@ public void setLoadAttributes(final Map<String, String> loadAttributes) {
249251
initAttributes();
250252
}
251253

254+
public boolean isAsyncLoad() {
255+
return isAsyncLoad;
256+
}
257+
252258
private void initAttributes() {
253259
this.databaseLevel = LoadTsFileConfigurator.parseOrGetDefaultDatabaseLevel(loadAttributes);
254260
this.database = LoadTsFileConfigurator.parseDatabaseName(loadAttributes);
@@ -258,6 +264,7 @@ private void initAttributes() {
258264
this.tabletConversionThresholdBytes =
259265
LoadTsFileConfigurator.parseOrGetDefaultTabletConversionThresholdBytes(loadAttributes);
260266
this.verifySchema = LoadTsFileConfigurator.parseOrGetDefaultVerify(loadAttributes);
267+
this.isAsyncLoad = LoadTsFileConfigurator.parseOrGetDefaultAsyncLoad(loadAttributes);
261268
}
262269

263270
public boolean reconstructStatementIfMiniFileConverted(final List<Boolean> isMiniTsFile) {
@@ -326,6 +333,7 @@ public org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement toRelat
326333
loadAttributes.put(CONVERT_ON_TYPE_MISMATCH_KEY, String.valueOf(convertOnTypeMismatch));
327334
loadAttributes.put(
328335
TABLET_CONVERSION_THRESHOLD_KEY, String.valueOf(tabletConversionThresholdBytes));
336+
loadAttributes.put(ASYNC_LOAD_KEY, String.valueOf(isAsyncLoad));
329337

330338
return new LoadTsFile(null, file.getAbsolutePath(), loadAttributes);
331339
}
@@ -350,6 +358,8 @@ public String toString() {
350358
+ convertOnTypeMismatch
351359
+ ", tablet-conversion-threshold="
352360
+ tabletConversionThresholdBytes
361+
+ ", async-load="
362+
+ isAsyncLoad
353363
+ ", tsFiles size="
354364
+ tsFiles.size()
355365
+ '}';

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/active/ActiveLoadTsFileLoader.java

Lines changed: 66 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
import org.apache.iotdb.commons.concurrent.ThreadName;
2525
import org.apache.iotdb.commons.concurrent.threadpool.WrappedThreadPoolExecutor;
2626
import org.apache.iotdb.commons.conf.CommonDescriptor;
27+
import org.apache.iotdb.commons.conf.IoTDBConstant;
2728
import org.apache.iotdb.commons.utils.RetryUtils;
2829
import org.apache.iotdb.db.auth.AuthorityChecker;
2930
import org.apache.iotdb.db.conf.IoTDBConfig;
3031
import org.apache.iotdb.db.conf.IoTDBDescriptor;
32+
import org.apache.iotdb.db.protocol.session.IClientSession;
33+
import org.apache.iotdb.db.protocol.session.InternalClientSession;
3134
import org.apache.iotdb.db.protocol.session.SessionManager;
32-
import org.apache.iotdb.db.queryengine.common.SessionInfo;
3335
import org.apache.iotdb.db.queryengine.plan.Coordinator;
3436
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
3537
import org.apache.iotdb.db.queryengine.plan.analyze.schema.ClusterSchemaFetcher;
@@ -54,6 +56,7 @@
5456
import java.nio.file.SimpleFileVisitor;
5557
import java.nio.file.attribute.BasicFileAttributes;
5658
import java.time.ZoneId;
59+
import java.util.List;
5760
import java.util.Objects;
5861
import java.util.Optional;
5962
import java.util.concurrent.LinkedBlockingQueue;
@@ -67,6 +70,8 @@ public class ActiveLoadTsFileLoader {
6770

6871
private static final IoTDBConfig IOTDB_CONFIG = IoTDBDescriptor.getInstance().getConfig();
6972

73+
private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
74+
7075
private static final int MAX_PENDING_SIZE = 1000;
7176
private final ActiveLoadPendingQueue pendingQueue = new ActiveLoadPendingQueue();
7277

@@ -149,30 +154,43 @@ private void adjustExecutorIfNecessary() {
149154
}
150155

151156
private void tryLoadPendingTsFiles() {
152-
while (true) {
153-
final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
154-
if (!filePair.isPresent()) {
155-
return;
156-
}
157+
final IClientSession session =
158+
new InternalClientSession(
159+
String.format(
160+
"%s_%s",
161+
ActiveLoadTsFileLoader.class.getSimpleName(), Thread.currentThread().getName()));
162+
session.setUsername(AuthorityChecker.SUPER_USER);
163+
session.setClientVersion(IoTDBConstant.ClientVersion.V_1_0);
164+
session.setZoneId(ZoneId.systemDefault());
165+
166+
try {
167+
while (true) {
168+
final Optional<Pair<String, Boolean>> filePair = tryGetNextPendingFile();
169+
if (!filePair.isPresent()) {
170+
return;
171+
}
157172

158-
try {
159-
final TSStatus result = loadTsFile(filePair.get());
160-
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
161-
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
162-
LOGGER.info(
163-
"Successfully auto load tsfile {} (isGeneratedByPipe = {})",
164-
filePair.get().getLeft(),
165-
filePair.get().getRight());
166-
} else {
167-
handleLoadFailure(filePair.get(), result);
173+
try {
174+
final TSStatus result = loadTsFile(filePair.get(), session);
175+
if (result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
176+
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()) {
177+
LOGGER.info(
178+
"Successfully auto load tsfile {} (isGeneratedByPipe = {})",
179+
filePair.get().getLeft(),
180+
filePair.get().getRight());
181+
} else {
182+
handleLoadFailure(filePair.get(), result);
183+
}
184+
} catch (final FileNotFoundException e) {
185+
handleFileNotFoundException(filePair.get());
186+
} catch (final Exception e) {
187+
handleOtherException(filePair.get(), e);
188+
} finally {
189+
pendingQueue.removeFromLoading(filePair.get().getLeft());
168190
}
169-
} catch (final FileNotFoundException e) {
170-
handleFileNotFoundException(filePair.get());
171-
} catch (final Exception e) {
172-
handleOtherException(filePair.get(), e);
173-
} finally {
174-
pendingQueue.removeFromLoading(filePair.get().getLeft());
175191
}
192+
} finally {
193+
SESSION_MANAGER.closeSession(session, Coordinator.getInstance()::cleanupQueryExecution);
176194
}
177195
}
178196

@@ -195,27 +213,39 @@ private Optional<Pair<String, Boolean>> tryGetNextPendingFile() {
195213
}
196214
}
197215

198-
private TSStatus loadTsFile(final Pair<String, Boolean> filePair) throws FileNotFoundException {
216+
private TSStatus loadTsFile(final Pair<String, Boolean> filePair, final IClientSession session)
217+
throws FileNotFoundException {
199218
final LoadTsFileStatement statement = new LoadTsFileStatement(filePair.getLeft());
219+
final List<File> files = statement.getTsFiles();
220+
if (!files.isEmpty()) {
221+
final File parentFile = files.get(0).getParentFile();
222+
statement.setDatabase(parentFile == null ? "null" : parentFile.getName());
223+
}
200224
statement.setDeleteAfterLoad(true);
201225
statement.setConvertOnTypeMismatch(true);
202226
statement.setVerifySchema(isVerify);
203227
statement.setAutoCreateDatabase(false);
204-
return executeStatement(filePair.getRight() ? new PipeEnrichedStatement(statement) : statement);
228+
return executeStatement(
229+
filePair.getRight() ? new PipeEnrichedStatement(statement) : statement, session);
205230
}
206231

207-
private TSStatus executeStatement(final Statement statement) {
208-
return Coordinator.getInstance()
209-
.executeForTreeModel(
210-
statement,
211-
SessionManager.getInstance().requestQueryId(),
212-
new SessionInfo(0, AuthorityChecker.SUPER_USER, ZoneId.systemDefault()),
213-
"",
214-
ClusterPartitionFetcher.getInstance(),
215-
ClusterSchemaFetcher.getInstance(),
216-
IOTDB_CONFIG.getQueryTimeoutThreshold(),
217-
false)
218-
.status;
232+
private TSStatus executeStatement(final Statement statement, final IClientSession session) {
233+
SESSION_MANAGER.registerSession(session);
234+
try {
235+
return Coordinator.getInstance()
236+
.executeForTreeModel(
237+
statement,
238+
SESSION_MANAGER.requestQueryId(),
239+
SESSION_MANAGER.getSessionInfo(session),
240+
"",
241+
ClusterPartitionFetcher.getInstance(),
242+
ClusterSchemaFetcher.getInstance(),
243+
IOTDB_CONFIG.getQueryTimeoutThreshold(),
244+
false)
245+
.status;
246+
} finally {
247+
SESSION_MANAGER.removeCurrSession();
248+
}
219249
}
220250

221251
private void handleLoadFailure(final Pair<String, Boolean> filePair, final TSStatus status) {

0 commit comments

Comments
 (0)