Skip to content

Commit f4b5d0e

Browse files
Load: Batched tablet insertion during conversion (#15125) (#15247)
1 parent bad9c00 commit f4b5d0e

File tree

8 files changed

+180
-69
lines changed

8 files changed

+180
-69
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1142,6 +1142,8 @@ public class IoTDBConfig {
11421142
private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
11431143
0L; // 0 means that the decision will be adaptive based on the number of sequences
11441144

1145+
private long loadTsFileTabletConversionBatchMemorySizeInBytes = 4096 * 1024;
1146+
11451147
private long loadChunkMetadataMemorySizeInBytes = 33554432; // 32MB
11461148

11471149
private long loadMemoryAllocateRetryIntervalMs = 1000L;
@@ -3990,6 +3992,16 @@ public void setLoadTsFileAnalyzeSchemaMemorySizeInBytes(
39903992
this.loadTsFileAnalyzeSchemaMemorySizeInBytes = loadTsFileAnalyzeSchemaMemorySizeInBytes;
39913993
}
39923994

3995+
public long getLoadTsFileTabletConversionBatchMemorySizeInBytes() {
3996+
return loadTsFileTabletConversionBatchMemorySizeInBytes;
3997+
}
3998+
3999+
public void setLoadTsFileTabletConversionBatchMemorySizeInBytes(
4000+
long loadTsFileTabletConversionBatchMemorySizeInBytes) {
4001+
this.loadTsFileTabletConversionBatchMemorySizeInBytes =
4002+
loadTsFileTabletConversionBatchMemorySizeInBytes;
4003+
}
4004+
39934005
public long getLoadChunkMetadataMemorySizeInBytes() {
39944006
return loadChunkMetadataMemorySizeInBytes;
39954007
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2381,6 +2381,11 @@ private void loadLoadTsFileProps(TrimProperties properties) throws IOException {
23812381
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes())))
23822382
.map(String::trim)
23832383
.orElse(String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
2384+
conf.setLoadTsFileTabletConversionBatchMemorySizeInBytes(
2385+
Long.parseLong(
2386+
properties.getProperty(
2387+
"load_tsfile_tablet_conversion_batch_memory_size_in_bytes",
2388+
String.valueOf(conf.getLoadTsFileTabletConversionBatchMemorySizeInBytes()))));
23842389
conf.setLoadChunkMetadataMemorySizeInBytes(
23852390
Long.parseLong(
23862391
Optional.ofNullable(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
import org.apache.iotdb.db.storageengine.dataregion.tsfile.timeindex.ITimeIndex;
6666
import org.apache.iotdb.db.storageengine.dataregion.utils.TsFileResourceUtils;
6767
import org.apache.iotdb.db.storageengine.load.converter.LoadTsFileDataTypeConverter;
68-
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileAnalyzeSchemaMemoryBlock;
68+
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
6969
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
7070
import org.apache.iotdb.db.storageengine.load.metrics.LoadTsFileCostMetricsSet;
7171
import org.apache.iotdb.db.utils.ModificationUtils;
@@ -882,7 +882,7 @@ public void close() {
882882

883883
private static class LoadTsFileAnalyzeSchemaCache {
884884

885-
private final LoadTsFileAnalyzeSchemaMemoryBlock block;
885+
private final LoadTsFileMemoryBlock block;
886886

887887
private Map<IDeviceID, Set<MeasurementSchema>> currentBatchDevice2TimeSeriesSchemas;
888888
private Map<IDeviceID, Boolean> tsFileDevice2IsAligned;
@@ -902,7 +902,7 @@ private static class LoadTsFileAnalyzeSchemaCache {
902902
public LoadTsFileAnalyzeSchemaCache() throws LoadRuntimeOutOfMemoryException {
903903
this.block =
904904
LoadTsFileMemoryManager.getInstance()
905-
.allocateAnalyzeSchemaMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
905+
.allocateMemoryBlock(ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES);
906906
this.currentBatchDevice2TimeSeriesSchemas = new HashMap<>();
907907
this.tsFileDevice2IsAligned = new HashMap<>();
908908
this.alreadySetDatabases = new HashSet<>();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadConvertedInsertTabletStatementTSStatusVisitor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
2727
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
2828
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertBaseStatement;
29+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
2930
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
3031
import org.apache.iotdb.rpc.TSStatusCode;
3132

@@ -45,6 +46,12 @@ public TSStatus visitInsertTablet(
4546
return visitInsertBase(insertTabletStatement, context);
4647
}
4748

49+
@Override
50+
public TSStatus visitInsertMultiTablets(
51+
final InsertMultiTabletsStatement insertMultiTabletsStatement, final TSStatus context) {
52+
return visitInsertBase(insertMultiTabletsStatement, context);
53+
}
54+
4855
private TSStatus visitInsertBase(
4956
final InsertBaseStatement insertBaseStatement, final TSStatus context) {
5057
if (context.getCode() == TSStatusCode.SYSTEM_READ_ONLY.getStatusCode()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java

Lines changed: 138 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,18 @@
2121

2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBPipePattern;
24+
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2425
import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferTabletRawReq;
2526
import org.apache.iotdb.db.pipe.event.common.tsfile.container.scan.TsFileInsertionScanDataContainer;
2627
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
2728
import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
2829
import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
30+
import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
2931
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
3032
import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
3133
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
34+
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryBlock;
35+
import org.apache.iotdb.db.storageengine.load.memory.LoadTsFileMemoryManager;
3236
import org.apache.iotdb.rpc.TSStatusCode;
3337

3438
import org.apache.commons.io.FileUtils;
@@ -38,16 +42,25 @@
3842
import org.slf4j.LoggerFactory;
3943

4044
import java.io.File;
45+
import java.util.ArrayList;
46+
import java.util.List;
4147
import java.util.Optional;
48+
import java.util.stream.Collectors;
49+
50+
import static org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil.calculateTabletSizeInBytes;
4251

4352
public class LoadTreeStatementDataTypeConvertExecutionVisitor
4453
extends StatementVisitor<Optional<TSStatus>, Void> {
45-
46-
private final StatementExecutor statementExecutor;
47-
4854
private static final Logger LOGGER =
4955
LoggerFactory.getLogger(LoadTreeStatementDataTypeConvertExecutionVisitor.class);
5056

57+
private static final long TABLET_BATCH_MEMORY_SIZE_IN_BYTES =
58+
IoTDBDescriptor.getInstance()
59+
.getConfig()
60+
.getLoadTsFileTabletConversionBatchMemorySizeInBytes();
61+
62+
private final StatementExecutor statementExecutor;
63+
5164
@FunctionalInterface
5265
public interface StatementExecutor {
5366
TSStatus execute(final Statement statement);
@@ -69,60 +82,83 @@ public Optional<TSStatus> visitLoadFile(
6982

7083
LOGGER.info("Start data type conversion for LoadTsFileStatement: {}", loadTsFileStatement);
7184

72-
for (final File file : loadTsFileStatement.getTsFiles()) {
73-
try (final TsFileInsertionScanDataContainer container =
74-
new TsFileInsertionScanDataContainer(
75-
file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
76-
for (final Pair<Tablet, Boolean> tabletWithIsAligned : container.toTabletWithIsAligneds()) {
77-
final LoadConvertedInsertTabletStatement statement =
78-
new LoadConvertedInsertTabletStatement(
79-
PipeTransferTabletRawReq.toTPipeTransferRawReq(
80-
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight())
81-
.constructStatement(),
82-
loadTsFileStatement.isConvertOnTypeMismatch());
83-
84-
TSStatus result;
85-
try {
86-
result =
87-
statement.accept(
88-
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
89-
statementExecutor.execute(statement));
90-
91-
// Retry max 5 times if the write process is rejected
92-
for (int i = 0;
93-
i < 5
94-
&& result.getCode()
95-
== TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
96-
i++) {
97-
Thread.sleep(100L * (i + 1));
98-
result =
99-
statement.accept(
100-
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
101-
statementExecutor.execute(statement));
85+
final LoadTsFileMemoryBlock block =
86+
LoadTsFileMemoryManager.getInstance()
87+
.allocateMemoryBlock(TABLET_BATCH_MEMORY_SIZE_IN_BYTES);
88+
final List<PipeTransferTabletRawReq> tabletRawReqs = new ArrayList<>();
89+
final List<Long> tabletRawReqSizes = new ArrayList<>();
90+
91+
try {
92+
for (final File file : loadTsFileStatement.getTsFiles()) {
93+
try (final TsFileInsertionScanDataContainer container =
94+
new TsFileInsertionScanDataContainer(
95+
file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
96+
for (final Pair<Tablet, Boolean> tabletWithIsAligned :
97+
container.toTabletWithIsAligneds()) {
98+
final PipeTransferTabletRawReq tabletRawReq =
99+
PipeTransferTabletRawReq.toTPipeTransferRawReq(
100+
tabletWithIsAligned.getLeft(), tabletWithIsAligned.getRight());
101+
final long curMemory = calculateTabletSizeInBytes(tabletWithIsAligned.getLeft()) + 1;
102+
if (block.hasEnoughMemory(curMemory)) {
103+
tabletRawReqs.add(tabletRawReq);
104+
tabletRawReqSizes.add(curMemory);
105+
block.addMemoryUsage(curMemory);
106+
continue;
102107
}
103-
} catch (final Exception e) {
104-
if (e instanceof InterruptedException) {
105-
Thread.currentThread().interrupt();
108+
109+
final TSStatus result =
110+
executeInsertMultiTabletsWithRetry(
111+
tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch());
112+
113+
for (final long memoryCost : tabletRawReqSizes) {
114+
block.reduceMemoryUsage(memoryCost);
115+
}
116+
tabletRawReqs.clear();
117+
tabletRawReqSizes.clear();
118+
119+
if (!handleTSStatus(result, loadTsFileStatement)) {
120+
return Optional.empty();
106121
}
107-
result = statement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
122+
123+
tabletRawReqs.add(tabletRawReq);
124+
tabletRawReqSizes.add(curMemory);
125+
block.addMemoryUsage(curMemory);
126+
}
127+
} catch (final Exception e) {
128+
LOGGER.warn(
129+
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
130+
return Optional.empty();
131+
}
132+
}
133+
134+
if (!tabletRawReqs.isEmpty()) {
135+
try {
136+
final TSStatus result =
137+
executeInsertMultiTabletsWithRetry(
138+
tabletRawReqs, loadTsFileStatement.isConvertOnTypeMismatch());
139+
140+
for (final long memoryCost : tabletRawReqSizes) {
141+
block.reduceMemoryUsage(memoryCost);
108142
}
143+
tabletRawReqs.clear();
144+
tabletRawReqSizes.clear();
109145

110-
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
111-
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
112-
|| result.getCode()
113-
== TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
114-
LOGGER.warn(
115-
"Failed to convert data type for LoadTsFileStatement: {}, status code is {}.",
116-
loadTsFileStatement,
117-
result.getCode());
146+
if (!handleTSStatus(result, loadTsFileStatement)) {
118147
return Optional.empty();
119148
}
149+
} catch (final Exception e) {
150+
LOGGER.warn(
151+
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
152+
return Optional.empty();
120153
}
121-
} catch (final Exception e) {
122-
LOGGER.warn(
123-
"Failed to convert data type for LoadTsFileStatement: {}.", loadTsFileStatement, e);
124-
return Optional.empty();
125154
}
155+
} finally {
156+
for (final long memoryCost : tabletRawReqSizes) {
157+
block.reduceMemoryUsage(memoryCost);
158+
}
159+
tabletRawReqs.clear();
160+
tabletRawReqSizes.clear();
161+
block.close();
126162
}
127163

128164
if (loadTsFileStatement.isDeleteAfterLoad()) {
@@ -142,4 +178,57 @@ file, new IoTDBPipePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null)) {
142178

143179
return Optional.of(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
144180
}
181+
182+
private TSStatus executeInsertMultiTabletsWithRetry(
183+
final List<PipeTransferTabletRawReq> tabletRawReqs, boolean isConvertOnTypeMismatch) {
184+
final InsertMultiTabletsStatement batchStatement = new InsertMultiTabletsStatement();
185+
batchStatement.setInsertTabletStatementList(
186+
tabletRawReqs.stream()
187+
.map(
188+
req ->
189+
new LoadConvertedInsertTabletStatement(
190+
req.constructStatement(), isConvertOnTypeMismatch))
191+
.collect(Collectors.toList()));
192+
193+
TSStatus result;
194+
try {
195+
result =
196+
batchStatement.accept(
197+
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
198+
statementExecutor.execute(batchStatement));
199+
200+
// Retry max 5 times if the write process is rejected
201+
for (int i = 0;
202+
i < 5
203+
&& result.getCode()
204+
== TSStatusCode.LOAD_TEMPORARY_UNAVAILABLE_EXCEPTION.getStatusCode();
205+
i++) {
206+
Thread.sleep(100L * (i + 1));
207+
result =
208+
batchStatement.accept(
209+
LoadTsFileDataTypeConverter.STATEMENT_STATUS_VISITOR,
210+
statementExecutor.execute(batchStatement));
211+
}
212+
} catch (final Exception e) {
213+
if (e instanceof InterruptedException) {
214+
Thread.currentThread().interrupt();
215+
}
216+
result = batchStatement.accept(LoadTsFileDataTypeConverter.STATEMENT_EXCEPTION_VISITOR, e);
217+
}
218+
return result;
219+
}
220+
221+
private static boolean handleTSStatus(
222+
final TSStatus result, final LoadTsFileStatement loadTsFileStatement) {
223+
if (!(result.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()
224+
|| result.getCode() == TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()
225+
|| result.getCode() == TSStatusCode.LOAD_IDEMPOTENT_CONFLICT_EXCEPTION.getStatusCode())) {
226+
LOGGER.warn(
227+
"Failed to convert data type for LoadTsFileStatement: {}, status code is {}.",
228+
loadTsFileStatement,
229+
result.getCode());
230+
return false;
231+
}
232+
return true;
233+
}
145234
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java renamed to iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryBlock.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,13 @@
3030

3131
import java.util.concurrent.atomic.AtomicLong;
3232

33-
public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemoryBlock {
34-
private static final Logger LOGGER =
35-
LoggerFactory.getLogger(LoadTsFileAnalyzeSchemaMemoryBlock.class);
33+
public class LoadTsFileMemoryBlock extends LoadTsFileAbstractMemoryBlock {
34+
private static final Logger LOGGER = LoggerFactory.getLogger(LoadTsFileMemoryBlock.class);
3635

3736
private final long totalMemorySizeInBytes;
3837
private final AtomicLong memoryUsageInBytes;
3938

40-
LoadTsFileAnalyzeSchemaMemoryBlock(long totalMemorySizeInBytes) {
39+
LoadTsFileMemoryBlock(long totalMemorySizeInBytes) {
4140
super();
4241

4342
this.totalMemorySizeInBytes = totalMemorySizeInBytes;
@@ -60,7 +59,7 @@ public synchronized void addMemoryUsage(long memoryInBytes) {
6059
Metric.LOAD_MEM.toString(),
6160
MetricLevel.IMPORTANT,
6261
Tag.NAME.toString(),
63-
LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY)
62+
LoadTsFileMemMetricSet.LOAD_TSFILE_OTHER_MEMORY)
6463
.incr(memoryInBytes);
6564
}
6665

@@ -75,7 +74,7 @@ public synchronized void reduceMemoryUsage(long memoryInBytes) {
7574
Metric.LOAD_MEM.toString(),
7675
MetricLevel.IMPORTANT,
7776
Tag.NAME.toString(),
78-
LoadTsFileMemMetricSet.LOAD_TSFILE_ANALYZE_SCHEMA_MEMORY)
77+
LoadTsFileMemMetricSet.LOAD_TSFILE_OTHER_MEMORY)
7978
.decr(memoryInBytes);
8079
}
8180

@@ -90,7 +89,7 @@ protected synchronized void releaseAllMemory() {
9089

9190
@Override
9291
public String toString() {
93-
return "LoadTsFileAnalyzeSchemaMemoryBlock{"
92+
return "LoadTsFileMemoryBlock{"
9493
+ "totalMemorySizeInBytes="
9594
+ totalMemorySizeInBytes
9695
+ ", memoryUsageInBytes="

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -94,25 +94,24 @@ public synchronized void releaseToQuery(long sizeInBytes) {
9494
this.notifyAll();
9595
}
9696

97-
public synchronized LoadTsFileAnalyzeSchemaMemoryBlock allocateAnalyzeSchemaMemoryBlock(
98-
long sizeInBytes) throws LoadRuntimeOutOfMemoryException {
97+
public synchronized LoadTsFileMemoryBlock allocateMemoryBlock(long sizeInBytes)
98+
throws LoadRuntimeOutOfMemoryException {
9999
try {
100100
forceAllocateFromQuery(sizeInBytes);
101101
if (LOGGER.isDebugEnabled()) {
102-
LOGGER.debug(
103-
"Load: Allocated AnalyzeSchemaMemoryBlock from query engine, size: {}", sizeInBytes);
102+
LOGGER.debug("Load: Allocated MemoryBlock from query engine, size: {}", sizeInBytes);
104103
}
105104
} catch (LoadRuntimeOutOfMemoryException e) {
106105
if (dataCacheMemoryBlock != null && dataCacheMemoryBlock.doShrink(sizeInBytes)) {
107106
LOGGER.info(
108-
"Load: Query engine's memory is not sufficient, allocated AnalyzeSchemaMemoryBlock from DataCacheMemoryBlock, size: {}",
107+
"Load: Query engine's memory is not sufficient, allocated MemoryBlock from DataCacheMemoryBlock, size: {}",
109108
sizeInBytes);
110109
usedMemorySizeInBytes.addAndGet(sizeInBytes);
111-
return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
110+
return new LoadTsFileMemoryBlock(sizeInBytes);
112111
}
113112
throw e;
114113
}
115-
return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
114+
return new LoadTsFileMemoryBlock(sizeInBytes);
116115
}
117116

118117
public synchronized LoadTsFileDataCacheMemoryBlock allocateDataCacheMemoryBlock()

0 commit comments

Comments
 (0)