Skip to content

Commit 9b6b323

Browse files
Pipe/Load: Assign distinct progress indexes for loading tsfiles in time partitions to reduce pipe reprocessing after restart & Decrease pipe heartbeat interval (apache#15583)
1 parent bed5450 commit 9b6b323

File tree

8 files changed

+95
-53
lines changed

8 files changed

+95
-53
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -615,17 +615,10 @@ private boolean mayTsFileContainUnprocessedData(final TsFileResource resource) {
615615
}
616616

617617
if (startIndex instanceof StateProgressIndex) {
618-
// Some different tsFiles may share the same max progressIndex, thus tsFiles with an
619-
// "equals" max progressIndex must be transmitted to avoid data loss
620-
final ProgressIndex innerProgressIndex =
621-
((StateProgressIndex) startIndex).getInnerProgressIndex();
622-
return !innerProgressIndex.isAfter(resource.getMaxProgressIndexAfterClose())
623-
&& !innerProgressIndex.equals(resource.getMaxProgressIndexAfterClose());
618+
startIndex = ((StateProgressIndex) startIndex).getInnerProgressIndex();
624619
}
625-
626-
// Some different tsFiles may share the same max progressIndex, thus tsFiles with an
627-
// "equals" max progressIndex must be transmitted to avoid data loss
628-
return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose());
620+
return !startIndex.isAfter(resource.getMaxProgressIndexAfterClose())
621+
&& !startIndex.equals(resource.getMaxProgressIndexAfterClose());
629622
}
630623

631624
private boolean mayTsFileResourceOverlappedWithPattern(final TsFileResource resource) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.iotdb.common.rpc.thrift.TShowConfigurationResp;
3838
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
3939
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
40+
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
4041
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
4142
import org.apache.iotdb.commons.cluster.NodeStatus;
4243
import org.apache.iotdb.commons.conf.CommonConfig;
@@ -517,14 +518,18 @@ public TLoadResp sendTsFilePieceNode(final TTsFilePieceReq req) {
517518

518519
@Override
519520
public TLoadResp sendLoadCommand(TLoadCommandReq req) {
520-
final ProgressIndex progressIndex;
521-
if (req.isSetProgressIndex()) {
522-
progressIndex = ProgressIndexType.deserializeFrom(ByteBuffer.wrap(req.getProgressIndex()));
521+
final Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap = new HashMap<>();
522+
if (req.isSetTimePartition2ProgressIndex()) {
523+
for (Map.Entry<TTimePartitionSlot, ByteBuffer> entry :
524+
req.getTimePartition2ProgressIndex().entrySet()) {
525+
timePartitionProgressIndexMap.put(
526+
entry.getKey(), ProgressIndexType.deserializeFrom(entry.getValue()));
527+
}
523528
} else {
524-
// fallback to use local generated progress index for compatibility
525-
progressIndex = PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad();
526-
LOGGER.info(
527-
"Use local generated load progress index {} for uuid {}.", progressIndex, req.uuid);
529+
final TSStatus status = new TSStatus();
530+
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
531+
status.setMessage("Load command requires time partition to progress index map");
532+
return createTLoadResp(status);
528533
}
529534

530535
return createTLoadResp(
@@ -533,7 +538,7 @@ public TLoadResp sendLoadCommand(TLoadCommandReq req) {
533538
LoadTsFileScheduler.LoadCommand.values()[req.commandType],
534539
req.uuid,
535540
req.isSetIsGeneratedByPipe() && req.isGeneratedByPipe,
536-
progressIndex));
541+
timePartitionProgressIndexMap));
537542
}
538543

539544
@Override

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
2424
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
2525
import org.apache.iotdb.common.rpc.thrift.TSStatus;
26+
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2627
import org.apache.iotdb.commons.client.IClientManager;
2728
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
2829
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -56,8 +57,10 @@
5657
import java.io.IOException;
5758
import java.net.SocketTimeoutException;
5859
import java.nio.ByteBuffer;
60+
import java.util.HashMap;
5961
import java.util.HashSet;
6062
import java.util.List;
63+
import java.util.Map;
6164
import java.util.Set;
6265
import java.util.concurrent.ExecutorService;
6366
import java.util.concurrent.Future;
@@ -219,7 +222,7 @@ private void dispatchRemote(TTsFilePieceReq loadTsFileReq, TEndPoint endPoint)
219222
}
220223

