Skip to content

Commit decacce

Browse files
[cherry-pick][back-port] Make TxnTransitMetadata.topicPartitions immutable (#89)
1 parent f0a1ceb commit decacce

File tree

7 files changed

+41
-53
lines changed

7 files changed

+41
-53
lines changed

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
2222

2323
import com.google.common.annotations.VisibleForTesting;
24+
import com.google.common.collect.ImmutableSet;
25+
import com.google.common.collect.Sets;
2426
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
2527
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
2628
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
@@ -31,8 +33,6 @@
3133
import io.streamnative.pulsar.handlers.kop.storage.PulsarPartitionedTopicProducerStateManagerSnapshotBuffer;
3234
import io.streamnative.pulsar.handlers.kop.utils.MetadataUtils;
3335
import io.streamnative.pulsar.handlers.kop.utils.ProducerIdAndEpoch;
34-
import java.util.Collections;
35-
import java.util.HashSet;
3636
import java.util.List;
3737
import java.util.Optional;
3838
import java.util.Set;
@@ -365,7 +365,7 @@ public void handleInitProducerId(String transactionalId,
365365
.producerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
366366
.lastProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
367367
.state(TransactionState.EMPTY)
368-
.topicPartitions(Collections.emptySet())
368+
.topicPartitions(Sets.newHashSet())
369369
.txnLastUpdateTimestamp(time.milliseconds())
370370
.build();
371371
epochAndTxnMetaFuture.complete(txnManager.putTransactionStateIfNotExists(newMetadata));
@@ -623,7 +623,7 @@ public void handleAddPartitionsToTransaction(String transactionalId,
623623
} else {
624624
return Either.right(new EpochAndTxnTransitMetadata(
625625
coordinatorEpoch, txnMetadata.prepareAddPartitions(
626-
new HashSet<>(partitionList), time.milliseconds())));
626+
ImmutableSet.copyOf(partitionList), time.milliseconds())));
627627
}
628628
});
629629

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionLogValue.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
1515

