@@ -17,6 +17,7 @@

 package org.apache.flink.connector.kafka.sink;

+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
@@ -43,6 +44,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
+import java.util.function.Consumer;

 import static org.apache.flink.connector.kafka.testutils.KafkaUtil.drainAllRecordsFromTopic;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -61,6 +63,9 @@ public class ExactlyOnceKafkaWriterITCase extends KafkaWriterTestBase {
                     .setConfiguration(new Configuration())
                     .build());

+    private static final Consumer<KafkaSinkBuilder<?>> EXACTLY_ONCE =
+            sink -> sink.setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE);
+
     @Test
     void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
         Properties properties = getKafkaClientConfiguration();
@@ -197,36 +202,35 @@ private void triggerProducerException(KafkaWriter<Integer> writer, Properties pr

     /** Test that producer is not accidentally recreated or pool is used. */
     @Test
-    void testLingeringTransaction() throws Exception {
-        final KafkaWriter<Integer> failedWriter = createWriter(DeliveryGuarantee.EXACTLY_ONCE);
+    void shouldAbortLingeringTransactions() throws Exception {
+        try (final ExactlyOnceKafkaWriter<Integer> failedWriter =
+                createWriter(DeliveryGuarantee.EXACTLY_ONCE)) {

-        // create two lingering transactions
-        failedWriter.flush(false);
-        failedWriter.prepareCommit();
-        failedWriter.snapshotState(1);
-        failedWriter.flush(false);
-        failedWriter.prepareCommit();
-        failedWriter.snapshotState(2);
+            // create two lingering transactions
+            onCheckpointBarrier(failedWriter, 1);
+            onCheckpointBarrier(failedWriter, 2);

-        try (final KafkaWriter<Integer> recoveredWriter =
-                createWriter(DeliveryGuarantee.EXACTLY_ONCE)) {
-            recoveredWriter.write(1, SINK_WRITER_CONTEXT);
+            // use state to ensure that the new writer knows about the old prefix
+            KafkaWriterState state = new KafkaWriterState(failedWriter.getTransactionalIdPrefix());

-            recoveredWriter.flush(false);
-            Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
-            recoveredWriter.snapshotState(1);
-            assertThat(committables).hasSize(1);
-            final KafkaCommittable committable = committables.stream().findFirst().get();
-            assertThat(committable.getProducer().isPresent()).isTrue();
+            try (final KafkaWriter<Integer> recoveredWriter =
+                    restoreWriter(EXACTLY_ONCE, List.of(state), createInitContext())) {
+                recoveredWriter.write(1, SINK_WRITER_CONTEXT);

-            committable.getProducer().get().commitTransaction();
+                recoveredWriter.flush(false);
+                Collection<KafkaCommittable> committables = recoveredWriter.prepareCommit();
+                recoveredWriter.snapshotState(1);
+                assertThat(committables).hasSize(1);
+                final KafkaCommittable committable = committables.stream().findFirst().get();
+                assertThat(committable.getProducer().isPresent()).isTrue();

-            List<ConsumerRecord<byte[], byte[]>> records =
-                    drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
-            assertThat(records).hasSize(1);
-        }
+                committable.getProducer().get().commitTransaction();

-        failedWriter.close();
+                List<ConsumerRecord<byte[], byte[]>> records =
+                        drainAllRecordsFromTopic(topic, getKafkaClientConfiguration(), true);
+                assertThat(records).hasSize(1);
+            }
+        }
     }

     /** Test that producers are reused when committed. */
@@ -332,4 +336,15 @@ private static Collection<FlinkKafkaInternalProducer<byte[], byte[]>> getProduce
             ExactlyOnceKafkaWriter<Integer> writer) {
         return ((ProducerPoolImpl) writer.getProducerPool()).getProducers();
     }
+
+    private Tuple2<KafkaWriterState, KafkaCommittable> onCheckpointBarrier(
+            KafkaWriter<Integer> failedWriter, int checkpointId)
+            throws IOException, InterruptedException {
+        // constant number to force the same partition
+        failedWriter.write(1, SINK_WRITER_CONTEXT);
+        failedWriter.flush(false);
+        KafkaCommittable committable = Iterables.getOnlyElement(failedWriter.prepareCommit());
+        KafkaWriterState state = Iterables.getOnlyElement(failedWriter.snapshotState(checkpointId));
+        return Tuple2.of(state, committable);
+    }
 }
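
Note on the mechanism under test: a "lingering" transaction is one that a failed writer began and pre-committed (via prepareCommit) but never committed or aborted; until it is resolved, read_committed consumers stall at the last stable offset. The recovered writer resolves such transactions by re-initializing producers for the transactional ids derived from the restored prefix. Below is a minimal plain-Kafka sketch of that mechanism, assuming a broker on localhost:9092 and the hypothetical transactional id "prefix-0-1"; neither is part of the diff above.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class AbortLingeringTransactionSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed broker address and transactional id, for illustration only.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "prefix-0-1");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            // initTransactions() bumps the producer epoch for "prefix-0-1",
            // fencing any previous incarnation and aborting the transaction it
            // left open, without writing a record itself.
            producer.initTransactions();
        }
    }
}

This is why the restored KafkaWriterState only needs the transactional-id prefix: the recovered writer can regenerate the ids it may have used before the failure and run this init cycle for each of them.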