27
27
import org .apache .flink .metrics .Gauge ;
28
28
import org .apache .flink .metrics .MetricGroup ;
29
29
import org .apache .flink .metrics .groups .OperatorIOMetricGroup ;
30
+ import org .apache .flink .metrics .groups .OperatorMetricGroup ;
30
31
import org .apache .flink .metrics .groups .SinkWriterMetricGroup ;
31
32
import org .apache .flink .metrics .testutils .MetricListener ;
32
33
import org .apache .flink .runtime .metrics .groups .InternalSinkWriterMetricGroup ;
34
+ import org .apache .flink .runtime .metrics .groups .ProxyMetricGroup ;
33
35
import org .apache .flink .runtime .metrics .groups .UnregisteredMetricGroups ;
34
36
import org .apache .flink .util .TestLoggerExtension ;
35
37
import org .apache .flink .util .UserCodeClassLoader ;
58
60
import java .nio .ByteBuffer ;
59
61
import java .util .ArrayList ;
60
62
import java .util .Collection ;
61
- import java .util .Collections ;
62
63
import java .util .Comparator ;
63
64
import java .util .List ;
64
65
import java .util .Optional ;
@@ -84,7 +85,7 @@ public class KafkaWriterITCase {
84
85
private static final Network NETWORK = Network .newNetwork ();
85
86
private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total" ;
86
87
private static final SinkWriter .Context SINK_WRITER_CONTEXT = new DummySinkWriterContext ();
87
- private String topic ;
88
+ private static String topic ;
88
89
89
90
private MetricListener metricListener ;
90
91
private TriggerTimeService timeService ;
@@ -130,11 +131,8 @@ public void testNotRegisterMetrics(DeliveryGuarantee guarantee) throws Exception
130
131
131
132
@ Test
132
133
public void testIncreasingRecordBasedCounters () throws Exception {
133
- final OperatorIOMetricGroup operatorIOMetricGroup =
134
- UnregisteredMetricGroups .createUnregisteredOperatorMetricGroup ().getIOMetricGroup ();
135
- final InternalSinkWriterMetricGroup metricGroup =
136
- InternalSinkWriterMetricGroup .mock (
137
- metricListener .getMetricGroup (), operatorIOMetricGroup );
134
+ final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup ();
135
+
138
136
try (final KafkaWriter <Integer > writer =
139
137
createWriterWithConfiguration (
140
138
getKafkaClientConfiguration (), DeliveryGuarantee .NONE , metricGroup )) {
@@ -167,13 +165,9 @@ public void testIncreasingRecordBasedCounters() throws Exception {
167
165
168
166
@ Test
169
167
public void testCurrentSendTimeMetric () throws Exception {
170
- final InternalSinkWriterMetricGroup metricGroup =
171
- InternalSinkWriterMetricGroup .mock (metricListener .getMetricGroup ());
172
168
try (final KafkaWriter <Integer > writer =
173
169
createWriterWithConfiguration (
174
- getKafkaClientConfiguration (),
175
- DeliveryGuarantee .AT_LEAST_ONCE ,
176
- metricGroup )) {
170
+ getKafkaClientConfiguration (), DeliveryGuarantee .AT_LEAST_ONCE )) {
177
171
final Optional <Gauge <Long >> currentSendTime =
178
172
metricListener .getGauge ("currentSendTime" );
179
173
assertThat (currentSendTime .isPresent ()).isTrue ();
@@ -199,16 +193,12 @@ public void testCurrentSendTimeMetric() throws Exception {
199
193
void testFlushAsyncErrorPropagationAndErrorCounter () throws Exception {
200
194
Properties properties = getKafkaClientConfiguration ();
201
195
202
- SinkInitContext sinkInitContext =
203
- new SinkInitContext (
204
- InternalSinkWriterMetricGroup .mock (metricListener .getMetricGroup ()),
205
- timeService ,
206
- null );
196
+ final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup ();
197
+
207
198
final KafkaWriter <Integer > writer =
208
199
createWriterWithConfiguration (
209
- properties , DeliveryGuarantee .EXACTLY_ONCE , sinkInitContext );
210
- final Counter numRecordsOutErrors =
211
- sinkInitContext .metricGroup .getNumRecordsOutErrorsCounter ();
200
+ properties , DeliveryGuarantee .EXACTLY_ONCE , metricGroup );
201
+ final Counter numRecordsOutErrors = metricGroup .getNumRecordsOutErrorsCounter ();
212
202
assertThat (numRecordsOutErrors .getCount ()).isEqualTo (0L );
213
203
214
204
triggerProducerException (writer , properties );
@@ -228,16 +218,12 @@ void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
228
218
void testWriteAsyncErrorPropagationAndErrorCounter () throws Exception {
229
219
Properties properties = getKafkaClientConfiguration ();
230
220
231
- SinkInitContext sinkInitContext =
232
- new SinkInitContext (
233
- InternalSinkWriterMetricGroup .mock (metricListener .getMetricGroup ()),
234
- timeService ,
235
- null );
221
+ final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup ();
222
+
236
223
final KafkaWriter <Integer > writer =
237
224
createWriterWithConfiguration (
238
- properties , DeliveryGuarantee .EXACTLY_ONCE , sinkInitContext );
239
- final Counter numRecordsOutErrors =
240
- sinkInitContext .metricGroup .getNumRecordsOutErrorsCounter ();
225
+ properties , DeliveryGuarantee .EXACTLY_ONCE , metricGroup );
226
+ final Counter numRecordsOutErrors = metricGroup .getNumRecordsOutErrorsCounter ();
241
227
assertThat (numRecordsOutErrors .getCount ()).isEqualTo (0L );
242
228
243
229
triggerProducerException (writer , properties );
@@ -259,10 +245,8 @@ void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception {
259
245
Properties properties = getKafkaClientConfiguration ();
260
246
261
247
SinkInitContext sinkInitContext =
262
- new SinkInitContext (
263
- InternalSinkWriterMetricGroup .mock (metricListener .getMetricGroup ()),
264
- timeService ,
265
- null );
248
+ new SinkInitContext (createSinkWriterMetricGroup (), timeService , null );
249
+
266
250
final KafkaWriter <Integer > writer =
267
251
createWriterWithConfiguration (
268
252
properties , DeliveryGuarantee .EXACTLY_ONCE , sinkInitContext );
@@ -289,16 +273,12 @@ void testMailboxAsyncErrorPropagationAndErrorCounter() throws Exception {
289
273
void testCloseAsyncErrorPropagationAndErrorCounter () throws Exception {
290
274
Properties properties = getKafkaClientConfiguration ();
291
275
292
- SinkInitContext sinkInitContext =
293
- new SinkInitContext (
294
- InternalSinkWriterMetricGroup .mock (metricListener .getMetricGroup ()),
295
- timeService ,
296
- null );
276
+ final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup ();
277
+
297
278
final KafkaWriter <Integer > writer =
298
279
createWriterWithConfiguration (
299
- properties , DeliveryGuarantee .EXACTLY_ONCE , sinkInitContext );
300
- final Counter numRecordsOutErrors =
301
- sinkInitContext .metricGroup .getNumRecordsOutErrorsCounter ();
280
+ properties , DeliveryGuarantee .EXACTLY_ONCE , metricGroup );
281
+ final Counter numRecordsOutErrors = metricGroup .getNumRecordsOutErrorsCounter ();
302
282
assertThat (numRecordsOutErrors .getCount ()).isEqualTo (0L );
303
283
304
284
triggerProducerException (writer , properties );
@@ -334,7 +314,7 @@ public void testMetadataPublisher() throws Exception {
334
314
createWriterWithConfiguration (
335
315
getKafkaClientConfiguration (),
336
316
DeliveryGuarantee .AT_LEAST_ONCE ,
337
- InternalSinkWriterMetricGroup . mock ( metricListener . getMetricGroup () ),
317
+ createSinkWriterMetricGroup ( ),
338
318
meta -> metadataList .add (meta .toString ()))) {
339
319
List <String > expected = new ArrayList <>();
340
320
for (int i = 0 ; i < 100 ; i ++) {
@@ -518,45 +498,53 @@ private void assertKafkaMetricNotPresent(
518
498
}
519
499
520
500
private KafkaWriter <Integer > createWriterWithConfiguration (
521
- Properties config , DeliveryGuarantee guarantee ) {
522
- return createWriterWithConfiguration (
523
- config ,
524
- guarantee ,
525
- InternalSinkWriterMetricGroup .mock (metricListener .getMetricGroup ()));
501
+ Properties config , DeliveryGuarantee guarantee ) throws IOException {
502
+ return createWriterWithConfiguration (config , guarantee , createSinkWriterMetricGroup ());
526
503
}
527
504
528
505
private KafkaWriter <Integer > createWriterWithConfiguration (
529
506
Properties config ,
530
507
DeliveryGuarantee guarantee ,
531
- SinkWriterMetricGroup sinkWriterMetricGroup ) {
508
+ SinkWriterMetricGroup sinkWriterMetricGroup )
509
+ throws IOException {
532
510
return createWriterWithConfiguration (config , guarantee , sinkWriterMetricGroup , null );
533
511
}
534
512
535
513
private KafkaWriter <Integer > createWriterWithConfiguration (
536
514
Properties config ,
537
515
DeliveryGuarantee guarantee ,
538
516
SinkWriterMetricGroup sinkWriterMetricGroup ,
539
- @ Nullable Consumer <RecordMetadata > metadataConsumer ) {
540
- return new KafkaWriter <>(
541
- guarantee ,
542
- config ,
543
- "test-prefix" ,
544
- new SinkInitContext (sinkWriterMetricGroup , timeService , metadataConsumer ),
545
- new DummyRecordSerializer (),
546
- new DummySchemaContext (),
547
- Collections .emptyList ());
517
+ @ Nullable Consumer <RecordMetadata > metadataConsumer )
518
+ throws IOException {
519
+ KafkaSink <Integer > kafkaSink =
520
+ KafkaSink .<Integer >builder ()
521
+ .setKafkaProducerConfig (config )
522
+ .setDeliveryGuarantee (guarantee )
523
+ .setTransactionalIdPrefix ("test-prefix" )
524
+ .setRecordSerializer (new DummyRecordSerializer ())
525
+ .build ();
526
+ return (KafkaWriter <Integer >)
527
+ kafkaSink .createWriter (
528
+ new SinkInitContext (sinkWriterMetricGroup , timeService , metadataConsumer ));
548
529
}
549
530
550
531
private KafkaWriter <Integer > createWriterWithConfiguration (
551
- Properties config , DeliveryGuarantee guarantee , SinkInitContext sinkInitContext ) {
552
- return new KafkaWriter <>(
553
- guarantee ,
554
- config ,
555
- "test-prefix" ,
556
- sinkInitContext ,
557
- new DummyRecordSerializer (),
558
- new DummySchemaContext (),
559
- Collections .emptyList ());
532
+ Properties config , DeliveryGuarantee guarantee , SinkInitContext sinkInitContext )
533
+ throws IOException {
534
+ KafkaSink <Integer > kafkaSink =
535
+ KafkaSink .<Integer >builder ()
536
+ .setKafkaProducerConfig (config )
537
+ .setDeliveryGuarantee (guarantee )
538
+ .setTransactionalIdPrefix ("test-prefix" )
539
+ .setRecordSerializer (new DummyRecordSerializer ())
540
+ .build ();
541
+ return (KafkaWriter <Integer >) kafkaSink .createWriter (sinkInitContext );
542
+ }
543
+
544
+ private SinkWriterMetricGroup createSinkWriterMetricGroup () {
545
+ DummyOperatorMetricGroup operatorMetricGroup =
546
+ new DummyOperatorMetricGroup (metricListener .getMetricGroup ());
547
+ return InternalSinkWriterMetricGroup .wrap (operatorMetricGroup );
560
548
}
561
549
562
550
private static Properties getKafkaClientConfiguration () {
@@ -632,7 +620,7 @@ public <MetaT> Optional<Consumer<MetaT>> metadataConsumer() {
632
620
}
633
621
}
634
622
635
- private class DummyRecordSerializer implements KafkaRecordSerializationSchema <Integer > {
623
+ private static class DummyRecordSerializer implements KafkaRecordSerializationSchema <Integer > {
636
624
@ Override
637
625
public ProducerRecord <byte [], byte []> serialize (
638
626
Integer element , KafkaSinkContext context , Long timestamp ) {
@@ -644,28 +632,33 @@ public ProducerRecord<byte[], byte[]> serialize(
644
632
}
645
633
}
646
634
647
- private static class DummySchemaContext implements SerializationSchema .InitializationContext {
648
-
635
+ private static class DummySinkWriterContext implements SinkWriter .Context {
649
636
@ Override
650
- public MetricGroup getMetricGroup () {
651
- throw new UnsupportedOperationException ( "Not implemented." ) ;
637
+ public long currentWatermark () {
638
+ return 0 ;
652
639
}
653
640
654
641
@ Override
655
- public UserCodeClassLoader getUserCodeClassLoader () {
656
- throw new UnsupportedOperationException ( "Not implemented." ) ;
642
+ public Long timestamp () {
643
+ return null ;
657
644
}
658
645
}
659
646
660
- private static class DummySinkWriterContext implements SinkWriter .Context {
661
- @ Override
662
- public long currentWatermark () {
663
- return 0 ;
647
+ private static class DummyOperatorMetricGroup extends ProxyMetricGroup <MetricGroup >
648
+ implements OperatorMetricGroup {
649
+
650
+ private final OperatorIOMetricGroup operatorIOMetricGroup ;
651
+
652
+ public DummyOperatorMetricGroup (MetricGroup parentMetricGroup ) {
653
+ super (parentMetricGroup );
654
+ this .operatorIOMetricGroup =
655
+ UnregisteredMetricGroups .createUnregisteredOperatorMetricGroup ()
656
+ .getIOMetricGroup ();
664
657
}
665
658
666
659
@ Override
667
- public Long timestamp () {
668
- return null ;
660
+ public OperatorIOMetricGroup getIOMetricGroup () {
661
+ return operatorIOMetricGroup ;
669
662
}
670
663
}
671
664
0 commit comments