Skip to content

Commit 1ad56c7

Browse files
authored
Pipe: Enabled waiting for pipes to finish & progress index persist to config node in shutdown hook (#15896)
* persist in shutdown hook * Update PipeTaskAgent.java * Update PipeConfigNodeTaskAgent.java * Update DataNodeShutdownHook.java * Fix * Fix2 * Fix3 * Update PipeHeartbeatScheduler.java
1 parent 309f205 commit 1ad56c7

File tree

32 files changed

+202
-66
lines changed

32 files changed

+202
-66
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.client.async.handlers.rpc;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2324
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2425
import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
2526
import org.apache.iotdb.commons.client.request.AsyncRequestContext;
@@ -33,7 +34,6 @@
3334
import org.apache.iotdb.mpp.rpc.thrift.TCountPathsUsingTemplateResp;
3435
import org.apache.iotdb.mpp.rpc.thrift.TDeviceViewResp;
3536
import org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListResp;
36-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
3737
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
3838
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaResp;
3939
import org.apache.iotdb.mpp.rpc.thrift.TPushTopicMetaResp;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/PipeHeartbeatRPCHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020
package org.apache.iotdb.confignode.client.async.handlers.rpc;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2324
import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
24-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
2525

2626
import org.slf4j.Logger;
2727
import org.slf4j.LoggerFactory;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
3030
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
3131
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
32+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
3233
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
3334
import org.apache.iotdb.common.rpc.thrift.TSStatus;
3435
import org.apache.iotdb.common.rpc.thrift.TSchemaNode;
@@ -2917,6 +2918,23 @@ public TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap)
29172918
: new TFetchTableResp(status);
29182919
}
29192920

2921+
@Override
2922+
public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) {
2923+
final TSStatus status = confirmLeader();
2924+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
2925+
return status;
2926+
}
2927+
pipeManager
2928+
.getPipeRuntimeCoordinator()
2929+
.parseHeartbeat(
2930+
dataNodeId,
2931+
resp.getPipeMetaList(),
2932+
resp.getPipeCompletedList(),
2933+
resp.getPipeRemainingEventCountList(),
2934+
resp.getPipeRemainingTimeList());
2935+
return StatusUtils.OK;
2936+
}
2937+
29202938
@Override
29212939
public DataSet registerAINode(TAINodeRegisterReq req) {
29222940
TSStatus status = confirmLeader();

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
2323
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2424
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
25+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2526
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2627
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
2728
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -890,4 +891,6 @@ TDescTableResp describeTable(
890891
TDescTable4InformationSchemaResp describeTable4InformationSchema();
891892

892893
TFetchTableResp fetchTables(final Map<String, Set<String>> fetchTableMap);
894+
895+
TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp);
893896
}

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeTaskAgent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.iotdb.confignode.manager.pipe.agent.task;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2223
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
2324
import org.apache.iotdb.commons.consensus.index.impl.MetaProgressIndex;
2425
import org.apache.iotdb.commons.exception.IllegalPathException;
@@ -35,7 +36,6 @@
3536
import org.apache.iotdb.confignode.manager.pipe.metric.source.PipeConfigRegionExtractorMetrics;
3637
import org.apache.iotdb.confignode.manager.pipe.resource.PipeConfigNodeResourceManager;
3738
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
38-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
3939
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
4040
import org.apache.iotdb.pipe.api.exception.PipeException;
4141

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/heartbeat/PipeHeartbeatScheduler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat;
2121

2222
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
23+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2324
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
2425
import org.apache.iotdb.commons.concurrent.ThreadName;
2526
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
@@ -31,7 +32,6 @@
3132
import org.apache.iotdb.confignode.manager.ConfigManager;
3233
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
3334
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
34-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
3535

