@@ -159,17 +159,28 @@ public void setup() {
159159 this .client .updateMetadata (RequestTestUtils .metadataUpdateWith (1 , singletonMap ("test" , 2 )));
160160 this .brokerNode = new Node (0 , "localhost" , 2211 );
161161
162- initializeTransactionManager (Optional .of (transactionalId ), false );
162+ initializeTransactionManager (Optional .of (transactionalId ), false , false );
163+ }
164+
165+ private void initializeTransactionManager (
166+ Optional <String > transactionalId ,
167+ boolean transactionV2Enabled
168+ ) {
169+ initializeTransactionManager (transactionalId , transactionV2Enabled , false );
163170 }
164171
165- private void initializeTransactionManager (Optional <String > transactionalId , boolean transactionV2Enabled ) {
172+ private void initializeTransactionManager (
173+ Optional <String > transactionalId ,
174+ boolean transactionV2Enabled ,
175+ boolean enable2pc
176+ ) {
166177 Metrics metrics = new Metrics (time );
167178
168179 apiVersions .update ("0" , new NodeApiVersions (Arrays .asList (
169180 new ApiVersion ()
170181 .setApiKey (ApiKeys .INIT_PRODUCER_ID .id )
171182 .setMinVersion ((short ) 0 )
172- .setMaxVersion ((short ) 3 ),
183+ .setMaxVersion ((short ) 6 ),
173184 new ApiVersion ()
174185 .setApiKey (ApiKeys .PRODUCE .id )
175186 .setMinVersion ((short ) 0 )
@@ -189,7 +200,7 @@ private void initializeTransactionManager(Optional<String> transactionalId, bool
189200 finalizedFeaturesEpoch ));
190201 finalizedFeaturesEpoch += 1 ;
191202 this .transactionManager = new TransactionManager (logContext , transactionalId .orElse (null ),
192- transactionTimeoutMs , DEFAULT_RETRY_BACKOFF_MS , apiVersions , false );
203+ transactionTimeoutMs , DEFAULT_RETRY_BACKOFF_MS , apiVersions , enable2pc );
193204
194205 int batchSize = 16 * 1024 ;
195206 int deliveryTimeoutMs = 3000 ;
@@ -4035,16 +4046,39 @@ private void prepareFindCoordinatorResponse(Errors error, boolean shouldDisconne
40354046 }, FindCoordinatorResponse .prepareResponse (error , coordinatorKey , brokerNode ), shouldDisconnect );
40364047 }
40374048
4038- private void prepareInitPidResponse (Errors error , boolean shouldDisconnect , long producerId , short producerEpoch ) {
4049+ private void prepareInitPidResponse (
4050+ Errors error ,
4051+ boolean shouldDisconnect ,
4052+ long producerId ,
4053+ short producerEpoch
4054+ ) {
4055+ prepareInitPidResponse (error , shouldDisconnect , producerId , producerEpoch , false , false , (long ) -1 , (short ) -1 );
4056+ }
4057+
4058+ private void prepareInitPidResponse (
4059+ Errors error ,
4060+ boolean shouldDisconnect ,
4061+ long producerId ,
4062+ short producerEpoch ,
4063+ boolean keepPreparedTxn ,
4064+ boolean enable2Pc ,
4065+ long ongoingProducerId ,
4066+ short ongoingProducerEpoch
4067+ ) {
40394068 InitProducerIdResponseData responseData = new InitProducerIdResponseData ()
4040- .setErrorCode (error .code ())
4041- .setProducerEpoch (producerEpoch )
4042- .setProducerId (producerId )
4043- .setThrottleTimeMs (0 );
4069+ .setErrorCode (error .code ())
4070+ .setProducerEpoch (producerEpoch )
4071+ .setProducerId (producerId )
4072+ .setThrottleTimeMs (0 )
4073+ .setOngoingTxnProducerId (ongoingProducerId )
4074+ .setOngoingTxnProducerEpoch (ongoingProducerEpoch );
4075+
40444076 client .prepareResponse (body -> {
40454077 InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest ) body ;
40464078 assertEquals (transactionalId , initProducerIdRequest .data ().transactionalId ());
40474079 assertEquals (transactionTimeoutMs , initProducerIdRequest .data ().transactionTimeoutMs ());
4080+ assertEquals (keepPreparedTxn , initProducerIdRequest .data ().keepPreparedTxn ());
4081+ assertEquals (enable2Pc , initProducerIdRequest .data ().enable2Pc ());
40484082 return true ;
40494083 }, new InitProducerIdResponse (responseData ), shouldDisconnect );
40504084 }
@@ -4373,4 +4407,36 @@ private void runUntil(Supplier<Boolean> condition) {
43734407 ProducerTestUtils .runUntil (sender , condition );
43744408 }
43754409
4410+ @ Test
4411+ public void testInitializeTransactionsWithKeepPreparedTxn () {
4412+ initializeTransactionManager (Optional .of (transactionalId ), true , true );
4413+
4414+ client .prepareResponse (
4415+ FindCoordinatorResponse .prepareResponse (Errors .NONE , transactionalId , brokerNode )
4416+ );
4417+
4418+ // Simulate an ongoing prepared transaction (ongoingProducerId != -1).
4419+ long ongoingProducerId = 999L ;
4420+ short ongoingEpoch = 10 ;
4421+ short bumpedEpoch = 11 ;
4422+
4423+ prepareInitPidResponse (
4424+ Errors .NONE ,
4425+ false ,
4426+ ongoingProducerId ,
4427+ bumpedEpoch ,
4428+ true ,
4429+ true ,
4430+ ongoingProducerId ,
4431+ ongoingEpoch
4432+ );
4433+
4434+ transactionManager .initializeTransactions (true );
4435+ runUntil (transactionManager ::hasProducerId );
4436+
4437+ assertTrue (transactionManager .hasProducerId ());
4438+ assertFalse (transactionManager .hasOngoingTransaction ());
4439+ assertEquals (ongoingProducerId , transactionManager .producerIdAndEpoch ().producerId );
4440+ assertEquals (bumpedEpoch , transactionManager .producerIdAndEpoch ().epoch );
4441+ }
43764442}
0 commit comments