Skip to content

Commit f48426c

Browse files
committed
fix ut
Signed-off-by: gengjun-git <gengjun@starrocks.com>
1 parent e9c0ac9 commit f48426c

File tree

4 files changed

+73
-32
lines changed

4 files changed

+73
-32
lines changed

fe/fe-core/src/main/java/com/starrocks/transaction/TransactionStateBatch.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,12 @@ public void afterVisible(TransactionStatus transactionStatus, boolean txnOperate
106106
.getCallbackFactory().getCallback(callbackId);
107107
if (callback != null) {
108108
if (txnOperated && Objects.requireNonNull(transactionStatus) == TransactionStatus.VISIBLE) {
109-
callback.afterVisible(transactionState);
109+
try {
110+
callback.afterVisible(transactionState);
111+
} catch (Throwable t) {
112+
LOG.warn("afterVisible callback failed for txn {}, callbackId {}",
113+
transactionState.getTransactionId(), callbackId, t);
114+
}
110115
}
111116
}
112117
}

fe/fe-core/src/test/java/com/starrocks/load/DeleteHandlerTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import com.starrocks.common.util.concurrent.lock.Locker;
3636
import com.starrocks.common.util.concurrent.lock.NotSupportLockException;
3737
import com.starrocks.load.DeleteJob.DeleteState;
38+
import com.starrocks.load.MultiDeleteInfo;
3839
import com.starrocks.persist.EditLog;
3940
import com.starrocks.qe.ConnectContext;
4041
import com.starrocks.qe.QueryStateException;
@@ -139,7 +140,18 @@ public void logSaveTransactionId(long transactionId) {
139140
}
140141

141142
@Mock
142-
public void logInsertTransactionState(TransactionState transactionState) {
143+
public void logInsertTransactionState(TransactionState transactionState,
144+
com.starrocks.persist.WALApplier walApplier) {
145+
if (walApplier != null) {
146+
walApplier.apply(transactionState);
147+
}
148+
}
149+
150+
@Mock
151+
public void logFinishMultiDelete(MultiDeleteInfo info, com.starrocks.persist.WALApplier walApplier) {
152+
if (walApplier != null) {
153+
walApplier.apply(info);
154+
}
143155
}
144156
};
145157

fe/fe-core/src/test/java/com/starrocks/transaction/DatabaseTransactionMgrTest.java

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@
9494
import java.util.Map;
9595
import java.util.concurrent.CountDownLatch;
9696
import java.util.concurrent.TimeUnit;
97+
import java.util.concurrent.atomic.AtomicBoolean;
9798
import java.util.concurrent.atomic.AtomicInteger;
9899

99100
import static org.hamcrest.CoreMatchers.containsString;
@@ -1269,28 +1270,27 @@ public boolean isCloudNativeTableOrMaterializedView() {
12691270
}
12701271
};
12711272

1272-
class InspectPersistedBatch extends TransactionStateBatch {
1273-
private boolean persistedWhenAfterVisible = false;
1274-
1275-
InspectPersistedBatch(List<TransactionState> transactionStates) {
1276-
super(transactionStates);
1277-
}
1278-
1273+
// With COW, the original stateBatch is copied internally, so we verify via callbacks
1274+
// that the transaction is persisted before afterVisible is called.
1275+
AtomicBoolean persistedWhenAfterVisible = new AtomicBoolean(false);
1276+
long checkCallbackId = 99999L;
1277+
states.get(0).addCallbackId(checkCallbackId);
1278+
masterTransMgr.getCallbackFactory().addCallback(new AbstractTxnStateChangeCallback() {
12791279
@Override
1280-
public void afterVisible(TransactionStatus transactionStatus, boolean txnOperated) {
1281-
long txnId = getTransactionStates().get(0).getTransactionId();
1282-
persistedWhenAfterVisible = fakeEditLog.getTransaction(txnId) != null;
1283-
super.afterVisible(transactionStatus, txnOperated);
1280+
public long getId() {
1281+
return checkCallbackId;
12841282
}
12851283

1286-
boolean isPersistedWhenAfterVisible() {
1287-
return persistedWhenAfterVisible;
1284+
@Override
1285+
public void afterVisible(TransactionState txnState) {
1286+
persistedWhenAfterVisible.set(
1287+
fakeEditLog.getTransaction(txnState.getTransactionId()) != null);
12881288
}
1289-
}
1289+
});
12901290

1291-
InspectPersistedBatch stateBatch = new InspectPersistedBatch(states);
1291+
TransactionStateBatch stateBatch = new TransactionStateBatch(states);
12921292
masterTransMgr.finishTransactionBatch(GlobalStateMgrTestUtil.testDbId1, stateBatch, null);
1293-
assertTrue(stateBatch.isPersistedWhenAfterVisible());
1293+
assertTrue(persistedWhenAfterVisible.get());
12941294
}
12951295

12961296
@Test
@@ -1306,21 +1306,25 @@ public boolean isCloudNativeTableOrMaterializedView() {
13061306
}
13071307
};
13081308