3636
import org.slf4j.Logger;
3737
import org.slf4j.LoggerFactory;

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
2424
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
2525
import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
26+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
2627
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2728
import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot;
2829
import org.apache.iotdb.common.rpc.thrift.TSetConfigurationReq;
@@ -1415,4 +1416,9 @@ public TDeleteTableDeviceResp deleteDevice(final TDeleteTableDeviceReq req) {
14151416
public TSStatus createTableView(final TCreateTableViewReq req) {
14161417
return configManager.createTableView(req);
14171418
}
1419+
1420+
@Override
1421+
public TSStatus pushHeartbeat(final int dataNodeId, final TPipeHeartbeatResp resp) {
1422+
return configManager.pushHeartbeat(dataNodeId, resp);
1423+
}
14181424
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.iotdb.db.pipe.agent.task;
2121

22+
import org.apache.iotdb.common.rpc.thrift.TPipeHeartbeatResp;
23+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
2224
import org.apache.iotdb.commons.consensus.DataRegionId;
2325
import org.apache.iotdb.commons.consensus.SchemaRegionId;
2426
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
@@ -49,10 +51,13 @@
4951
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
5052
import org.apache.iotdb.db.pipe.extractor.dataregion.realtime.listener.PipeInsertionDataNodeListener;
5153
import org.apache.iotdb.db.pipe.extractor.schemaregion.SchemaRegionListeningFilter;
52-
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
54+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
5355
import org.apache.iotdb.db.pipe.metric.overview.PipeTsFileToTabletsMetrics;
5456
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
5557
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
58+
import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
59+
import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
60+
import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
5661
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
5762
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.pipe.PipeOperateSchemaQueueNode;
5863
import org.apache.iotdb.db.schemaengine.SchemaEngine;
@@ -63,10 +68,10 @@
6368
import org.apache.iotdb.metrics.utils.SystemMetric;
6469
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatResp;
6570
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
66-
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
6771
import org.apache.iotdb.mpp.rpc.thrift.TPushPipeMetaRespExceptionMessage;
6872
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters;
6973
import org.apache.iotdb.pipe.api.exception.PipeException;
74+
import org.apache.iotdb.rpc.TSStatusCode;
7075

7176
import com.google.common.collect.ImmutableMap;
7277
import org.apache.thrift.TException;
@@ -322,13 +327,12 @@ private void closeSchemaRegionListeningQueueIfNecessary(
322327

323328
@Override
324329
protected void thawRate(final String pipeName, final long creationTime) {
325-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
330+
PipeDataNodeSinglePipeMetrics.getInstance().thawRate(pipeName + "_" + creationTime);
326331
}
327332

328333
@Override
329334
protected void freezeRate(final String pipeName, final long creationTime) {
330-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
331-
.freezeRate(pipeName + "_" + creationTime);
335+
PipeDataNodeSinglePipeMetrics.getInstance().freezeRate(pipeName + "_" + creationTime);
332336
}
333337

334338
@Override
@@ -339,7 +343,7 @@ protected boolean dropPipe(final String pipeName, final long creationTime) {
339343

340344
final String taskId = pipeName + "_" + creationTime;
341345
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
342-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
346+
PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
343347

344348
return true;
345349
}
@@ -367,7 +371,7 @@ protected boolean dropPipe(final String pipeName) {
367371
final long creationTime = pipeMeta.getStaticMeta().getCreationTime();
368372
final String taskId = pipeName + "_" + creationTime;
369373
PipeTsFileToTabletsMetrics.getInstance().deregister(taskId);
370-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance().deregister(taskId);
374+
PipeDataNodeSinglePipeMetrics.getInstance().deregister(taskId);
371375
// When the pipe contains no pipe tasks, there is no corresponding prefetching queue for the
372376
// subscribed pipe, so the subscription needs to be manually marked as completed.
373377
if (!hasPipeTasks && PipeStaticMeta.isSubscriptionPipe(pipeName)) {
@@ -461,7 +465,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro
461465

462466
final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
463467
final Pair<Long, Double> remainingEventAndTime =
464-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
468+
PipeDataNodeSinglePipeMetrics.getInstance()
465469
.getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
466470
pipeCompletedList.add(isCompleted);
467471
pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -491,7 +495,7 @@ private void collectPipeMetaListInternal(final TDataNodeHeartbeatResp resp) thro
491495
protected void collectPipeMetaListInternal(
492496
final TPipeHeartbeatReq req, final TPipeHeartbeatResp resp) throws TException {
493497
// Do nothing if data node is removing or removed, or request does not need pipe meta list
494-
if (PipeDataNodeAgent.runtime().isShutdown()) {
498+
if (PipeDataNodeAgent.runtime().isShutdown() && req.heartbeatId != Long.MIN_VALUE) {
495499
return;
496500
}
497501
LOGGER.info("Received pipe heartbeat request {} from config node.", req.heartbeatId);
@@ -533,7 +537,7 @@ protected void collectPipeMetaListInternal(
533537

534538
final boolean isCompleted = isAllDataRegionCompleted && includeDataAndNeedDrop;
535539
final Pair<Long, Double> remainingEventAndTime =
536-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
540+
PipeDataNodeSinglePipeMetrics.getInstance()
537541
.getRemainingEventAndTime(staticMeta.getPipeName(), staticMeta.getCreationTime());
538542
pipeCompletedList.add(isCompleted);
539543
pipeRemainingEventCountList.add(remainingEventAndTime.getLeft());
@@ -842,7 +846,29 @@ private boolean isSnapshotMode(final PipeParameters parameters) {
842846

843847
///////////////////////// Shutdown Logic /////////////////////////
844848

845-
public void persistAllProgressIndexLocally() {
849+
public void persistAllProgressIndex() {
850+
persistAllProgressIndexLocally();
851+
persistAllProgressIndex2ConfigNode();
852+
}
853+
854+
private void persistAllProgressIndex2ConfigNode() {
855+
try (final ConfigNodeClient configNodeClient =
856+
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
857+
// Send request to some API server
858+
final TPipeHeartbeatResp resp = new TPipeHeartbeatResp();
859+
collectPipeMetaList(new TPipeHeartbeatReq(Long.MIN_VALUE), resp);
860+
final TSStatus result =
861+
configNodeClient.pushHeartbeat(
862+
IoTDBDescriptor.getInstance().getConfig().getDataNodeId(), resp);
863+
if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != result.getCode()) {
864+
LOGGER.warn("Failed to persist progress index to configNode, status: {}", result);
865+
}
866+
} catch (final Exception e) {
867+
LOGGER.warn(e.getMessage());
868+
}
869+
}
870+
871+
private void persistAllProgressIndexLocally() {
846872
if (!PipeConfig.getInstance().isPipeProgressIndexPersistEnabled()) {
847873
LOGGER.info(
848874
"Pipe progress index persist disabled. Skipping persist all progress index locally.");

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/processor/PipeProcessorSubtask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import org.apache.iotdb.db.pipe.event.UserDefinedEnrichedEvent;
3434
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
3535
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
36-
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
36+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
3737
import org.apache.iotdb.db.pipe.metric.processor.PipeProcessorMetrics;
3838
import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
3939
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -167,7 +167,7 @@ protected boolean executeOnce() throws Exception {
167167
pipeProcessor.process((TsFileInsertionEvent) event, outputEventCollector);
168168
}
169169
PipeProcessorMetrics.getInstance().markTsFileEvent(taskID);
170-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
170+
PipeDataNodeSinglePipeMetrics.getInstance()
171171
.markTsFileCollectInvocationCount(
172172
pipeNameWithCreationTime, outputEventCollector.getCollectInvocationCount());
173173
} else if (event instanceof PipeHeartbeatEvent) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/heartbeat/PipeHeartbeatEvent.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
2727
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
2828
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
29-
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
29+
import org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeSinglePipeMetrics;
3030
import org.apache.iotdb.db.pipe.metric.overview.PipeHeartbeatEventMetrics;
3131
import org.apache.iotdb.db.utils.DateTimeUtils;
3232
import org.apache.iotdb.pipe.api.event.Event;
@@ -93,7 +93,7 @@ public PipeHeartbeatEvent(
9393
@Override
9494
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
9595
if (Objects.nonNull(pipeName)) {
96-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
96+
PipeDataNodeSinglePipeMetrics.getInstance()
9797
.increaseHeartbeatEventCount(pipeName, creationTime);
9898
}
9999
return true;
@@ -104,7 +104,7 @@ public boolean internallyDecreaseResourceReferenceCount(final String holderMessa
104104
// PipeName == null indicates that the event is the raw event at disruptor,
105105
// not the event copied and passed to the extractor
106106
if (Objects.nonNull(pipeName)) {
107-
PipeDataNodeRemainingEventAndTimeMetrics.getInstance()
107+
PipeDataNodeSinglePipeMetrics.getInstance()
108108
.decreaseHeartbeatEventCount(pipeName, creationTime);
109109
if (shouldPrintMessage && LOGGER.isDebugEnabled()) {
110110
LOGGER.debug(this.toString());

0 commit comments

Comments
 (0)