1616
import com.google.common.collect.Lists;
17+
import com.google.common.collect.Sets;
1718
import io.netty.buffer.ByteBuf;
1819
import java.nio.ByteBuffer;
1920
import java.util.Arrays;
20-
import java.util.Collections;
2121
import java.util.List;
2222
import java.util.stream.Collectors;
2323
import lombok.AllArgsConstructor;
@@ -159,7 +159,7 @@ public static TransactionMetadata readTxnRecordValue(String transactionalId, Byt
159159
.lastProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
160160
.txnTimeoutMs(value.getTransactionTimeoutMs())
161161
.state(TransactionState.byteToState(value.getTransactionStatus()))
162-
.topicPartitions(Collections.emptySet())
162+
.topicPartitions(Sets.newHashSet())
163163
.txnStartTimestamp(value.getTransactionStartTimestampMs())
164164
.txnLastUpdateTimestamp(value.getTransactionLastUpdateTimestampMs())
165165
.build();

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ public void addTxnMarkersToBrokerQueue(String transactionalId,
331331
TransactionMetadata txnMetadata =
332332
epochAndTxnMetadata.get().getTransactionMetadata();
333333
txnMetadata.inLock(() -> {
334-
txnMetadata.removePartitions(topicPartitions);
334+
topicPartitions.forEach(txnMetadata::removePartition);
335335
return null;
336336
});
337337
maybeWriteTxnCompletion(transactionalId);

kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMetadata.java

Lines changed: 24 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@
1313
*/
1414
package io.streamnative.pulsar.handlers.kop.coordinator.transaction;
1515

16+
import com.google.common.collect.ImmutableSet;
1617
import com.google.common.collect.Sets;
1718
import io.streamnative.pulsar.handlers.kop.scala.Either;
1819
import io.streamnative.pulsar.handlers.kop.utils.CoreUtils;
19-
import java.util.Collections;
2020
import java.util.HashMap;
21-
import java.util.HashSet;
2221
import java.util.Map;
2322
import java.util.Optional;
2423
import java.util.Set;
@@ -132,7 +131,7 @@ public TxnTransitMetadata prepareTransitionTo(TransactionState newState,
132131
short newEpoch,
133132
short newLastEpoch,
134133
int newTxnTimeoutMs,
135-
Set<TopicPartition> newTopicPartitions,
134+
ImmutableSet<TopicPartition> newTopicPartitions,
136135
long newTxnStartTimestamp,
137136
long updateTimestamp) {
138137
if (pendingState.isPresent()) {
@@ -157,7 +156,7 @@ public TxnTransitMetadata prepareTransitionTo(TransactionState newState,
157156
.lastProducerEpoch(newLastEpoch)
158157
.txnTimeoutMs(newTxnTimeoutMs)
159158
.txnState(newState)
160-
.topicPartitions(Collections.unmodifiableSet(newTopicPartitions))
159+
.topicPartitions(newTopicPartitions)
161160
.txnStartTimestamp(newTxnStartTimestamp)
162161
.txnLastUpdateTimestamp(updateTimestamp).build();
163162
if (log.isDebugEnabled()) {
@@ -175,6 +174,8 @@ public TxnTransitMetadata prepareTransitionTo(TransactionState newState,
175174

176175
/**
177176
* Transaction transit metadata.
177+
*
178+
* this is a immutable object representing the target transition of the transaction metadata.
178179
*/
179180
@ToString
180181
@Builder
@@ -187,7 +188,7 @@ public static class TxnTransitMetadata {
187188
private short lastProducerEpoch;
188189
private int txnTimeoutMs;
189190
private TransactionState txnState;
190-
private Set<TopicPartition> topicPartitions;
191+
private ImmutableSet<TopicPartition> topicPartitions;
191192
private long txnStartTimestamp;
192193
private long txnLastUpdateTimestamp;
193194
}
@@ -236,7 +237,7 @@ public void completeTransitionTo(TxnTransitMetadata transitMetadata) {
236237
throwStateTransitionFailure(transitMetadata);
237238
} else {
238239
txnStartTimestamp = transitMetadata.txnStartTimestamp;
239-
addPartitions(transitMetadata.topicPartitions);
240+
topicPartitions.addAll(transitMetadata.topicPartitions);
240241
}
241242
break;
242243
case PREPARE_ABORT: // from endTxn
@@ -258,7 +259,7 @@ public void completeTransitionTo(TxnTransitMetadata transitMetadata) {
258259
throwStateTransitionFailure(transitMetadata);
259260
} else {
260261
this.txnStartTimestamp = transitMetadata.txnStartTimestamp;
261-
this.topicPartitions = Collections.emptySet();
262+
this.topicPartitions.clear();
262263
}
263264
break;
264265
case PREPARE_EPOCH_FENCE:
@@ -308,7 +309,7 @@ public TxnTransitMetadata prepareNoTransit() {
308309
// do not call transitTo as it will set the pending state,
309310
// a follow-up call to abort the transaction will set its pending state
310311
return new TxnTransitMetadata(producerId, lastProducerId, producerEpoch, lastProducerEpoch, txnTimeoutMs,
311-
state, topicPartitions, txnStartTimestamp, txnLastUpdateTimestamp);
312+
state, ImmutableSet.copyOf(topicPartitions), txnStartTimestamp, txnLastUpdateTimestamp);
312313
}
313314

314315
public TxnTransitMetadata prepareFenceProducerEpoch() {
@@ -332,7 +333,7 @@ public TxnTransitMetadata prepareFenceProducerEpoch() {
332333
bumpedEpoch,
333334
RecordBatch.NO_PRODUCER_EPOCH,
334335
txnTimeoutMs,
335-
topicPartitions,
336+
ImmutableSet.copyOf(topicPartitions),
336337
txnStartTimestamp,
337338
txnLastUpdateTimestamp);
338339
}
@@ -377,7 +378,7 @@ public Either<Errors, TxnTransitMetadata> prepareIncrementProducerEpoch(Integer
377378

378379
return errorsOrBumpEpochResult.map(bumpEpochResult -> prepareTransitionTo(TransactionState.EMPTY,
379380
producerId, bumpEpochResult.bumpedEpoch, bumpEpochResult.lastEpoch, newTxnTimeoutMs,
380-
Collections.emptySet(), -1, updateTimestamp));
381+
ImmutableSet.of(), -1, updateTimestamp));
381382
}
382383

383384
@AllArgsConstructor
@@ -400,7 +401,7 @@ public TxnTransitMetadata prepareProducerIdRotation(Long newProducerId,
400401
(short) 0,
401402
recordLastEpoch ? producerEpoch : RecordBatch.NO_PRODUCER_EPOCH,
402403
newTxnTimeoutMs,
403-
Collections.emptySet(),
404+
ImmutableSet.of(),
404405
-1,
405406
updateTimestamp);
406407
}
@@ -419,7 +420,8 @@ private boolean hasPendingTransaction() {
419420
return flag;
420421
}
421422

422-
public TxnTransitMetadata prepareAddPartitions(Set<TopicPartition> addedTopicPartitions, Long updateTimestamp) {
423+
public TxnTransitMetadata prepareAddPartitions(ImmutableSet<TopicPartition> addedTopicPartitions,
424+
Long updateTimestamp) {
423425
long newTxnStartTimestamp;
424426
switch(state) {
425427
case EMPTY:
@@ -430,18 +432,17 @@ public TxnTransitMetadata prepareAddPartitions(Set<TopicPartition> addedTopicPar
430432
default:
431433
newTxnStartTimestamp = txnStartTimestamp;
432434
}
433-
Set<TopicPartition> newPartitionSet = new HashSet<>();
434-
if (topicPartitions != null) {
435-
newPartitionSet.addAll(topicPartitions);
436-
}
437-
newPartitionSet.addAll(new HashSet<>(addedTopicPartitions));
435+
ImmutableSet<TopicPartition> partitions = ImmutableSet.<TopicPartition>builder()
436+
.addAll((topicPartitions != null) ? topicPartitions : ImmutableSet.of())
437+
.addAll(addedTopicPartitions)
438+
.build();
438439
return prepareTransitionTo(TransactionState.ONGOING, producerId, producerEpoch, lastProducerEpoch,
439-
txnTimeoutMs, newPartitionSet, newTxnStartTimestamp, updateTimestamp);
440+
txnTimeoutMs, partitions, newTxnStartTimestamp, updateTimestamp);
440441
}
441442

442443
public TxnTransitMetadata prepareAbortOrCommit(TransactionState newState, Long updateTimestamp) {
443444
return prepareTransitionTo(newState, producerId, producerEpoch, lastProducerEpoch,
444-
txnTimeoutMs, topicPartitions, txnStartTimestamp, updateTimestamp);
445+
txnTimeoutMs, ImmutableSet.copyOf(topicPartitions), txnStartTimestamp, updateTimestamp);
445446
}
446447

447448
public TxnTransitMetadata prepareComplete(Long updateTimestamp) {
@@ -455,12 +456,12 @@ public TxnTransitMetadata prepareComplete(Long updateTimestamp) {
455456
hasFailedEpochFence = false;
456457

457458
return prepareTransitionTo(newState, producerId, producerEpoch, lastProducerEpoch,
458-
txnTimeoutMs, Collections.emptySet(), txnStartTimestamp, updateTimestamp);
459+
txnTimeoutMs, ImmutableSet.of(), txnStartTimestamp, updateTimestamp);
459460
}
460461

461462
public TxnTransitMetadata prepareDead() {
462463
return prepareTransitionTo(TransactionState.DEAD, producerId, producerEpoch, lastProducerEpoch, txnTimeoutMs,
463-
Collections.emptySet(), txnStartTimestamp, txnLastUpdateTimestamp);
464+
ImmutableSet.of(), txnStartTimestamp, txnLastUpdateTimestamp);
464465
}
465466

466467
private void throwStateTransitionFailure(TxnTransitMetadata txnTransitMetadata) throws IllegalStateException {
@@ -484,27 +485,11 @@ public void removePartition(TopicPartition topicPartition) {
484485
+ "trying to remove partitions whose txn marker has been sent, this is not expected",
485486
state, pendingState));
486487
}
487-
Set<TopicPartition> newTopicPartitions = new HashSet<>(topicPartitions);
488-
newTopicPartitions.remove(topicPartition);
489-
this.topicPartitions = Collections.unmodifiableSet(newTopicPartitions);
490-
}
491-
492-
public void removePartitions(Set<TopicPartition> topicPartitions) {
493-
if (state != TransactionState.PREPARE_COMMIT && state != TransactionState.PREPARE_ABORT) {
494-
throw new IllegalStateException(
495-
String.format("Transaction metadata's current state is %s, and its pending state is %s while "
496-
+ "trying to remove partitions whose txn marker has been sent, this is not expected",
497-
state, pendingState));
498-
}
499-
Set<TopicPartition> newTopicPartitions = new HashSet<>(topicPartitions);
500-
newTopicPartitions.removeAll(topicPartitions);
501-
this.topicPartitions = Collections.unmodifiableSet(newTopicPartitions);
488+
topicPartitions.remove(topicPartition);
502489
}
503490

