Skip to content

Commit 9937d22

Browse files
authored
[improve][broker] Use atomic counter for ongoing transaction count (#25053)
1 parent f9bb2e4 commit 9937d22

File tree

3 files changed

+40
-36
lines changed

3 files changed

+40
-36
lines changed

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMetadataStoreServiceTest.java

Lines changed: 18 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.concurrent.ExecutionException;
3232
import java.util.concurrent.TimeUnit;
3333
import java.util.concurrent.TimeoutException;
34+
import java.util.concurrent.atomic.LongAdder;
3435
import org.apache.bookkeeper.mledger.Position;
3536
import org.apache.commons.lang3.tuple.Pair;
3637
import org.apache.pulsar.broker.ServiceConfiguration;
@@ -173,10 +174,7 @@ public void testTimeoutTracker() throws Exception {
173174
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
174175
.getStores().get(TransactionCoordinatorID.get(0));
175176
checkTransactionMetadataStoreReady(transactionMetadataStore);
176-
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
177-
field.setAccessible(true);
178-
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
179-
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
177+
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = transactionMetadataStore.getTxnMetaMap();
180178
int i = -1;
181179
while (++i < 1000) {
182180
try {
@@ -189,7 +187,7 @@ public void testTimeoutTracker() throws Exception {
189187
txnMap.forEach((txnID, txnMetaListPair) ->
190188
Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
191189
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
192-
.until(() -> txnMap.size() == 0);
190+
.until(() -> transactionMetadataStore.getOnGoingTxnCount().intValue() == 0);
193191
}
194192

195193
private TxnID newTransactionWithTimeoutOf(long timeout)
@@ -209,25 +207,23 @@ public void testTimeoutTrackerExpired() throws Exception {
209207
(MLTransactionMetadataStore) pulsar.getTransactionMetadataStoreService()
210208
.getStores().get(TransactionCoordinatorID.get(0));
211209
checkTransactionMetadataStoreReady(transactionMetadataStore);
212-
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
213-
field.setAccessible(true);
214-
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
215-
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
210+
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap = transactionMetadataStore.getTxnMetaMap();
211+
LongAdder onGoingTxtCount = transactionMetadataStore.getOnGoingTxnCount();
216212

217213
newTransactionWithTimeoutOf(2000);
218214

219-
assertEquals(txnMap.size(), 1);
215+
assertEquals(onGoingTxtCount.intValue(), 1);
220216

221217
txnMap.forEach((txnID, txnMetaListPair) ->
222218
Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
223-
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0);
219+
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> onGoingTxtCount.intValue() == 0);
224220

225221
newTransactionWithTimeoutOf(2000);
226-
assertEquals(txnMap.size(), 1);
222+
assertEquals(onGoingTxtCount.intValue(), 1);
227223

228224
txnMap.forEach((txnID, txnMetaListPair) ->
229225
Assert.assertEquals(txnMetaListPair.getLeft().status(), TxnStatus.OPEN));
230-
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> txnMap.size() == 0);
226+
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS).until(() -> onGoingTxtCount.intValue() == 0);
231227
}
232228

233229
@Test
@@ -241,10 +237,7 @@ public void testTimeoutTrackerMultiThreading() throws Exception {
241237
.getStores().get(TransactionCoordinatorID.get(0));
242238

243239
checkTransactionMetadataStoreReady(transactionMetadataStore);
244-
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
245-
field.setAccessible(true);
246-
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
247-
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
240+
LongAdder onGoingTxtCount = transactionMetadataStore.getOnGoingTxnCount();
248241
new Thread(() -> {
249242
int i = -1;
250243
while (++i < 100) {
@@ -289,15 +282,15 @@ public void testTimeoutTrackerMultiThreading() throws Exception {
289282
}
290283
}).start();
291284

292-
checkoutTimeout(txnMap, 300);
293-
checkoutTimeout(txnMap, 200);
294-
checkoutTimeout(txnMap, 100);
295-
checkoutTimeout(txnMap, 0);
285+
checkoutTimeout(onGoingTxtCount, 300);
286+
checkoutTimeout(onGoingTxtCount, 200);
287+
checkoutTimeout(onGoingTxtCount, 100);
288+
checkoutTimeout(onGoingTxtCount, 0);
296289
}
297290

298-
private void checkoutTimeout(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap, int time) {
291+
private void checkoutTimeout(LongAdder onGoingTxtCount, int time) {
299292
Awaitility.await().atLeast(1000, TimeUnit.MICROSECONDS)
300-
.until(() -> txnMap.size() == time);
293+
.until(() -> onGoingTxtCount.intValue() == time);
301294
}
302295

303296
@Test
@@ -326,12 +319,8 @@ public void transactionTimeoutRecoverTest() throws Exception {
326319
.getStores().get(TransactionCoordinatorID.get(0));
327320

328321
checkTransactionMetadataStoreReady(transactionMetadataStore);
329-
330-
Field field = MLTransactionMetadataStore.class.getDeclaredField("txnMetaMap");
331-
field.setAccessible(true);
332-
ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMap =
333-
(ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>>) field.get(transactionMetadataStore);
334-
Awaitility.await().until(() -> txnMap.size() == 0);
322+
LongAdder onGoingTxtCount = transactionMetadataStore.getOnGoingTxnCount();
323+
Awaitility.await().until(() -> onGoingTxtCount.intValue() == 0);
335324

336325
}
337326

pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionMetadataStore.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.util.concurrent.Executors;
3333
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3434
import java.util.concurrent.atomic.LongAdder;
35+
import lombok.Getter;
3536
import org.apache.bookkeeper.mledger.ManagedLedger;
3637
import org.apache.bookkeeper.mledger.Position;
3738
import org.apache.commons.lang3.StringUtils;
@@ -72,6 +73,7 @@ public class MLTransactionMetadataStore
7273
private final TransactionCoordinatorID tcID;
7374
private final MLTransactionLogImpl transactionLog;
7475
@VisibleForTesting
76+
@Getter
7577
final ConcurrentSkipListMap<Long, Pair<TxnMeta, List<Position>>> txnMetaMap = new ConcurrentSkipListMap<>();
7678
private final TransactionTimeoutTracker timeoutTracker;
7779
private final TransactionMetadataStoreStats transactionMetadataStoreStats;
@@ -80,6 +82,9 @@ public class MLTransactionMetadataStore
8082
private final LongAdder abortedTransactionCount;
8183
private final LongAdder transactionTimeoutCount;
8284
private final LongAdder appendLogCount;
85+
@Getter
86+
@VisibleForTesting
87+
private final LongAdder onGoingTxnCount;
8388
private final MLTransactionSequenceIdGenerator sequenceIdGenerator;
8489
private final ExecutorService internalPinnedExecutor;
8590
public final RecoverTimeRecord recoverTime = new RecoverTimeRecord();
@@ -108,6 +113,7 @@ public MLTransactionMetadataStore(TransactionCoordinatorID tcID,
108113
this.abortedTransactionCount = new LongAdder();
109114
this.transactionTimeoutCount = new LongAdder();
110115
this.appendLogCount = new LongAdder();
116+
this.onGoingTxnCount = new LongAdder();
111117
DefaultThreadFactory threadFactory = new DefaultThreadFactory("transaction_coordinator_"
112118
+ tcID.toString() + "thread_factory");
113119
this.internalPinnedExecutor = Executors.newSingleThreadScheduledExecutor(threadFactory);
@@ -162,6 +168,7 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran
162168
final TxnMetaImpl left = new TxnMetaImpl(txnID,
163169
openTimestamp, timeoutAt, owner);
164170
txnMetaMap.put(transactionId, MutablePair.of(left, positions));
171+
onGoingTxnCount.increment();
165172
recoverTracker.handleOpenStatusTransaction(txnSequenceId,
166173
timeoutAt + openTimestamp);
167174
}
@@ -197,8 +204,12 @@ public void handleMetadataEntry(Position position, TransactionMetadataEntry tran
197204
recoverTracker.updateTransactionStatus(txnID.getLeastSigBits(), newStatus);
198205
if (newStatus == TxnStatus.COMMITTED || newStatus == TxnStatus.ABORTED) {
199206
transactionLog.deletePosition(txnMetaMap
200-
.get(transactionId).getRight()).thenAccept(v ->
201-
txnMetaMap.remove(transactionId).getLeft());
207+
.get(transactionId).getRight()).thenAccept(v -> {
208+
if (txnMetaMap.remove(transactionId) != null) {
209+
onGoingTxnCount.decrement();
210+
}
211+
}
212+
);
202213
}
203214
}
204215
break;
@@ -237,7 +248,7 @@ public CompletableFuture<TxnMeta> getTxnMeta(TxnID txnID) {
237248
@Override
238249
public CompletableFuture<TxnID> newTransaction(long timeOut, String owner) {
239250
if (this.maxActiveTransactionsPerCoordinator == 0
240-
|| this.maxActiveTransactionsPerCoordinator > txnMetaMap.size()) {
251+
|| this.maxActiveTransactionsPerCoordinator > onGoingTxnCount.longValue()) {
241252
CompletableFuture<TxnID> completableFuture = new CompletableFuture<>();
242253
FutureUtil.safeRunAsync(() -> {
243254
if (!checkIfReady()) {
@@ -276,6 +287,7 @@ public CompletableFuture<TxnID> newTransaction(long timeOut, String owner) {
276287
positions.add(position);
277288
Pair<TxnMeta, List<Position>> pair = MutablePair.of(txn, positions);
278289
txnMetaMap.put(leastSigBits, pair);
290+
onGoingTxnCount.increment();
279291
this.timeoutTracker.addTransaction(leastSigBits, timeOut);
280292
createdTransactionCount.increment();
281293
completableFuture.complete(txnID);
@@ -422,7 +434,9 @@ public CompletableFuture<Void> updateTxnStatus(TxnID txnID, TxnStatus newStatus,
422434
} else {
423435
abortedTransactionCount.increment();
424436
}
425-
txnMetaMap.remove(txnID.getLeastSigBits());
437+
if (txnMetaMap.remove(txnID.getLeastSigBits()) != null) {
438+
onGoingTxnCount.decrement();
439+
}
426440
transactionLog.deletePosition(txnMetaListPair.getRight()).exceptionally(ex -> {
427441
log.warn("Failed to delete transaction log position "
428442
+ "at end transaction [{}]", txnID);
@@ -466,7 +480,7 @@ public TransactionCoordinatorStats getCoordinatorStats() {
466480
transactionCoordinatorstats.setLowWaterMark(getLowWaterMark());
467481
transactionCoordinatorstats.setState(getState().name());
468482
transactionCoordinatorstats.setLeastSigBits(sequenceIdGenerator.getCurrentSequenceId());
469-
transactionCoordinatorstats.ongoingTxnSize = txnMetaMap.size();
483+
transactionCoordinatorstats.ongoingTxnSize = onGoingTxnCount.longValue();
470484
transactionCoordinatorstats.recoverStartTime = recoverTime.getRecoverStartTime();
471485
transactionCoordinatorstats.recoverEndTime = recoverTime.getRecoverEndTime();
472486
return transactionCoordinatorstats;
@@ -490,6 +504,7 @@ public CompletableFuture<Void> closeAsync() {
490504
internalPinnedExecutor.shutdown();
491505
return transactionLog.closeAsync().thenCompose(v -> {
492506
txnMetaMap.clear();
507+
onGoingTxnCount.reset();
493508
this.timeoutTracker.close();
494509
if (!this.changeToCloseState()) {
495510
return FutureUtil.failedFuture(
@@ -508,7 +523,7 @@ public CompletableFuture<Void> closeAsync() {
508523
@Override
509524
public TransactionMetadataStoreStats getMetadataStoreStats() {
510525
this.transactionMetadataStoreStats.setCoordinatorId(tcID.getId());
511-
this.transactionMetadataStoreStats.setActives(txnMetaMap.size());
526+
this.transactionMetadataStoreStats.setActives(onGoingTxnCount.intValue());
512527
this.transactionMetadataStoreStats.setCreatedCount(this.createdTransactionCount.longValue());
513528
this.transactionMetadataStoreStats.setCommittedCount(this.committedTransactionCount.longValue());
514529
this.transactionMetadataStoreStats.setAbortedCount(this.abortedTransactionCount.longValue());

pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/impl/MLTransactionLogImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ factory, new ManagedLedgerConfig(), bufferedWriterConfigForRecover, transactionT
171171
new MLTransactionMetadataStore(transactionCoordinatorID,
172172
mlTransactionLogForRecover, timeoutTracker, sequenceIdGenerator, Integer.MAX_VALUE);
173173
transactionMetadataStoreForRecover.init(recoverTracker).get(2000, TimeUnit.SECONDS);
174-
Assert.assertEquals(transactionMetadataStoreForRecover.txnMetaMap.size(), expectedMapping.size());
174+
Assert.assertEquals(transactionMetadataStoreForRecover.getOnGoingTxnCount().intValue(), expectedMapping.size());
175175
Iterator<Integer> txnIdSet = expectedMapping.keySet().iterator();
176176
while (txnIdSet.hasNext()){
177177
int txnId = txnIdSet.next();

0 commit comments

Comments
 (0)