1309-
class ThrowingBatch extends TransactionStateBatch {
1310-
ThrowingBatch(List<TransactionState> transactionStates) {
1311-
super(transactionStates);
1309+
// Register a throwing callback to verify exception is swallowed
1310+
long throwCallbackId = 99998L;
1311+
states.get(0).addCallbackId(throwCallbackId);
1312+
masterTransMgr.getCallbackFactory().addCallback(new AbstractTxnStateChangeCallback() {
1313+
@Override
1314+
public long getId() {
1315+
return throwCallbackId;
13121316
}
13131317

13141318
@Override
1315-
public void afterVisible(TransactionStatus transactionStatus, boolean txnOperated) {
1319+
public void afterVisible(TransactionState txnState) {
13161320
throw new RuntimeException("mock afterVisible failure");
13171321
}
1318-
}
1322+
});
13191323

1320-
TransactionStateBatch stateBatch = new ThrowingBatch(states);
1321-
Assertions.assertDoesNotThrow(
1324+
TransactionStateBatch stateBatch = new TransactionStateBatch(states);
1325+
TransactionStateBatch result = Assertions.assertDoesNotThrow(
13221326
() -> masterTransMgr.finishTransactionBatch(GlobalStateMgrTestUtil.testDbId1, stateBatch, null));
1323-
for (TransactionState state : states) {
1327+
for (TransactionState state : result.getTransactionStates()) {
13241328
assertEquals(TransactionStatus.VISIBLE, state.getTransactionStatus());
13251329
assertNotNull(fakeEditLog.getTransaction(state.getTransactionId()));
13261330
}
@@ -1350,11 +1354,11 @@ public boolean isCloudNativeTableOrMaterializedView() {
13501354
};
13511355

13521356
TransactionStateBatch stateBatch = new TransactionStateBatch(states);
1353-
Assertions.assertDoesNotThrow(
1357+
TransactionStateBatch result = Assertions.assertDoesNotThrow(
13541358
() -> masterTransMgr.finishTransactionBatch(GlobalStateMgrTestUtil.testDbId1, stateBatch, null));
13551359

13561360
assertEquals(2, callbackInvokeCount.get());
1357-
for (TransactionState state : states) {
1361+
for (TransactionState state : result.getTransactionStates()) {
13581362
assertEquals(TransactionStatus.VISIBLE, state.getTransactionStatus());
13591363
}
13601364
}

fe/fe-core/src/test/java/com/starrocks/transaction/LakePublishBatchTest.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,12 +108,22 @@ private void generateSimpleTabletCommitInfo(Database db, Table table,
108108
}
109109
}
110110

111-
private void waitTransactionDone(TransactionState transaction) throws InterruptedException {
112-
while (!transaction.getTransactionStatus().isFinalStatus()) {
113-
LOG.warn("transaction {} is running. state: {}", transaction.getTransactionId(), transaction.getTransactionStatus());
111+
private void waitTransactionDone(TransactionState transaction) throws Exception {
112+
long dbId = transaction.getDbId();
113+
long txnId = transaction.getTransactionId();
114+
GlobalTransactionMgr globalTransactionMgr = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr();
115+
while (true) {
116+
// Re-fetch from the map each iteration, because COW may have replaced the object
117+
TransactionState current = globalTransactionMgr.getDatabaseTransactionMgr(dbId)
118+
.getTransactionState(txnId);
119+
if (current != null && current.getTransactionStatus().isFinalStatus()) {
120+
LOG.warn("transaction {} is done. state: {}", txnId, current.getTransactionStatus());
121+
break;
122+
}
123+
LOG.warn("transaction {} is running. state: {}", txnId,
124+
current != null ? current.getTransactionStatus() : "null");
114125
Thread.sleep(200);
115126
}
116-
LOG.warn("transaction {} is done. state: {}", transaction.getTransactionId(), transaction.getTransactionStatus());
117127
}
118128

119129
@BeforeAll
@@ -355,6 +365,11 @@ public Database getDb(long dbId) {
355365
waitTransactionDone(transactionState1);
356366
waitTransactionDone(transactionState2);
357367

368+
// Re-fetch after waiting because COW may have replaced the objects in the map
369+
transactionState1 = globalTransactionMgr.getDatabaseTransactionMgr(db.getId()).
370+
getTransactionState(transactionId5);
371+
transactionState2 = globalTransactionMgr.getDatabaseTransactionMgr(db.getId()).
372+
getTransactionState(transactionId6);
358373
assertEquals(transactionState1.getTransactionStatus(), TransactionStatus.ABORTED);
359374
assertEquals(transactionState2.getTransactionStatus(), TransactionStatus.ABORTED);
360375
}
@@ -413,6 +428,11 @@ public Table getTable(String tableName) {
413428
waitTransactionDone(transactionState1);
414429
waitTransactionDone(transactionState2);
415430

431+
// Re-fetch after waiting because COW may have replaced the objects in the map
432+
transactionState1 = globalTransactionMgr.getDatabaseTransactionMgr(db.getId()).
433+
getTransactionState(transactionId7);
434+
transactionState2 = globalTransactionMgr.getDatabaseTransactionMgr(db.getId()).
435+
getTransactionState(transactionId8);
416436
assertEquals(transactionState1.getTransactionStatus(), TransactionStatus.VISIBLE);
417437
assertEquals(transactionState2.getTransactionStatus(), TransactionStatus.VISIBLE);
418438
}

0 commit comments

Comments
 (0)