504491
public void addPartitions(Set<TopicPartition> partitions) {
505-
Set<TopicPartition> newTopicPartitions = new HashSet<>(topicPartitions);
506-
newTopicPartitions.addAll(partitions);
507-
this.topicPartitions = Collections.unmodifiableSet(newTopicPartitions);
492+
topicPartitions.addAll(partitions);
508493
}
509494

510495
public boolean pendingTransitionInProgress() {

tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionCoordinatorTest.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import static org.testng.AssertJUnit.assertNotNull;
2929
import static org.testng.AssertJUnit.assertTrue;
3030

31+
import com.google.common.collect.ImmutableSet;
3132
import com.google.common.collect.Sets;
3233
import io.streamnative.pulsar.handlers.kop.KafkaProtocolHandler;
3334
import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase;
@@ -845,7 +846,7 @@ private void mockPrepare(TransactionState transactionState) {
845846
.lastProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
846847
.txnTimeoutMs(txnTimeoutMs)
847848
.txnState(transactionState)
848-
.topicPartitions(partitions)
849+
.topicPartitions(ImmutableSet.copyOf(partitions))
849850
.txnStartTimestamp(now)
850851
.txnLastUpdateTimestamp(now)
851852
.build();
@@ -1372,7 +1373,7 @@ public void shouldUseLastEpochToFenceWhenEpochsAreExhausted() {
13721373
RecordBatch.NO_PRODUCER_EPOCH,
13731374
txnTimeoutMs,
13741375
TransactionState.PREPARE_ABORT,
1375-
partitions,
1376+
ImmutableSet.copyOf(partitions),
13761377
time.milliseconds(),
13771378
time.milliseconds()
13781379
)),
@@ -1703,7 +1704,7 @@ public void shouldAbortExpiredTransactionsInOngoingStateAndBumpEpoch() {
17031704
(short) -1,
17041705
txnTimeoutMs,
17051706
TransactionState.PREPARE_ABORT,
1706-
partitions,
1707+
ImmutableSet.copyOf(partitions),
17071708
now,
17081709
now + DefaultAbortTimedOutTransactionsIntervalMs);
17091710
time.sleep(DefaultAbortTimedOutTransactionsIntervalMs);
@@ -1863,7 +1864,7 @@ public void shouldNotBumpEpochWhenAbortingExpiredTransactionIfAppendToLogFails()
18631864
.lastProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
18641865
.txnTimeoutMs(txnTimeoutMs)
18651866
.txnState(TransactionState.PREPARE_ABORT)
1866-
.topicPartitions(partitions)
1867+
.topicPartitions(ImmutableSet.copyOf(partitions))
18671868
.txnStartTimestamp(now)
18681869
.txnLastUpdateTimestamp(now + TransactionConfig.DefaultAbortTimedOutTransactionsIntervalMs)
18691870
.build();

tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionMarkerChannelManagerTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.mockito.Mockito.when;
2020
import static org.testng.Assert.assertEquals;
2121

22+
import com.google.common.collect.ImmutableSet;
2223
import com.google.common.collect.Sets;
2324
import io.streamnative.pulsar.handlers.kop.KafkaServiceConfiguration;
2425
import io.streamnative.pulsar.handlers.kop.KopBrokerLookupManager;
@@ -110,7 +111,7 @@ public void testTopicDeletedBeforeWriteMarker(boolean isTopicExists) {
110111
.lastProducerEpoch(RecordBatch.NO_PRODUCER_EPOCH)
111112
.txnTimeoutMs(txnTimeoutMs)
112113
.txnState(TransactionState.COMPLETE_COMMIT)
113-
.topicPartitions(partitions)
114+
.topicPartitions(ImmutableSet.copyOf(partitions))
114115
.txnStartTimestamp(time.milliseconds())
115116
.txnLastUpdateTimestamp(time.milliseconds())
116117
.build();

tests/src/test/java/io/streamnative/pulsar/handlers/kop/coordinator/transaction/TransactionStateManagerTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import static org.testng.AssertJUnit.assertTrue;
2222
import static org.testng.AssertJUnit.fail;
2323

24+
import com.google.common.collect.ImmutableSet;
2425
import com.google.common.collect.Lists;
2526
import io.streamnative.pulsar.handlers.kop.KopProtocolHandlerTestBase;
2627
import io.streamnative.pulsar.handlers.kop.SystemTopicClient;
@@ -221,7 +222,7 @@ public void shouldNotReadExpiredLogWhenTopicAlreadyCompacted() throws Exception
221222
(short) 0,
222223
0,
223224
TransactionState.COMPLETE_COMMIT,
224-
Collections.emptySet(),
225+
ImmutableSet.of(),
225226
now,
226227
now);
227228

@@ -235,7 +236,7 @@ public void shouldNotReadExpiredLogWhenTopicAlreadyCompacted() throws Exception
235236
(short) 0,
236237
0,
237238
TransactionState.COMPLETE_COMMIT,
238-
Collections.emptySet(),
239+
ImmutableSet.of(),
239240
now + txnConfig.getTransactionalIdExpirationMs(),
240241
now + txnConfig.getTransactionalIdExpirationMs());
241242

0 commit comments

Comments
 (0)