221224
public Future<FragInstanceDispatchResult> dispatchCommand(
222-
TLoadCommandReq loadCommandReq, Set<TRegionReplicaSet> replicaSets) {
225+
TLoadCommandReq originalLoadCommandReq, Set<TRegionReplicaSet> replicaSets) {
223226
Set<TEndPoint> allEndPoint = new HashSet<>();
224227
for (TRegionReplicaSet replicaSet : replicaSets) {
225228
for (TDataNodeLocation dataNodeLocation : replicaSet.getDataNodeLocations()) {
@@ -228,23 +231,27 @@ public Future<FragInstanceDispatchResult> dispatchCommand(
228231
}
229232

230233
for (TEndPoint endPoint : allEndPoint) {
234+
// duplicate for progress index binary serialization
235+
final TLoadCommandReq duplicatedLoadCommandReq = originalLoadCommandReq.deepCopy();
231236
try (SetThreadName threadName =
232237
new SetThreadName(
233238
"load-dispatcher"
234239
+ "-"
235-
+ LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType]
240+
+ LoadTsFileScheduler.LoadCommand.values()[duplicatedLoadCommandReq.commandType]
236241
+ "-"
237-
+ loadCommandReq.uuid)) {
242+
+ duplicatedLoadCommandReq.uuid)) {
238243
if (isDispatchedToLocal(endPoint)) {
239-
dispatchLocally(loadCommandReq);
244+
dispatchLocally(duplicatedLoadCommandReq);
240245
} else {
241-
dispatchRemote(loadCommandReq, endPoint);
246+
dispatchRemote(duplicatedLoadCommandReq, endPoint);
242247
}
243248
} catch (FragmentInstanceDispatchException e) {
244-
LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", loadCommandReq, e);
249+
LOGGER.warn(
250+
"Cannot dispatch LoadCommand for load operation {}", duplicatedLoadCommandReq, e);
245251
return immediateFuture(new FragInstanceDispatchResult(e.getFailureStatus()));
246252
} catch (Exception t) {
247-
LOGGER.warn("Cannot dispatch LoadCommand for load operation {}", loadCommandReq, t);
253+
LOGGER.warn(
254+
"Cannot dispatch LoadCommand for load operation {}", duplicatedLoadCommandReq, t);
248255
return immediateFuture(
249256
new FragInstanceDispatchResult(
250257
RpcUtils.getStatus(
@@ -256,17 +263,18 @@ public Future<FragInstanceDispatchResult> dispatchCommand(
256263

257264
private void dispatchLocally(TLoadCommandReq loadCommandReq)
258265
throws FragmentInstanceDispatchException {
259-
final ProgressIndex progressIndex;
260-
if (loadCommandReq.isSetProgressIndex()) {
261-
progressIndex =
262-
ProgressIndexType.deserializeFrom(ByteBuffer.wrap(loadCommandReq.getProgressIndex()));
266+
final Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap = new HashMap<>();
267+
if (loadCommandReq.isSetTimePartition2ProgressIndex()) {
268+
for (Map.Entry<TTimePartitionSlot, ByteBuffer> entry :
269+
loadCommandReq.getTimePartition2ProgressIndex().entrySet()) {
270+
timePartitionProgressIndexMap.put(
271+
entry.getKey(), ProgressIndexType.deserializeFrom(entry.getValue()));
272+
}
263273
} else {
264-
// fallback to use local generated progress index for compatibility
265-
progressIndex = PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad();
266-
LOGGER.info(
267-
"Use local generated load progress index {} for uuid {}.",
268-
progressIndex,
269-
loadCommandReq.uuid);
274+
final TSStatus status = new TSStatus();
275+
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
276+
status.setMessage("Load command requires time partition to progress index map");
277+
throw new FragmentInstanceDispatchException(status);
270278
}
271279

272280
final TSStatus resultStatus =
@@ -275,7 +283,7 @@ private void dispatchLocally(TLoadCommandReq loadCommandReq)
275283
LoadTsFileScheduler.LoadCommand.values()[loadCommandReq.commandType],
276284
loadCommandReq.uuid,
277285
loadCommandReq.isSetIsGeneratedByPipe() && loadCommandReq.isGeneratedByPipe,
278-
progressIndex);
286+
timePartitionProgressIndexMap);
279287
if (!RpcUtils.SUCCESS_STATUS.equals(resultStatus)) {
280288
throw new FragmentInstanceDispatchException(resultStatus);
281289
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.commons.conf.CommonDescriptor;
3030
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
3131
import org.apache.iotdb.commons.consensus.DataRegionId;
32+
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
3233
import org.apache.iotdb.commons.exception.IoTDBException;
3334
import org.apache.iotdb.commons.partition.DataPartition;
3435
import org.apache.iotdb.commons.partition.DataPartitionQueryParam;
@@ -140,6 +141,7 @@ public class LoadTsFileScheduler implements IScheduler {
140141
private final PlanFragmentId fragmentId;
141142
private final Set<TRegionReplicaSet> allReplicaSets;
142143
private final boolean isGeneratedByPipe;
144+
private final Map<TTimePartitionSlot, ProgressIndex> timePartitionSlotToProgressIndex;
143145
private final LoadTsFileDataCacheMemoryBlock block;
144146

145147
public LoadTsFileScheduler(
@@ -158,6 +160,7 @@ public LoadTsFileScheduler(
158160
this.partitionFetcher = new DataPartitionBatchFetcher(partitionFetcher);
159161
this.allReplicaSets = new HashSet<>();
160162
this.isGeneratedByPipe = isGeneratedByPipe;
163+
this.timePartitionSlotToProgressIndex = new HashMap<>();
161164
this.block = LoadTsFileMemoryManager.getInstance().allocateDataCacheMemoryBlock();
162165

163166
for (FragmentInstance fragmentInstance : distributedQueryPlan.getInstances()) {
@@ -408,7 +411,26 @@ private boolean secondPhase(
408411

409412
try {
410413
loadCommandReq.setIsGeneratedByPipe(isGeneratedByPipe);
411-
loadCommandReq.setProgressIndex(assignProgressIndex(tsFileResource));
414+
loadCommandReq.setTimePartition2ProgressIndex(
415+
timePartitionSlotToProgressIndex.entrySet().stream()
416+
.collect(
417+
Collectors.toMap(
418+
Map.Entry::getKey,
419+
entry -> {
420+
try (final PublicBAOS byteArrayOutputStream = new PublicBAOS();
421+
final DataOutputStream dataOutputStream =
422+
new DataOutputStream(byteArrayOutputStream)) {
423+
entry.getValue().serialize(dataOutputStream);
424+
return ByteBuffer.wrap(
425+
byteArrayOutputStream.getBuf(), 0, byteArrayOutputStream.size());
426+
} catch (final IOException e) {
427+
throw new RuntimeException(
428+
String.format(
429+
"Serialize Progress Index error, isFirstPhaseSuccess: %s, uuid: %s, tsFile: %s",
430+
isFirstPhaseSuccess, uuid, tsFile.getAbsolutePath()),
431+
e);
432+
}
433+
})));
412434
Future<FragInstanceDispatchResult> dispatchResultFuture =
413435
dispatcher.dispatchCommand(loadCommandReq, allReplicaSets);
414436

@@ -431,14 +453,6 @@ private boolean secondPhase(
431453
stateMachine.transitionToFailed(status);
432454
return false;
433455
}
434-
} catch (IOException e) {
435-
LOGGER.warn(
436-
"Serialize Progress Index error, isFirstPhaseSuccess: {}, uuid: {}, tsFile: {}",
437-
isFirstPhaseSuccess,
438-
uuid,
439-
tsFile.getAbsolutePath());
440-
stateMachine.transitionToFailed(e);
441-
return false;
442456
} catch (InterruptedException | ExecutionException e) {
443457
if (e instanceof InterruptedException) {
444458
Thread.currentThread().interrupt();
@@ -639,6 +653,12 @@ public FragmentInfo getFragmentInfo() {
639653
return null;
640654
}
641655

656+
private void computeTimePartitionSlotToProgressIndexIfAbsent(
657+
final TTimePartitionSlot timePartitionSlot) {
658+
timePartitionSlotToProgressIndex.putIfAbsent(
659+
timePartitionSlot, PipeDataNodeAgent.runtime().getNextProgressIndexForTsFileLoad());
660+
}
661+
642662
public enum LoadCommand {
643663
EXECUTE,
644664
ROLLBACK
@@ -686,6 +706,7 @@ private boolean addOrSendChunkData(ChunkData chunkData) throws LoadFileException
686706
nonDirectionalChunkData.add(chunkData);
687707
dataSize += chunkData.getDataSize();
688708
block.addMemoryUsage(chunkData.getDataSize());
709+
scheduler.computeTimePartitionSlotToProgressIndexIfAbsent(chunkData.getTimePartitionSlot());
689710

690711
if (!isMemoryEnough()) {
691712
routeChunkData();

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2323
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
2424
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
25+
import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
2526
import org.apache.iotdb.commons.concurrent.ExceptionalCountDownLatch;
2627
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
2728
import org.apache.iotdb.commons.concurrent.ThreadName;
@@ -980,13 +981,13 @@ public TSStatus executeLoadCommand(
980981
LoadTsFileScheduler.LoadCommand loadCommand,
981982
String uuid,
982983
boolean isGeneratedByPipe,
983-
ProgressIndex progressIndex) {
984+
Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap) {
984985
TSStatus status = new TSStatus();
985986

986987
try {
987988
switch (loadCommand) {
988989
case EXECUTE:
989-
if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, progressIndex)) {
990+
if (loadTsFileManager.loadAll(uuid, isGeneratedByPipe, timePartitionProgressIndexMap)) {
990991
status = RpcUtils.SUCCESS_STATUS;
991992
} else {
992993
status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.conf.IoTDBConstant;
2525
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
2626
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
27+
import org.apache.iotdb.commons.consensus.index.impl.MinimumProgressIndex;
2728
import org.apache.iotdb.commons.file.SystemFileFactory;
2829
import org.apache.iotdb.commons.service.metric.MetricService;
2930
import org.apache.iotdb.commons.service.metric.enums.Metric;
@@ -273,7 +274,10 @@ private String getNextFolder() throws DiskSpaceInsufficientException {
273274
return FOLDER_MANAGER.get().getNextFolder();
274275
}
275276

276-
public boolean loadAll(String uuid, boolean isGeneratedByPipe, ProgressIndex progressIndex)
277+
public boolean loadAll(
278+
String uuid,
279+
boolean isGeneratedByPipe,
280+
Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap)
277281
throws IOException, LoadFileException {
278282
if (!uuid2WriterManager.containsKey(uuid)) {
279283
return false;
@@ -282,7 +286,7 @@ public boolean loadAll(String uuid, boolean isGeneratedByPipe, ProgressIndex pro
282286
final Optional<CleanupTask> cleanupTask = Optional.of(uuid2CleanupTask.get(uuid));
283287
cleanupTask.ifPresent(CleanupTask::markLoadTaskRunning);
284288
try {
285-
uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, progressIndex);
289+
uuid2WriterManager.get(uuid).loadAll(isGeneratedByPipe, timePartitionProgressIndexMap);
286290
} finally {
287291
cleanupTask.ifPresent(CleanupTask::markLoadTaskNotRunning);
288292
}
@@ -497,7 +501,9 @@ private void writeDeletion(DataRegion dataRegion, DeletionData deletionData)
497501
}
498502
}
499503

500-
private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
504+
private void loadAll(
505+
boolean isGeneratedByPipe,
506+
Map<TTimePartitionSlot, ProgressIndex> timePartitionProgressIndexMap)
501507
throws IOException, LoadFileException {
502508
if (isClosed) {
503509
throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
@@ -516,7 +522,11 @@ private void loadAll(boolean isGeneratedByPipe, ProgressIndex progressIndex)
516522

517523
final DataRegion dataRegion = entry.getKey().getDataRegion();
518524
final TsFileResource tsFileResource = dataPartition2Resource.get(entry.getKey());
519-
endTsFileResource(writer, tsFileResource, progressIndex);
525+
endTsFileResource(
526+
writer,
527+
tsFileResource,
528+
timePartitionProgressIndexMap.getOrDefault(
529+
entry.getKey().getTimePartitionSlot(), MinimumProgressIndex.INSTANCE));
520530
dataRegion.loadNewTsFile(tsFileResource, true, isGeneratedByPipe);
521531

522532
// Metrics
@@ -681,6 +691,10 @@ public DataRegion getDataRegion() {
681691
return dataRegion;
682692
}
683693

694+
/** Returns the {@code timePartitionSlot} field of this object. */
public TTimePartitionSlot getTimePartitionSlot() {
695+
return timePartitionSlot;
696+
}
697+
684698
@Override
685699
public String toString() {
686700
return String.join(

iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/conf/CommonConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,7 @@ public class CommonConfig {
253253
(int) (RpcUtils.THRIFT_FRAME_MAX_SIZE * 0.8);
254254

255255
private boolean isSeperatedPipeHeartbeatEnabled = true;
256-
private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 100;
256+
private int pipeHeartbeatIntervalSecondsForCollectingPipeMeta = 30;
257257
private long pipeMetaSyncerInitialSyncDelayMinutes = 3;
258258
private long pipeMetaSyncerSyncIntervalMinutes = 3;
259259
private long pipeMetaSyncerAutoRestartPipeCheckIntervalRound = 1;

iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ struct TLoadCommandReq {
396396
1: required i32 commandType
397397
2: required string uuid
398398
3: optional bool isGeneratedByPipe
399-
4: optional binary progressIndex
399+
4: optional map<common.TTimePartitionSlot, binary> timePartition2ProgressIndex
400400
}
401401

402402
struct TAttributeUpdateReq {

0 commit comments

Comments
 (0)