Skip to content

Commit f99b31f

Browse files
committed
Add test to kafka producer
1 parent eda67cf commit f99b31f

File tree

1 file changed

+57
-0
lines changed

1 file changed

+57
-0
lines changed

clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.kafka.common.requests.FindCoordinatorRequest;
7575
import org.apache.kafka.common.requests.FindCoordinatorResponse;
7676
import org.apache.kafka.common.requests.InitProducerIdResponse;
77+
import org.apache.kafka.common.requests.InitProducerIdRequest;
7778
import org.apache.kafka.common.requests.JoinGroupRequest;
7879
import org.apache.kafka.common.requests.MetadataResponse;
7980
import org.apache.kafka.common.requests.ProduceResponse;
@@ -102,7 +103,10 @@
102103
import org.junit.jupiter.api.Test;
103104
import org.junit.jupiter.api.TestInfo;
104105
import org.junit.jupiter.params.ParameterizedTest;
106+
import org.junit.jupiter.params.provider.Arguments;
107+
import org.junit.jupiter.params.provider.MethodSource;
105108
import org.junit.jupiter.params.provider.ValueSource;
109+
import org.junit.jupiter.params.provider.CsvSource;
106110
import org.mockito.MockedStatic;
107111
import org.mockito.Mockito;
108112
import org.mockito.internal.stubbing.answers.CallsRealMethods;
@@ -1364,6 +1368,59 @@ public void testInitTransactionWhileThrottled() {
13641368
}
13651369
}
13661370

1371+
@ParameterizedTest
1372+
@CsvSource({
1373+
"true, false",
1374+
"true, true",
1375+
"false, true"
1376+
})
1377+
public void testInitTransactionsWithKeepPreparedTxnAndTwoPhaseCommit(boolean keepPreparedTxn, boolean enable2PC) {
1378+
Map<String, Object> configs = new HashMap<>();
1379+
configs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-txn-id");
1380+
configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000);
1381+
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
1382+
if (enable2PC) {
1383+
configs.put(ProducerConfig.TRANSACTION_TWO_PHASE_COMMIT_ENABLE_CONFIG, true);
1384+
}
1385+
1386+
Time time = new MockTime(1);
1387+
MetadataResponse initialUpdateResponse = RequestTestUtils.metadataUpdateWith(1, singletonMap("topic", 1));
1388+
ProducerMetadata metadata = newMetadata(0, 0, Long.MAX_VALUE);
1389+
MockClient client = new MockClient(time, metadata);
1390+
client.updateMetadata(initialUpdateResponse);
1391+
1392+
// Capture flags from the InitProducerIdRequest
1393+
boolean[] requestFlags = new boolean[2]; // [keepPreparedTxn, enable2Pc]
1394+
1395+
client.prepareResponse(
1396+
request -> request instanceof FindCoordinatorRequest &&
1397+
((FindCoordinatorRequest) request).data().keyType() == FindCoordinatorRequest.CoordinatorType.TRANSACTION.id(),
1398+
FindCoordinatorResponse.prepareResponse(Errors.NONE, "test-txn-id", NODE));
1399+
1400+
client.prepareResponse(
1401+
request -> {
1402+
if (request instanceof InitProducerIdRequest) {
1403+
InitProducerIdRequest initRequest = (InitProducerIdRequest) request;
1404+
requestFlags[0] = initRequest.data().keepPreparedTxn();
1405+
requestFlags[1] = initRequest.data().enable2Pc();
1406+
return true;
1407+
}
1408+
return false;
1409+
},
1410+
initProducerIdResponse(1L, (short) 5, Errors.NONE));
1411+
1412+
try (Producer<String, String> producer = kafkaProducer(configs, new StringSerializer(),
1413+
new StringSerializer(), metadata, client, null, time)) {
1414+
producer.initTransactions(keepPreparedTxn);
1415+
1416+
// Verify request flags match expected values
1417+
assertEquals(keepPreparedTxn, requestFlags[0],
1418+
"keepPreparedTxn flag should match input parameter");
1419+
assertEquals(enable2PC, requestFlags[1],
1420+
"enable2Pc flag should match producer configuration");
1421+
}
1422+
}
1423+
13671424
@Test
13681425
public void testClusterAuthorizationFailure() throws Exception {
13691426
int maxBlockMs = 500;

0 commit comments

Comments
 (0)