Skip to content

Commit 9b0e48f

Browse files
authored
Pipe: Fixed the shouldMarkAsPipeRequest for CreateTable and AlterLogicalView Sync (apache#16619)
* unwebbed * fix
1 parent 656b4fc commit 9b0e48f

File tree

4 files changed

+16
-9
lines changed

4 files changed

+16
-9
lines changed

iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/receiver/protocol/IoTDBConfigNodeReceiver.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -654,7 +654,7 @@ private TSStatus executePlan(final ConfigPhysicalPlan plan) throws ConsensusExce
654654
case DeleteTriggerInTable:
655655
return configManager.dropTrigger(
656656
new TDropTriggerReq(((DeleteTriggerInTablePlan) plan).getTriggerName())
657-
.setIsGeneratedByPipe(true));
657+
.setIsGeneratedByPipe(shouldMarkAsPipeRequest.get()));
658658
case SetTTL:
659659
return ((SetTTLPlan) plan).getTTL() == TTLCache.NULL_TTL
660660
? configManager
@@ -664,7 +664,8 @@ private TSStatus executePlan(final ConfigPhysicalPlan plan) throws ConsensusExce
664664
.getTTLManager()
665665
.setTTL((SetTTLPlan) plan, shouldMarkAsPipeRequest.get());
666666
case PipeCreateTableOrView:
667-
return executeIdempotentCreateTableOrView((PipeCreateTableOrViewPlan) plan, queryId);
667+
return executeIdempotentCreateTableOrView(
668+
(PipeCreateTableOrViewPlan) plan, queryId, shouldMarkAsPipeRequest.get());
668669
case AddTableColumn:
669670
return configManager
670671
.getProcedureManager()
@@ -941,7 +942,10 @@ private TSStatus executePlan(final ConfigPhysicalPlan plan) throws ConsensusExce
941942
}
942943

943944
private TSStatus executeIdempotentCreateTableOrView(
944-
final PipeCreateTableOrViewPlan plan, final String queryId) throws ConsensusException {
945+
final PipeCreateTableOrViewPlan plan,
946+
final String queryId,
947+
final boolean shouldMarkAsPipeRequest)
948+
throws ConsensusException {
945949
final String database = plan.getDatabase();
946950
final TsTable table = plan.getTable();
947951
final boolean isView = TreeViewSchema.isTreeViewTable(table);
@@ -957,8 +961,8 @@ private TSStatus executeIdempotentCreateTableOrView(
957961
? ProcedureType.CREATE_TABLE_VIEW_PROCEDURE
958962
: ProcedureType.CREATE_TABLE_PROCEDURE,
959963
isView
960-
? new CreateTableViewProcedure(database, table, true, true)
961-
: new CreateTableProcedure(database, table, true));
964+
? new CreateTableViewProcedure(database, table, true, shouldMarkAsPipeRequest)
965+
: new CreateTableProcedure(database, table, shouldMarkAsPipeRequest));
962966
// Note that the view and its column won't be auto created
963967
// Skip it to avoid affecting the existing base table
964968
if (!isView && result.getCode() == TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/thrift/IoTDBDataNodeReceiver.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -685,7 +685,8 @@ private TPipeTransferResp handleTransferSchemaPlan(final PipeTransferPlanNodeReq
685685
}
686686
return new TPipeTransferResp(
687687
ClusterConfigTaskExecutor.getInstance()
688-
.alterLogicalViewByPipe((AlterLogicalViewNode) req.getPlanNode()));
688+
.alterLogicalViewByPipe(
689+
(AlterLogicalViewNode) req.getPlanNode(), shouldMarkAsPipeRequest.get()));
689690
}
690691
final Object statement = PLAN_TO_STATEMENT_VISITOR.process(req.getPlanNode(), null);
691692
return statement instanceof Statement

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3018,7 +3018,8 @@ public SettableFuture<ConfigTaskResult> alterLogicalView(
30183018
}
30193019

30203020
@Override
3021-
public TSStatus alterLogicalViewByPipe(final AlterLogicalViewNode alterLogicalViewNode) {
3021+
public TSStatus alterLogicalViewByPipe(
3022+
final AlterLogicalViewNode alterLogicalViewNode, final boolean shouldMarkAsPipeRequest) {
30223023
final Map<PartialPath, ViewExpression> viewPathToSourceMap =
30233024
alterLogicalViewNode.getViewPathToSourceMap();
30243025

@@ -3037,7 +3038,7 @@ public TSStatus alterLogicalViewByPipe(final AlterLogicalViewNode alterLogicalVi
30373038
new TAlterLogicalViewReq(
30383039
Coordinator.getInstance().createQueryId().getId(),
30393040
ByteBuffer.wrap(stream.toByteArray()))
3040-
.setIsGeneratedByPipe(true);
3041+
.setIsGeneratedByPipe(shouldMarkAsPipeRequest);
30413042
TSStatus tsStatus;
30423043
try (final ConfigNodeClient client =
30433044
CLUSTER_DELETION_CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,8 @@ SettableFuture<ConfigTaskResult> renameLogicalView(
250250
SettableFuture<ConfigTaskResult> alterLogicalView(
251251
AlterLogicalViewStatement alterLogicalViewStatement, MPPQueryContext context);
252252

253-
TSStatus alterLogicalViewByPipe(AlterLogicalViewNode alterLogicalViewNode);
253+
TSStatus alterLogicalViewByPipe(
254+
AlterLogicalViewNode alterLogicalViewNode, boolean shouldMarkAsPipeRequest);
254255

255256
SettableFuture<ConfigTaskResult> getRegionId(GetRegionIdStatement getRegionIdStatement);
256257

0 commit comments

Comments
 (0)