diff --git a/flink-connector-pulsar/archunit-violations/284bb951-3f59-4332-b61b-6f65b674f2ac b/flink-connector-pulsar/archunit-violations/284bb951-3f59-4332-b61b-6f65b674f2ac index 0511fea4..4494abe6 100644 --- a/flink-connector-pulsar/archunit-violations/284bb951-3f59-4332-b61b-6f65b674f2ac +++ b/flink-connector-pulsar/archunit-violations/284bb951-3f59-4332-b61b-6f65b674f2ac @@ -4,188 +4,25 @@ Class has annotation Class is annotated with in (PulsarSinkOptions.java:0) Class has annotation member of type in (PulsarSourceOptions.java:0) Class is annotated with in (PulsarSourceOptions.java:0) -Constructor (org.apache.pulsar.client.api.Schema)> calls method in (PulsarSchema.java:69) -Constructor (org.apache.pulsar.client.api.Schema)> calls method in (PulsarSchema.java:70) -Constructor (org.apache.pulsar.client.api.Schema)> calls method in (PulsarSchema.java:71) -Constructor (org.apache.pulsar.client.api.Schema)> calls method in (PulsarSchema.java:72) -Constructor (org.apache.pulsar.client.api.Schema)> calls method in (PulsarSchema.java:75) -Constructor (org.apache.pulsar.client.api.Schema)> calls method in (PulsarSchema.java:79) -Constructor (org.apache.pulsar.client.api.Schema, java.lang.Class)> calls method in (PulsarSchema.java:93) -Constructor (org.apache.pulsar.client.api.Schema, java.lang.Class, java.lang.Class)> calls method in (PulsarSchema.java:106) -Constructor (org.apache.pulsar.common.schema.SchemaType, org.apache.pulsar.client.api.Schema, org.apache.flink.api.common.typeinfo.TypeInformation)> calls method in (PrimitiveSchemaFactory.java:69) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto)> calls method in (PulsarSink.java:102) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto)> calls method in (PulsarSink.java:103) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto)> calls method in (PulsarSink.java:104) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto)> calls method in (PulsarSink.java:107) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto)> calls method in (PulsarSink.java:109) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto)> calls method in (PulsarSink.java:116) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto)> calls method in (PulsarSink.java:117) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration)> calls method in (PulsarCommitter.java:64) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto, org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method in (PulsarWriter.java:106) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto, org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method in (PulsarWriter.java:107) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto, org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method in (PulsarWriter.java:108) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto, org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method in (PulsarWriter.java:109) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto, org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method in (PulsarWriter.java:110) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration, org.apache.flink.connector.pulsar.sink.writer.serializer.PulsarSerializationSchema, org.apache.flink.connector.pulsar.sink.writer.topic.MetadataListener, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer, org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto, org.apache.flink.api.connector.sink2.Sink$InitContext)> calls method in (PulsarWriter.java:111) -Constructor (org.apache.flink.connector.pulsar.sink.config.SinkConfiguration)> calls method in (RoundRobinTopicRouter.java:50) -Constructor (java.lang.Long)> calls method in (CursorPosition.java:74) -Constructor (org.apache.pulsar.client.api.MessageId, boolean)> calls method in (CursorPosition.java:61) -Constructor (org.apache.pulsar.client.api.MessageId, boolean)> calls method in (MessageIdStopCursor.java:44) -Constructor (org.apache.pulsar.client.api.MessageId, boolean)> calls method in (MessageIdStopCursor.java:45) -Constructor (java.lang.String, int)> calls method in (TopicMetadata.java:40) -Constructor (java.lang.String, int, java.util.List)> calls method in (TopicPartition.java:104) -Constructor (java.lang.String, int, java.util.List)> calls method in (TopicPartition.java:114) -Constructor (int, int)> calls method in (TopicRange.java:63) -Constructor (int, int)> calls method in (TopicRange.java:61) -Constructor (int, int)> calls method in (TopicRange.java:62) -Constructor (org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor)> calls method in (PulsarPartitionSplit.java:56) -Constructor (org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor)> calls method in (PulsarPartitionSplit.java:57) -Constructor (org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor, org.apache.pulsar.client.api.MessageId, org.apache.pulsar.client.api.transaction.TxnID)> calls method in (PulsarPartitionSplit.java:67) -Constructor (org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor, org.apache.pulsar.client.api.MessageId, org.apache.pulsar.client.api.transaction.TxnID)> calls method in (PulsarPartitionSplit.java:68) -Constructor (org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.connector.pulsar.table.sink.PulsarWritableMetadata, boolean)> calls method in (PulsarTableSerializationSchema.java:64) -Constructor (org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.connector.pulsar.table.sink.PulsarWritableMetadata, boolean)> calls method in (PulsarTableSerializationSchema.java:65) -Constructor (org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.connector.pulsar.table.sink.PulsarWritableMetadata, boolean)> calls method in (PulsarTableSerializationSchema.java:66) -Constructor (org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.connector.pulsar.table.sink.PulsarWritableMetadata, boolean)> calls method in (PulsarTableSerializationSchema.java:67) Constructor (org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.api.common.serialization.SerializationSchema, [Lorg.apache.flink.table.data.RowData$FieldGetter;, org.apache.flink.connector.pulsar.table.sink.PulsarWritableMetadata, boolean)> has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (PulsarTableSerializationSchema.java:0) -Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, [I, org.apache.flink.table.connector.format.EncodingFormat, [I, boolean)> calls method in (PulsarTableSerializationSchemaFactory.java:64) -Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, [I, org.apache.flink.table.connector.format.EncodingFormat, [I, boolean)> calls method in (PulsarTableSerializationSchemaFactory.java:66) -Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, [I, org.apache.flink.table.connector.format.EncodingFormat, [I, boolean)> calls method in (PulsarTableSerializationSchemaFactory.java:67) -Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.EncodingFormat, [I, org.apache.flink.table.connector.format.EncodingFormat, [I, boolean)> calls method in (PulsarTableSerializationSchemaFactory.java:68) -Constructor (org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, long)> calls method in (PulsarTableSink.java:71) -Constructor (org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, long)> calls method in (PulsarTableSink.java:72) -Constructor (org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, long)> calls method in (PulsarTableSink.java:73) -Constructor (org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, long)> calls method in (PulsarTableSink.java:75) -Constructor (org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, long)> calls method in (PulsarTableSink.java:76) -Constructor (org.apache.flink.connector.pulsar.table.sink.PulsarTableSerializationSchemaFactory, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.base.DeliveryGuarantee, org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter, org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode, long)> calls method in (PulsarTableSink.java:78) -Constructor (org.apache.flink.api.common.serialization.DeserializationSchema, org.apache.flink.api.common.serialization.DeserializationSchema, org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter, boolean)> calls method in (PulsarTableDeserializationSchema.java:73) -Constructor (org.apache.flink.api.common.serialization.DeserializationSchema, org.apache.flink.api.common.serialization.DeserializationSchema, org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter, boolean)> calls method in (PulsarTableDeserializationSchema.java:74) -Constructor (org.apache.flink.api.common.serialization.DeserializationSchema, org.apache.flink.api.common.serialization.DeserializationSchema, org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter, boolean)> calls method in (PulsarTableDeserializationSchema.java:75) -Constructor (org.apache.flink.api.common.serialization.DeserializationSchema, org.apache.flink.api.common.serialization.DeserializationSchema, org.apache.flink.api.common.typeinfo.TypeInformation, org.apache.flink.connector.pulsar.table.source.PulsarRowDataConverter, boolean)> calls method in (PulsarTableDeserializationSchema.java:70) -Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, [I, org.apache.flink.table.connector.format.DecodingFormat, [I, boolean)> calls method in (PulsarTableDeserializationSchemaFactory.java:98) -Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, [I, org.apache.flink.table.connector.format.DecodingFormat, [I, boolean)> calls method in (PulsarTableDeserializationSchemaFactory.java:100) -Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, [I, org.apache.flink.table.connector.format.DecodingFormat, [I, boolean)> calls method in (PulsarTableDeserializationSchemaFactory.java:102) -Constructor (org.apache.flink.table.types.DataType, org.apache.flink.table.connector.format.DecodingFormat, [I, org.apache.flink.table.connector.format.DecodingFormat, [I, boolean)> calls method in (PulsarTableDeserializationSchemaFactory.java:96) -Constructor (org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor, org.apache.pulsar.client.api.SubscriptionType)> calls method in (PulsarTableSource.java:101) -Constructor (org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor, org.apache.pulsar.client.api.SubscriptionType)> calls method in (PulsarTableSource.java:102) -Constructor (org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor, org.apache.pulsar.client.api.SubscriptionType)> calls method in (PulsarTableSource.java:103) -Constructor (org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor, org.apache.pulsar.client.api.SubscriptionType)> calls method in (PulsarTableSource.java:104) -Constructor (org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor, org.apache.pulsar.client.api.SubscriptionType)> calls method in (PulsarTableSource.java:96) -Constructor (org.apache.flink.connector.pulsar.table.source.PulsarTableDeserializationSchemaFactory, org.apache.flink.table.connector.format.DecodingFormat, org.apache.flink.table.connector.ChangelogMode, java.util.List, java.util.Properties, org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor, org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor, org.apache.pulsar.client.api.SubscriptionType)> calls method in (PulsarTableSource.java:97) Field is annotated with in (PulsarSourceReader.java:0) Field has generic type > with type argument depending on in (PulsarTypeInformationWrapper.java:0) Field has type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (PulsarTableSerializationSchema.java:0) Field has type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (PulsarTableSerializationSchema.java:0) -Method calls method in (PulsarConfigBuilder.java:131) -Method calls method in (PulsarConfigBuilder.java:132) -Method calls method in (PulsarConfigBuilder.java:72) -Method calls method in (PulsarConfigBuilder.java:67) -Method calls method in (PulsarConfigBuilder.java:68) -Method calls method in (PulsarConfigBuilder.java:99) -Method calls method in (PulsarConfigValidator.java:88) Method has parameter of type <[Lorg.apache.flink.configuration.ConfigOption;> in (PulsarConfigValidator.java:0) -Method calls method in (PulsarConfigValidator.java:62) -Method calls method in (PulsarConfigValidator.java:69) -Method calls method in (DefaultPulsarCrypto.java:103) -Method calls method in (DefaultPulsarCrypto.java:102) -Method calls method in (DefaultPulsarCrypto.java:97) -Method calls method in (PulsarSchema.java:232) -Method calls method in (PulsarSchemaTypeSerializer.java:192) -Method calls method in (PulsarSchemaTypeSerializer.java:188) -Method calls method in (PulsarSchemaTypeSerializer.java:177) -Method calls method in (PulsarSchemaTypeSerializer.java:126) -Method calls method in (PulsarSchemaTypeSerializer.java:77) -Method calls method in (PulsarSchemaTypeSerializer.java:110) -Method calls method in (PulsarSchemaUtils.java:166) -Method calls method in (PulsarSchemaUtils.java:178) -Method calls method in (PulsarSchemaUtils.java:200) -Method calls method in (PulsarSchemaUtils.java:147) Method calls method in (AvroSchemaFactory.java:57) Method calls method in (AvroSchemaFactory.java:57) Method calls method in (JSONSchemaFactory.java:50) Method calls method in (JSONSchemaFactory.java:50) -Method calls method in (PulsarSerdeUtils.java:74) -Method calls method in (PulsarSerdeUtils.java:89) -Method has generic parameter type > with type argument depending on in (PulsarSerdeUtils.java:0) -Method has parameter of type in (PulsarSerdeUtils.java:0) -Method calls method in (PulsarSerdeUtils.java:142) -Method calls method in (PulsarSerdeUtils.java:143) -Method has generic parameter type > with type argument depending on in (PulsarSerdeUtils.java:0) -Method has generic parameter type > with type argument depending on in (PulsarSerdeUtils.java:0) -Method has parameter of type in (PulsarSerdeUtils.java:0) -Method calls method in (PulsarSerdeUtils.java:65) -Method calls method in (PulsarSerdeUtils.java:63) -Method calls method in (PulsarSerdeUtils.java:115) -Method has generic parameter type > with type argument depending on in (PulsarSerdeUtils.java:0) -Method has parameter of type in (PulsarSerdeUtils.java:0) -Method calls method in (PulsarTransactionUtils.java:68) -Method calls method in (PulsarSinkBuilder.java:469) -Method calls method in (PulsarSinkBuilder.java:471) -Method calls method in (PulsarSinkBuilder.java:472) -Method calls method in (PulsarSinkBuilder.java:428) -Method calls method in (PulsarSinkBuilder.java:431) -Method calls method in (PulsarSinkBuilder.java:468) -Method calls method in (PulsarSinkBuilder.java:471) -Method calls method in (PulsarSinkBuilder.java:472) -Method calls method in (PulsarSinkBuilder.java:302) -Method calls method in (PulsarSinkBuilder.java:188) -Method calls method in (PulsarSinkBuilder.java:315) -Method calls method in (PulsarSinkBuilder.java:218) -Method calls method in (PulsarSinkBuilder.java:200) -Method calls method in (PulsarSinkBuilder.java:203) -Method calls method in (PulsarSinkBuilder.java:173) -Method calls method in (PulsarWriter.java:278) -Method calls method in (PulsarMessage.java:101) -Method calls method in (PulsarMessage.java:90) -Method calls method in (PulsarMessage.java:91) -Method calls method in (PulsarMessageBuilder.java:92) -Method calls method in (PulsarMessageBuilder.java:72) -Method calls method in (PulsarMessageBuilder.java:63) -Method calls method in (PulsarMessageBuilder.java:105) -Method calls method in (PulsarMessageBuilder.java:99) -Method calls method in (PulsarMessageBuilder.java:117) -Method calls method in (KeyHashTopicRouter.java:55) -Method calls method in (RoundRobinTopicRouter.java:56) Method is annotated with in (MetadataListener.java:0) -Method calls method in (PulsarSourceBuilder.java:595) -Method calls method in (PulsarSourceBuilder.java:597) -Method calls method in (PulsarSourceBuilder.java:598) -Method calls method in (PulsarSourceBuilder.java:599) -Method calls method in (PulsarSourceBuilder.java:600) -Method calls method in (PulsarSourceBuilder.java:546) -Method calls method in (PulsarSourceBuilder.java:566) -Method calls method in (PulsarSourceBuilder.java:569) -Method calls method in (PulsarSourceBuilder.java:594) -Method calls method in (PulsarSourceBuilder.java:597) -Method calls method in (PulsarSourceBuilder.java:598) -Method calls method in (PulsarSourceBuilder.java:599) -Method calls method in (PulsarSourceBuilder.java:600) -Method calls method in (PulsarSourceBuilder.java:473) -Method calls method in (PulsarSourceBuilder.java:490) -Method calls method in (PulsarSourceBuilder.java:366) -Method calls method in (PulsarSourceBuilder.java:458) -Method calls method in (PulsarSourceBuilder.java:307) -Method calls method in (PulsarSourceBuilder.java:319) -Method calls method in (PulsarSourceBuilder.java:345) Method is annotated with in (SplitAssignerImpl.java:0) -Method calls method in (TopicPatternSubscriber.java:72) -Method calls method in (TopicNameUtils.java:59) -Method calls method in (FixedKeysRangeGenerator.java:106) -Method calls method in (TopicRangeUtils.java:115) -Method calls method in (TopicRangeUtils.java:130) -Method calls method in (TopicRangeUtils.java:101) Method calls method in (PulsarPartitionSplitReader.java:128) Method calls method in (PulsarPartitionSplitReader.java:132) Method calls method in (PulsarPartitionSplitReader.java:137) Method is annotated with in (PulsarPartitionSplitReader.java:0) -Method calls method in (PulsarPartitionSplitReader.java:184) -Method calls method in (PulsarPartitionSplitReader.java:244) Method calls method in (PulsarTypeInformationWrapper.java:58) Method calls method in (PulsarTableOptionUtils.java:146) -Method calls method in (PulsarTableOptionUtils.java:137) Method calls method in (PulsarTableOptionUtils.java:171) -Method calls method in (PulsarTableOptionUtils.java:169) -Method calls method in (PulsarTableOptionUtils.java:306) -Method calls method in (PulsarTableOptionUtils.java:271) Method has parameter of type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (PulsarTableSerializationSchema.java:0) Method calls method in (PulsarTableSerializationSchemaFactory.java:110) Method has return type <[Lorg.apache.flink.table.data.RowData$FieldGetter;> in (PulsarTableSerializationSchemaFactory.java:0) diff --git a/flink-connector-pulsar/archunit-violations/c10e9875-cfe1-4167-b920-9d84d8d91f18 b/flink-connector-pulsar/archunit-violations/c10e9875-cfe1-4167-b920-9d84d8d91f18 index 1638c6a4..77490911 100644 --- a/flink-connector-pulsar/archunit-violations/c10e9875-cfe1-4167-b920-9d84d8d91f18 +++ b/flink-connector-pulsar/archunit-violations/c10e9875-cfe1-4167-b920-9d84d8d91f18 @@ -1,17 +1,5 @@ org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\ * reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ -* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ - or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule -org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: only one of the following predicates match:\ -* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension\ -* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ -* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ - or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule -org.apache.flink.connector.pulsar.sink.PulsarSinkITCase does not satisfy: only one of the following predicates match:\ -* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\ * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ @@ -21,4 +9,4 @@ org.apache.flink.connector.pulsar.source.PulsarSourceITCase does not satisfy: on * reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\ * reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\ * reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\ - or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule + or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule \ No newline at end of file diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java index 559da3ae..6b208eeb 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSink.java @@ -24,6 +24,8 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; +import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallback; +import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallbackFactory; import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittableSerializer; import org.apache.flink.connector.pulsar.sink.committer.PulsarCommitter; @@ -90,6 +92,7 @@ public class PulsarSink implements TwoPhaseCommittingSink topicRouter; private final MessageDelayer messageDelayer; private final PulsarCrypto pulsarCrypto; + private final SinkUserCallbackFactory sinkUserCallbackFactory; PulsarSink( SinkConfiguration sinkConfiguration, @@ -98,7 +101,8 @@ public class PulsarSink implements TwoPhaseCommittingSink topicRouter, MessageDelayer messageDelayer, - PulsarCrypto pulsarCrypto) { + PulsarCrypto pulsarCrypto, + @Nullable SinkUserCallbackFactory sinkUserCallbackFactory) { this.sinkConfiguration = checkNotNull(sinkConfiguration); this.serializationSchema = checkNotNull(serializationSchema); this.metadataListener = checkNotNull(metadataListener); @@ -115,6 +119,7 @@ public class PulsarSink implements TwoPhaseCommittingSink PulsarSinkBuilder builder() { @Override public PrecommittingSinkWriter createWriter(InitContext initContext) throws PulsarClientException { + SinkUserCallback userCallback = null; + if (sinkUserCallbackFactory != null) { + userCallback = sinkUserCallbackFactory.create(); + } + return new PulsarWriter<>( sinkConfiguration, serializationSchema, @@ -138,7 +148,8 @@ public PrecommittingSinkWriter createWriter(InitContext i topicRouter, messageDelayer, pulsarCrypto, - initContext); + initContext, + userCallback); } @Internal diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java index ee6e7db7..c8f3fb0f 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkBuilder.java @@ -26,7 +26,10 @@ import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder; import org.apache.flink.connector.pulsar.common.config.PulsarOptions; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; +import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallback; +import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallbackFactory; import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; +import org.apache.flink.connector.pulsar.sink.writer.PulsarWriter; import org.apache.flink.connector.pulsar.sink.writer.delayer.MessageDelayer; import org.apache.flink.connector.pulsar.sink.writer.router.TopicRouter; import org.apache.flink.connector.pulsar.sink.writer.router.TopicRoutingMode; @@ -112,6 +115,7 @@ public class PulsarSinkBuilder { private TopicRouter topicRouter; private MessageDelayer messageDelayer; private PulsarCrypto pulsarCrypto; + private SinkUserCallbackFactory userCallbackFactory; // private builder constructor. PulsarSinkBuilder() { @@ -387,6 +391,19 @@ public PulsarSinkBuilder setProperties(Properties properties) { return this; } + /** + * Set a factory for the {@link SinkUserCallback}. A callback is instantiated in each {@link PulsarWriter} + * and disposed of when the app shuts down. + * + * @param userCallbackFactory the factory. + * @return this PuslarSourceBuilder + */ + public PulsarSinkBuilder setUserCallbackFactory( + SinkUserCallbackFactory userCallbackFactory) { + this.userCallbackFactory = userCallbackFactory; + return this; + } + /** * Build the {@link PulsarSink}. * @@ -483,7 +500,8 @@ public PulsarSink build() { topicRoutingMode, topicRouter, messageDelayer, - pulsarCrypto); + pulsarCrypto, + userCallbackFactory); } // ------------- private helpers -------------- diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/callback/SinkUserCallback.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/callback/SinkUserCallback.java new file mode 100644 index 00000000..80b4c12f --- /dev/null +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/callback/SinkUserCallback.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.callback; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.connector.pulsar.sink.writer.message.PulsarMessage; + +import org.apache.pulsar.client.api.MessageId; + +/** + * An optional callback interface that users can plug into PulsarSink. + * + * @param The input type of the sink + */ +@PublicEvolving +public interface SinkUserCallback extends AutoCloseable { + /** + * This method is called before the message is sent to the topic. The user can modify the + * message. By default, the same message will be returned. + * + * @param element the element received from the previous operator. + * @param message the message wrapper with the element already serialized. + * @param topic the pulsar topic or partition that the message will be routed to. + */ + void beforeSend(IN element, PulsarMessage message, String topic); + + /** + * This method is called after producer has tried to write the message to the topic. + * + * @param element the element received from the previous operator. + * @param message the message that was sent to the topic. + * @param messageId the topic MessageId, if the send operation was successful. + */ + void onSendSucceeded(IN element, PulsarMessage message, String topic, MessageId messageId); + + /** + * This method is called after producer has tried to write the message to the topic. + * + * @param element the element received from the previous operator. + * @param message the message that was sent to the topic. + * @param topic the topic or partition that the message was sent to. + * @param ex the exception. + */ + void onSendFailed(IN element, PulsarMessage message, String topic, Throwable ex); +} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/callback/SinkUserCallbackFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/callback/SinkUserCallbackFactory.java new file mode 100644 index 00000000..4f2e763e --- /dev/null +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/callback/SinkUserCallbackFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.sink.callback; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** + * A serializable factory for SinkUserCallback. + * + * @param the input type of the Sink + */ +@PublicEvolving +public interface SinkUserCallbackFactory extends Serializable { + SinkUserCallback create(); +} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java index 5c93ef3b..0282db78 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java @@ -26,6 +26,7 @@ import org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink.PrecommittingSinkWriter; import org.apache.flink.connector.base.DeliveryGuarantee; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; +import org.apache.flink.connector.pulsar.sink.callback.SinkUserCallback; import org.apache.flink.connector.pulsar.sink.committer.PulsarCommittable; import org.apache.flink.connector.pulsar.sink.config.SinkConfiguration; import org.apache.flink.connector.pulsar.sink.writer.context.PulsarSinkContext; @@ -43,12 +44,15 @@ import org.apache.pulsar.client.api.MessageId; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.TypedMessageBuilder; import org.apache.pulsar.client.impl.TypedMessageBuilderImpl; import org.apache.pulsar.shade.com.google.common.base.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collection; import java.util.List; @@ -80,6 +84,8 @@ public class PulsarWriter implements PrecommittingSinkWriter userCallback; + /** * Constructor creating a Pulsar writer. * @@ -91,8 +97,10 @@ public class PulsarWriter implements PrecommittingSinkWriter topicRouter, MessageDelayer messageDelayer, PulsarCrypto pulsarCrypto, - InitContext initContext) + InitContext initContext, + @Nullable SinkUserCallback userCallback) throws PulsarClientException { checkNotNull(sinkConfiguration); this.serializationSchema = checkNotNull(serializationSchema); @@ -131,6 +140,8 @@ public PulsarWriter( throw new FlinkRuntimeException("Cannot initialize schema.", e); } + this.userCallback = userCallback; + // Create this producer register after opening serialization schema! SinkWriterMetricGroup metricGroup = initContext.metricGroup(); this.producerRegister = new ProducerRegister(sinkConfiguration, pulsarCrypto, metricGroup); @@ -157,10 +168,17 @@ public void write(IN element, Context context) throws IOException, InterruptedEx builder.deliverAt(deliverAt); } + // invoke user callback before send + invokeUserCallbackBeforeSend(element, message, topic); + // Perform message sending. if (deliveryGuarantee == DeliveryGuarantee.NONE) { // We would just ignore the sending exception. This may cause data loss. - builder.sendAsync(); + CompletableFuture future = builder.sendAsync(); + future.whenComplete( + (id, ex) -> { + invokeUserCallbackAfterSend(element, message, topic, id, ex); + }); } else { // Increase the pending message count. pendingMessages.incrementAndGet(); @@ -175,10 +193,44 @@ public void write(IN element, Context context) throws IOException, InterruptedEx } else { LOG.debug("Sent message to Pulsar {} with message id {}", topic, id); } + invokeUserCallbackAfterSend(element, message, topic, id, ex); }); } } + private void callSafely(Runnable r) { + try { + r.run(); + } catch (Throwable t) { + LOG.warn("Exception from user callback", t); + } + } + + private void invokeUserCallbackBeforeSend(IN element, PulsarMessage message, String topic) { + if (userCallback == null) { + return; + } + + callSafely(() -> userCallback.beforeSend(element, message, topic)); + } + + private void invokeUserCallbackAfterSend( + IN element, + PulsarMessage message, + String topic, + MessageId messageId, + Throwable exception) { + if (userCallback == null) { + return; + } + + if (exception == null) { + callSafely(() -> userCallback.onSendSucceeded(element, message, topic, messageId)); + } else { + callSafely(() -> userCallback.onSendFailed(element, message, topic, exception)); + } + } + private void throwSendingException(String topic, Throwable ex) { throw new FlinkRuntimeException("Failed to send data to Pulsar: " + topic, ex); } @@ -275,6 +327,6 @@ public Collection prepareCommit() { @Override public void close() throws Exception { // Close all the resources and throw the exception at last. - closeAll(metadataListener, producerRegister); + closeAll(metadataListener, producerRegister, userCallback); } } diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java index 447867cc..c324477b 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSource.java @@ -29,6 +29,7 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; +import org.apache.flink.connector.pulsar.source.callback.SourceUserCallbackFactory; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumState; import org.apache.flink.connector.pulsar.source.enumerator.PulsarSourceEnumStateSerializer; @@ -45,6 +46,8 @@ import org.apache.pulsar.client.api.PulsarClientException; +import javax.annotation.Nullable; + /** * The Source implementation of Pulsar. Please use a {@link PulsarSourceBuilder} to construct a * {@link PulsarSource}. The following example shows how to create a PulsarSource emitting records @@ -92,6 +95,8 @@ public final class PulsarSource private final PulsarCrypto pulsarCrypto; + private final SourceUserCallbackFactory userCallbackFactory; + /** * The constructor for PulsarSource, it's package protected for forcing using {@link * PulsarSourceBuilder}. @@ -104,7 +109,8 @@ public final class PulsarSource StopCursor stopCursor, Boundedness boundedness, PulsarDeserializationSchema deserializationSchema, - PulsarCrypto pulsarCrypto) { + PulsarCrypto pulsarCrypto, + @Nullable SourceUserCallbackFactory userCallbackFactory) { this.sourceConfiguration = sourceConfiguration; this.subscriber = subscriber; this.rangeGenerator = rangeGenerator; @@ -113,6 +119,7 @@ public final class PulsarSource this.boundedness = boundedness; this.deserializationSchema = deserializationSchema; this.pulsarCrypto = pulsarCrypto; + this.userCallbackFactory = userCallbackFactory; } /** @@ -134,7 +141,7 @@ public Boundedness getBoundedness() { public SourceReader createReader(SourceReaderContext readerContext) throws Exception { return PulsarSourceReader.create( - sourceConfiguration, deserializationSchema, pulsarCrypto, readerContext); + sourceConfiguration, deserializationSchema, pulsarCrypto, userCallbackFactory, readerContext); } @Internal diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java index 687e9b8f..62f154e6 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceBuilder.java @@ -28,6 +28,8 @@ import org.apache.flink.connector.pulsar.common.config.PulsarConfigBuilder; import org.apache.flink.connector.pulsar.common.config.PulsarOptions; import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; +import org.apache.flink.connector.pulsar.source.callback.SourceUserCallback; +import org.apache.flink.connector.pulsar.source.callback.SourceUserCallbackFactory; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StartCursor; import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor; @@ -135,6 +137,7 @@ public final class PulsarSourceBuilder { private Boundedness boundedness; private PulsarDeserializationSchema deserializationSchema; private PulsarCrypto pulsarCrypto; + private SourceUserCallbackFactory userCallbackFactory; // private builder constructor. PulsarSourceBuilder() { @@ -535,6 +538,18 @@ public PulsarSourceBuilder setProperties(Properties properties) { return this; } + /** + * Set a factory for the {@link SourceUserCallback}. + * + * @param callbackFactory the factory. + * @return this PulsarSourceBuilder. + */ + public PulsarSourceBuilder setUserCallbackFactory( + SourceUserCallbackFactory callbackFactory) { + this.userCallbackFactory = callbackFactory; + return this; + } + /** * Build the {@link PulsarSource}. * @@ -611,7 +626,8 @@ public PulsarSource build() { stopCursor, boundedness, deserializationSchema, - pulsarCrypto); + pulsarCrypto, + userCallbackFactory); } // ------------- private helpers -------------- diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/callback/SourceUserCallback.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/callback/SourceUserCallback.java new file mode 100644 index 00000000..73698348 --- /dev/null +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/callback/SourceUserCallback.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.callback; + +import org.apache.flink.annotation.PublicEvolving; + +import org.apache.pulsar.client.api.Message; + +/** + * An optional interface that users can plug into PulsarSource. + * + * @param The output type of Source + */ +@PublicEvolving +public interface SourceUserCallback extends AutoCloseable { + /** + * This method is called after the message is handed off to the Collector, with the raw message + * from pulsar, as well as the deserialized value. + * + *

Modifications to the message will not carry forward. + * + * @param rawMessage the raw message from the pulsar topic + * @param deserializedElement the deserialized message body + */ + void process(Message rawMessage, T deserializedElement); +} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/callback/SourceUserCallbackFactory.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/callback/SourceUserCallbackFactory.java new file mode 100644 index 00000000..2896eda9 --- /dev/null +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/callback/SourceUserCallbackFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connector.pulsar.source.callback; + +import org.apache.flink.annotation.PublicEvolving; + +import java.io.Serializable; + +/** + * A serializable factory for SourceUserCallback. + * + * @param the output type of the source + */ +@PublicEvolving +public interface SourceUserCallbackFactory extends Serializable { + SourceUserCallback create(); +} diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java index 2b472975..835c1478 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java @@ -32,6 +32,7 @@ import java.time.Duration; import java.util.Objects; +import java.util.function.Function; import static org.apache.flink.connector.base.source.reader.SourceReaderOptions.ELEMENT_QUEUE_CAPACITY; import static org.apache.flink.connector.pulsar.common.config.PulsarOptions.PULSAR_STATS_INTERVAL_SECONDS; @@ -77,7 +78,7 @@ public SourceConfiguration(Configuration configuration) { this.enableAutoAcknowledgeMessage = get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE); this.autoCommitCursorInterval = get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL); this.fetchOneMessageTime = getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).orElse(0); - this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, Duration::ofMillis); + this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, (Function) Duration::ofMillis); this.maxFetchRecords = get(PULSAR_MAX_FETCH_RECORDS); this.verifyInitialOffsets = get(PULSAR_VERIFY_INITIAL_OFFSETS); this.subscriptionName = get(PULSAR_SUBSCRIPTION_NAME); diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarRecordEmitter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarRecordEmitter.java index b7ff3ff3..c4f3227b 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarRecordEmitter.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarRecordEmitter.java @@ -20,6 +20,7 @@ import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.connector.base.source.reader.RecordEmitter; +import org.apache.flink.connector.pulsar.source.callback.SourceUserCallback; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplitState; import org.apache.flink.util.Collector; @@ -34,10 +35,12 @@ public class PulsarRecordEmitter implements RecordEmitter, T, PulsarPartitionSplitState> { private final PulsarDeserializationSchema deserializationSchema; + private final SourceUserCallback userCallback; private final SourceOutputWrapper sourceOutputWrapper; - public PulsarRecordEmitter(PulsarDeserializationSchema deserializationSchema) { + public PulsarRecordEmitter(PulsarDeserializationSchema deserializationSchema, SourceUserCallback userCallback) { this.deserializationSchema = deserializationSchema; + this.userCallback = userCallback; this.sourceOutputWrapper = new SourceOutputWrapper<>(); } @@ -49,8 +52,28 @@ public void emitRecord( sourceOutputWrapper.setSourceOutput(output); sourceOutputWrapper.setTimestamp(element); + /** + * We need to wrap the source output collector to pass the message to the user callback. + * As the deserialization logic directly passes to the collector instead of returning a value. + * Since the collector only in takes the deserialized value we cannot re-use the collector object. + * Instead, we create an anonymous collector to capture the element and pass it to the callback. + */ + Collector sourceCallbackCollector = new Collector() { + + @Override + public void collect(T record) { + userCallback.process(element, record); + sourceOutputWrapper.collect(record); + } + + @Override + public void close() { + sourceOutputWrapper.close(); + } + }; + // Deserialize the message and since it to output. - deserializationSchema.deserialize(element, sourceOutputWrapper); + deserializationSchema.deserialize(element, sourceCallbackCollector); splitState.setLatestConsumedId(element.getMessageId()); // Release the messages if we use message pool in Pulsar. diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java index 014d3038..9028f2c9 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReader.java @@ -29,6 +29,8 @@ import org.apache.flink.connector.pulsar.common.crypto.PulsarCrypto; import org.apache.flink.connector.pulsar.common.schema.BytesSchema; import org.apache.flink.connector.pulsar.common.schema.PulsarSchema; +import org.apache.flink.connector.pulsar.source.callback.SourceUserCallback; +import org.apache.flink.connector.pulsar.source.callback.SourceUserCallbackFactory; import org.apache.flink.connector.pulsar.source.config.SourceConfiguration; import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema; @@ -46,11 +48,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.SortedMap; import java.util.TreeMap; import java.util.concurrent.ConcurrentHashMap; @@ -77,6 +82,7 @@ public class PulsarSourceReader private final SourceConfiguration sourceConfiguration; private final PulsarClient pulsarClient; + SourceUserCallback userCallback; @VisibleForTesting final SortedMap> cursorsToCommit; private final ConcurrentMap cursorsOfFinishedSplits; private final AtomicReference cursorCommitThrowable; @@ -89,16 +95,18 @@ private PulsarSourceReader( PulsarDeserializationSchema deserializationSchema, SourceConfiguration sourceConfiguration, PulsarClient pulsarClient, + SourceUserCallback userCallback, SourceReaderContext context) { super( elementsQueue, fetcherManager, - new PulsarRecordEmitter<>(deserializationSchema), + new PulsarRecordEmitter<>(deserializationSchema, userCallback), sourceConfiguration, context); this.sourceConfiguration = sourceConfiguration; this.pulsarClient = pulsarClient; + this.userCallback = userCallback; this.cursorsToCommit = Collections.synchronizedSortedMap(new TreeMap<>()); this.cursorsOfFinishedSplits = new ConcurrentHashMap<>(); @@ -213,6 +221,9 @@ public void close() throws Exception { // Close the all the consumers. super.close(); + if (userCallback != null) { + userCallback.close(); + } // Close shared pulsar resources. pulsarClient.shutdown(); @@ -249,9 +260,15 @@ public static PulsarSourceReader create( SourceConfiguration sourceConfiguration, PulsarDeserializationSchema deserializationSchema, PulsarCrypto pulsarCrypto, + @Nullable SourceUserCallbackFactory userCallbackFactory, SourceReaderContext readerContext) throws Exception { + SourceUserCallback userCallback = + Optional.ofNullable(userCallbackFactory) + .map(SourceUserCallbackFactory::create) + .orElse(null); + // Create a message queue with the predefined source option. int queueCapacity = sourceConfiguration.getMessageQueueCapacity(); FutureCompletingBlockingQueue>> elementsQueue = @@ -295,6 +312,7 @@ public static PulsarSourceReader create( deserializationSchema, sourceConfiguration, pulsarClient, + userCallback, readerContext); } } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java index 0c653362..a6ffd767 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java @@ -41,6 +41,7 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.Collections; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -65,7 +66,7 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr public MiniClusterTestEnvironment() { Configuration conf = new Configuration(); - conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS); + conf.set(METRIC_FETCHER_UPDATE_INTERVAL, Duration.ofMillis(METRIC_FETCHER_UPDATE_INTERVAL_MS)); TaskExecutorResourceUtils.adjustForLocalExecution(conf); this.miniCluster = new MiniClusterWithClientResource( diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java index 01914ccf..c5cdd142 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.pulsar.sink.writer; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -41,12 +43,8 @@ import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition; import org.apache.flink.connector.pulsar.testutils.PulsarTestSuiteBase; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.metrics.groups.OperatorIOMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; -import org.apache.flink.metrics.testutils.MetricListener; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; -import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; -import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.util.UserCodeClassLoader; @@ -63,6 +61,7 @@ import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.flink.connector.base.DeliveryGuarantee.EXACTLY_ONCE; import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_WRITE_SCHEMA_EVOLUTION; +import static org.apache.flink.metrics.groups.UnregisteredMetricsGroup.createSinkWriterMetricGroup; import static org.apache.pulsar.client.api.Schema.STRING; import static org.assertj.core.api.Assertions.assertThat; @@ -105,7 +104,8 @@ private void writeMessageAndVerify( router, delayer, PulsarCrypto.disabled(), - initContext); + initContext, + null); writer.flush(false); writer.prepareCommit(); @@ -162,18 +162,11 @@ public TopicPartition route( private static class MockInitContext implements InitContext { - private final MetricListener metricListener; - private final OperatorIOMetricGroup ioMetricGroup; private final SinkWriterMetricGroup metricGroup; private final ProcessingTimeService timeService; private MockInitContext() { - this.metricListener = new MetricListener(); - this.ioMetricGroup = - UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup() - .getIOMetricGroup(); - MetricGroup metricGroup = metricListener.getMetricGroup(); - this.metricGroup = InternalSinkWriterMetricGroup.mock(metricGroup, ioMetricGroup); + this.metricGroup = createSinkWriterMetricGroup(); this.timeService = new TestProcessingTimeService(); } @@ -226,6 +219,16 @@ public JobID getJobId() { return null; } + @Override + public JobInfo getJobInfo() { + return null; + } + + @Override + public TaskInfo getTaskInfo() { + return null; + } + @Override public SinkWriterMetricGroup metricGroup() { return metricGroup; diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java index 54ed4b25..85c114c6 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java @@ -247,7 +247,7 @@ context, operator().client()), SourceConfiguration sourceConfiguration = new SourceConfiguration(configuration); return PulsarSourceReader.create( - sourceConfiguration, deserializationSchema, PulsarCrypto.disabled(), context); + sourceConfiguration, deserializationSchema, PulsarCrypto.disabled(), null, context); } private void setupSourceReader( diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java index de0a7298..0a983385 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java @@ -102,24 +102,26 @@ void createFromPulsarSchema() throws Exception { @Test void createFromFlinkTypeInformation() throws Exception { - PulsarDeserializationSchema schema = - new PulsarTypeInformationWrapper<>(Types.STRING, null); - schema.open(new PulsarTestingDeserializationContext(), sourceConfig); - assertThatCode(() -> InstantiationUtil.clone(schema)).doesNotThrowAnyException(); - - String content = "test-content-" + randomAlphanumeric(10); - Message message = - getMessage( - content, - s -> { - DataOutputSerializer serializer = new DataOutputSerializer(10); - StringValue.writeString(s, serializer); - return serializer.getSharedBuffer(); - }); - SingleMessageCollector collector = new SingleMessageCollector<>(); - schema.deserialize(message, collector); - - assertThat(collector.result).isNotNull().isEqualTo(content); + try (StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment()) { + PulsarDeserializationSchema schema = + new PulsarTypeInformationWrapper<>(Types.STRING, executionEnvironment.getConfig()); + schema.open(new PulsarTestingDeserializationContext(), sourceConfig); + assertThatCode(() -> InstantiationUtil.clone(schema)).doesNotThrowAnyException(); + + String content = "test-content-" + randomAlphanumeric(10); + Message message = + getMessage( + content, + s -> { + DataOutputSerializer serializer = new DataOutputSerializer(10); + StringValue.writeString(s, serializer); + return serializer.getSharedBuffer(); + }); + SingleMessageCollector collector = new SingleMessageCollector<>(); + schema.deserialize(message, collector); + + assertThat(collector.result).isNotNull().isEqualTo(content); + } } @Test diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java index c448a087..e392a261 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatFactory; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.config.AggregatePhaseStrategy; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -53,7 +54,7 @@ void testPulsarDebeziumChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); + tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE); // ---------- Write the Debezium json into Pulsar ------------------- List lines = readLines("debezium-data-schema-exclude.txt"); @@ -182,7 +183,7 @@ public void testPulsarCanalChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); + tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE); // ---------- Write the Canal json into Pulsar ------------------- List lines = readLines("canal-data.txt"); @@ -323,7 +324,7 @@ public void testPulsarMaxwellChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); + tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, AggregatePhaseStrategy.TWO_PHASE); // ---------- Write the Maxwell json into Pulsar ------------------- List lines = readLines("maxwell-data.txt"); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java index fe29b4b8..8c89fa3e 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java @@ -97,7 +97,7 @@ public static void waitingExpectedResults( Collections.sort(expected); CommonTestUtils.waitUtil( () -> { - List actual = TestValuesTableFactory.getResults(sinkName); + List actual = TestValuesTableFactory.getResultsAsStrings(sinkName); Collections.sort(actual); return expected.equals(actual); }, diff --git a/pom.xml b/pom.xml index 210b074a..56349679 100644 --- a/pom.xml +++ b/pom.xml @@ -51,8 +51,8 @@ under the License. - 1.18.0 - 1.18.0 + 1.20.0 + 1.20.0 3.0.2 2.12 1.69