|
287 | 287 | import org.apache.doris.transaction.GlobalExternalTransactionInfoMgr; |
288 | 288 | import org.apache.doris.transaction.GlobalTransactionMgrIface; |
289 | 289 | import org.apache.doris.transaction.PublishVersionDaemon; |
| 290 | +import org.apache.doris.transaction.TransactionState; |
290 | 291 |
|
291 | 292 | import com.google.common.base.Joiner; |
292 | 293 | import com.google.common.base.Preconditions; |
@@ -7205,13 +7206,39 @@ public void compactTable(String dbName, String tableName, String type, List<Stri |
7205 | 7206 | } |
7206 | 7207 |
|
7207 | 7208 | // sync table |
7208 | | - public void syncTable(String dbName, String tableName) throws DdlException { |
7209 | | - // wait for group commit data visible |
| 7209 | + public void syncTable(String dbName, String tableName) throws DdlException, AnalysisException { |
7210 | 7210 | Database db = getInternalCatalog().getDbOrDdlException(dbName); |
7211 | 7211 | Table table = db.getTableOrDdlException(tableName); |
7212 | | - GroupCommitManager groupCommitManager = getGroupCommitManager(); |
| 7212 | + long dbId = db.getId(); |
7213 | 7213 | long tableId = table.getId(); |
7214 | | - groupCommitManager.waitWalFinished(tableId); |
| 7214 | + GlobalTransactionMgrIface txnMgr = Env.getCurrentGlobalTransactionMgr(); |
| 7215 | + // get table transaction ids |
| 7216 | + Map<Long, List<Long>> txnIdToTableIds = txnMgr.getDbRunningTransInfo(dbId); |
| 7217 | + List<Long> tableTxnIds = new ArrayList<>(); |
| 7218 | + for (Map.Entry<Long, List<Long>> entry : txnIdToTableIds.entrySet()) { |
| 7219 | + if (entry.getValue().contains(tableId)) { |
| 7220 | + tableTxnIds.add(entry.getKey()); |
| 7221 | + } |
| 7222 | + } |
| 7223 | + |
| 7224 | + for (Long txnId : tableTxnIds) { |
| 7225 | + TransactionState txnState = txnMgr.getTransactionState(dbId, txnId); |
| 7226 | + if (txnState == null) { |
| 7227 | + continue; |
| 7228 | + } |
| 7229 | + String label = txnState.getLabel(); |
| 7230 | + if (label != null && label.startsWith("group_commit")) { |
| 7231 | + // wait group commit transaction |
| 7232 | + while (!txnState.getTransactionStatus().isFinalStatus()) { |
| 7233 | + try { |
| 7234 | + Thread.sleep(100); |
| 7235 | + } catch (InterruptedException ie) { |
| 7236 | + LOG.warn("failed to wait for table={} to finish transaction={} when schema change", |
| 7237 | + tableId, txnId, ie); |
| 7238 | + } |
| 7239 | + } |
| 7240 | + } |
| 7241 | + } |
7215 | 7242 | } |
7216 | 7243 |
|
7217 | 7244 | private static void addTableComment(TableIf table, StringBuilder sb) { |
|
0 commit comments