diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 8a485a9c053e7b..2bb8d56e3a9915 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -450,6 +450,7 @@ supportedShowStatement supportedLoadStatement : SYNC #sync + | SYNC TABLE table=multipartIdentifier #syncTable | SHOW CREATE LOAD FOR label=multipartIdentifier #showCreateLoad | createRoutineLoad #createRoutineLoadAlias | LOAD mysqlDataDesc diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index c75b870d91ec95..80d861330b7c13 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -287,6 +287,7 @@ import org.apache.doris.transaction.GlobalExternalTransactionInfoMgr; import org.apache.doris.transaction.GlobalTransactionMgrIface; import org.apache.doris.transaction.PublishVersionDaemon; +import org.apache.doris.transaction.TransactionState; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -7204,6 +7205,46 @@ public void compactTable(String dbName, String tableName, String type, List> txnIdToTableIds = txnMgr.getDbRunningTransInfo(dbId); + List tableTxnIds = new ArrayList<>(); + for (Map.Entry> entry : txnIdToTableIds.entrySet()) { + if (entry.getValue().contains(tableId)) { + tableTxnIds.add(entry.getKey()); + } + } + + for (Long txnId : tableTxnIds) { + TransactionState txnState = txnMgr.getTransactionState(dbId, txnId); + if (txnState == null) { + continue; + } + String label = txnState.getLabel(); + if (label != null && label.startsWith("group_commit")) { + // wait group commit transaction + while (!txnState.getTransactionStatus().isFinalStatus()) { + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + LOG.warn("failed to wait for table={} to finish transaction={} when schema change", + tableId, txnId, ie); + } + } + } + } + } + private static void addTableComment(TableIf table, StringBuilder sb) { if (StringUtils.isNotBlank(table.getComment())) { sb.append("\nCOMMENT '").append(table.getComment(true)).append("'"); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 5c0a4ca2a53533..fc2564982d2520 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -861,6 +861,7 @@ import org.apache.doris.nereids.trees.plans.commands.ShowWorkloadGroupsCommand; import org.apache.doris.nereids.trees.plans.commands.StartTransactionCommand; import org.apache.doris.nereids.trees.plans.commands.SyncCommand; +import org.apache.doris.nereids.trees.plans.commands.SyncTableCommand; import org.apache.doris.nereids.trees.plans.commands.TransactionBeginCommand; import org.apache.doris.nereids.trees.plans.commands.TransactionCommitCommand; import org.apache.doris.nereids.trees.plans.commands.TransactionRollbackCommand; @@ -6831,6 +6832,12 @@ public LogicalPlan visitSync(SyncContext ctx) { return new SyncCommand(); } + @Override + public Object visitSyncTable(DorisParser.SyncTableContext ctx) { + List table = visitMultipartIdentifier(ctx.table); + return new SyncTableCommand(new TableNameInfo(table)); + } + @Override public LogicalPlan visitShowDelete(ShowDeleteContext ctx) { String dbName = null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 4274e8ca5b940c..63903eb921ba99 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -339,6 +339,7 @@ public enum PlanType { SHOW_CONVERT_LSC_COMMAND, SHOW_DICTIONARIES_COMMAND, SYNC_COMMAND, + SYNC_TABLE_COMMAND, RECOVER_DATABASE_COMMAND, RECOVER_TABLE_COMMAND, RECOVER_PARTITION_COMMAND, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/SyncTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/SyncTableCommand.java new file mode 100644 index 00000000000000..1e14bf93ef02b5 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/SyncTableCommand.java @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.commands; + +import org.apache.doris.analysis.RedirectStatus; +import org.apache.doris.analysis.StmtType; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.util.Util; +import org.apache.doris.info.TableNameInfo; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.StmtExecutor; + +/** + * sync table command + */ +public class SyncTableCommand extends Command implements Redirect { + private TableNameInfo tableNameInfo; + + public SyncTableCommand(TableNameInfo table) { + super(PlanType.SYNC_TABLE_COMMAND); + this.tableNameInfo = table; + } + + @Override + public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { + System.out.println("sync table " + tableNameInfo); + tableNameInfo.analyze(ctx); + + String catalogName = tableNameInfo.getCtl(); + Util.prohibitExternalCatalog(catalogName, this.getClass().getSimpleName()); + + String dbName = tableNameInfo.getDb(); + String tblName = tableNameInfo.getTbl(); + + Env.getCurrentEnv().syncTable(dbName, tblName); + } + + @Override + public R accept(PlanVisitor visitor, C context) { + return visitor.visitSyncTableCommand(this, context); + } + + @Override + public RedirectStatus toRedirectStatus() { + return RedirectStatus.FORWARD_WITH_SYNC; + } + + @Override + public StmtType stmtType() { + return StmtType.SYNC; + } + + public TableNameInfo getTableNameInfo() { + return tableNameInfo; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java index 63072f34efeff5..721aa1a1bdab71 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/CommandVisitor.java @@ -273,6 +273,7 @@ import org.apache.doris.nereids.trees.plans.commands.ShowWorkloadGroupsCommand; import org.apache.doris.nereids.trees.plans.commands.StartTransactionCommand; import org.apache.doris.nereids.trees.plans.commands.SyncCommand; +import org.apache.doris.nereids.trees.plans.commands.SyncTableCommand; import org.apache.doris.nereids.trees.plans.commands.TransactionBeginCommand; import org.apache.doris.nereids.trees.plans.commands.TransactionCommitCommand; import org.apache.doris.nereids.trees.plans.commands.TransactionRollbackCommand; @@ -993,6 +994,10 @@ default R visitSyncCommand(SyncCommand syncCommand, C context) { return visitCommand(syncCommand, context); } + default R visitSyncTableCommand(SyncTableCommand syncTableCommand, C context) { + return visitCommand(syncTableCommand, context); + } + default R visitShowEventsCommand(ShowEventsCommand showEventsCommand, C context) { return visitCommand(showEventsCommand, context); } diff --git a/regression-test/data/query_p0/test_sync_table.out b/regression-test/data/query_p0/test_sync_table.out new file mode 100644 index 00000000000000..10a50ab0ab7ba6 --- /dev/null +++ b/regression-test/data/query_p0/test_sync_table.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select1 -- +0 + +-- !select2 -- +2 + diff --git a/regression-test/suites/query_p0/test_sync_table.groovy b/regression-test/suites/query_p0/test_sync_table.groovy new file mode 100644 index 00000000000000..72f9f214c5ce73 --- /dev/null +++ b/regression-test/suites/query_p0/test_sync_table.groovy @@ -0,0 +1,46 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +suite("test_sync_table") { + def tableName = "test_sync_table_tbl" + + sql """DROP TABLE IF EXISTS ${tableName}""" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `id` INT, + `name` STRING + ) ENGINE=OLAP + UNIQUE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES( + "replication_num" = "1", + "group_commit_interval_ms" = "10000" -- 10s + ); + """ + + sql """set group_commit = async_mode;""" + + sql """INSERT INTO ${tableName} VALUES (1, "tom"), (2, "jerry");""" + + // cannot view data begin + qt_select1 """SELECT COUNT(*) FROM ${tableName};""" + + // run sync table + sql """SYNC TABLE ${tableName};""" + + // can view data + qt_select2 """SELECT COUNT(*) FROM ${tableName};""" +}