diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java index 3346fed8..72b20261 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/PulsarSourceOptions.java @@ -103,26 +103,26 @@ private PulsarSourceOptions() { "The source would use pulsar client's internal mechanism and commit cursor in a given interval.") .build()); - public static final ConfigOption PULSAR_AUTO_COMMIT_CURSOR_INTERVAL = + public static final ConfigOption PULSAR_AUTO_COMMIT_CURSOR_INTERVAL = ConfigOptions.key(SOURCE_CONFIG_PREFIX + "autoCommitCursorInterval") - .longType() - .defaultValue(Duration.ofSeconds(5).toMillis()) + .durationType() + .defaultValue(Duration.ofSeconds(5)) .withDescription( Description.builder() .text( "This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription.") .text( - " We would automatically commit the cursor using the given period (in ms).") + " We would automatically commit the cursor using the given duration.") .build()); - public static final ConfigOption PULSAR_FETCH_ONE_MESSAGE_TIME = + public static final ConfigOption PULSAR_FETCH_ONE_MESSAGE_TIME = ConfigOptions.key(SOURCE_CONFIG_PREFIX + "fetchOneMessageTime") - .intType() + .durationType() .noDefaultValue() .withDescription( Description.builder() .text( - "The time (in ms) for fetching one message from Pulsar. If time exceed and no message returned from Pulsar.") + "The time to wait for fetching one message from Pulsar. If time exceeded and no message returned from Pulsar.") .text( " We would consider there is no record at the current topic partition and stop fetching until next switch.") .linebreak() @@ -134,13 +134,13 @@ private PulsarSourceOptions() { " Add this option in source builder avoiding waiting too long.") .build()); - public static final ConfigOption PULSAR_MAX_FETCH_TIME = + public static final ConfigOption PULSAR_MAX_FETCH_TIME = ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime") - .longType() - .defaultValue(Duration.ofSeconds(10).toMillis()) + .durationType() + .defaultValue(Duration.ofSeconds(10)) .withDescription( Description.builder() - .text("The maximum time (in ms) to wait when fetching records.") + .text("The maximum time to wait when fetching records.") .text(" A longer time increases throughput but also latency.") .text( " A fetch batch might be finished earlier because of %s.", diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java index b10a2bb8..fbeeb76b 100644 --- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java +++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/config/SourceConfiguration.java @@ -60,7 +60,7 @@ public class SourceConfiguration extends PulsarConfiguration { private final long partitionDiscoveryIntervalMs; private final boolean enableAutoAcknowledgeMessage; private final long autoCommitCursorInterval; - private final int fetchOneMessageTime; + private final long fetchOneMessageTime; private final Duration maxFetchTime; private final int maxFetchRecords; private final CursorVerification verifyInitialOffsets; @@ -78,9 +78,10 @@ public SourceConfiguration(Configuration configuration) { this.messageQueueCapacity = get(ELEMENT_QUEUE_CAPACITY); this.partitionDiscoveryIntervalMs = get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS); this.enableAutoAcknowledgeMessage = get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE); - this.autoCommitCursorInterval = get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL); - this.fetchOneMessageTime = getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).orElse(0); - this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, Duration::ofMillis); + this.autoCommitCursorInterval = get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL).toMillis(); + this.fetchOneMessageTime = + getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).map(Duration::toMillis).orElse(0L); + this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME); this.maxFetchRecords = get(PULSAR_MAX_FETCH_RECORDS); this.verifyInitialOffsets = get(PULSAR_VERIFY_INITIAL_OFFSETS); this.subscriptionName = get(PULSAR_SUBSCRIPTION_NAME); @@ -136,7 +137,7 @@ public long getAutoCommitCursorInterval() { * messages in {@link RecordsWithSplitIds} when meet this timeout and no message consumed. */ public int getFetchOneMessageTime() { - return fetchOneMessageTime; + return (int) fetchOneMessageTime; } /** diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java index 0c653362..83a7f36e 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/common/MiniClusterTestEnvironment.java @@ -41,6 +41,7 @@ import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; +import java.time.Duration; import java.util.Collections; import java.util.Optional; import java.util.concurrent.ExecutionException; @@ -65,7 +66,9 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr public MiniClusterTestEnvironment() { Configuration conf = new Configuration(); - conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS); + conf.set( + METRIC_FETCHER_UPDATE_INTERVAL, + Duration.ofMillis(METRIC_FETCHER_UPDATE_INTERVAL_MS)); TaskExecutorResourceUtils.adjustForLocalExecution(conf); this.miniCluster = new MiniClusterWithClientResource( diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java index 01914ccf..d24f2305 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriterTest.java @@ -19,6 +19,8 @@ package org.apache.flink.connector.pulsar.sink.writer; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.JobInfo; +import org.apache.flink.api.common.TaskInfo; import org.apache.flink.api.common.operators.MailboxExecutor; import org.apache.flink.api.common.operators.ProcessingTimeService; import org.apache.flink.api.common.serialization.SerializationSchema; @@ -45,7 +47,7 @@ import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; -import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.util.UserCodeClassLoader; @@ -173,7 +175,8 @@ private MockInitContext() { UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup() .getIOMetricGroup(); MetricGroup metricGroup = metricListener.getMetricGroup(); - this.metricGroup = InternalSinkWriterMetricGroup.mock(metricGroup, ioMetricGroup); + this.metricGroup = + MetricsGroupTestUtils.mockWriterMetricGroup(metricGroup, ioMetricGroup); this.timeService = new TestProcessingTimeService(); } @@ -251,6 +254,16 @@ public UserCodeClassLoader getUserCodeClassLoader() { } }; } + + @Override + public JobInfo getJobInfo() { + return null; + } + + @Override + public TaskInfo getTaskInfo() { + return null; + } } private static class MockSinkWriterContext implements SinkWriter.Context { diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java index fc490730..2536753f 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/enumerator/cursor/StopCursorTest.java @@ -33,6 +33,8 @@ import org.apache.pulsar.client.api.Schema; import org.junit.jupiter.api.Test; +import java.time.Duration; + import static java.util.Collections.singletonList; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric; @@ -95,8 +97,8 @@ void publishTimeStopCursor() throws Exception { private SourceConfiguration sourceConfig() { Configuration config = operator().config(); config.set(PULSAR_MAX_FETCH_RECORDS, 1); - config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000); - config.set(PULSAR_MAX_FETCH_TIME, 3000L); + config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2)); + config.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3)); config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10)); config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true); return new SourceConfiguration(config); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java index b2ef7d99..4b439a06 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarPartitionSplitReaderTest.java @@ -37,6 +37,7 @@ import org.junit.jupiter.api.Test; import java.io.IOException; +import java.time.Duration; import java.util.ArrayList; import java.util.BitSet; import java.util.List; @@ -107,6 +108,7 @@ void consumeMessageCreatedBeforeHandleSplitsChanges() throws Exception { String topicName = randomAlphabetic(10); operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + waitForTopicMetadataReady(topicName, 0); seekStartPositionAndHandleSplit(splitReader, topicName, 0); fetchedMessages(splitReader, 0, true); } @@ -118,6 +120,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition() String topicName = randomAlphabetic(10); operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + waitForTopicMetadataReady(topicName, 0); seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.earliest); fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true); } @@ -128,6 +131,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition() th String topicName = randomAlphabetic(10); operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + waitForTopicMetadataReady(topicName, 0); seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.latest); fetchedMessages(splitReader, 0, true); } @@ -139,6 +143,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCurs String topicName = randomAlphabetic(10); operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + waitForTopicMetadataReady(topicName, 0); MessageIdImpl lastMessageId = (MessageIdImpl) operator() @@ -214,6 +219,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesWithoutSeek() throws Exceptio String topicName = randomAlphabetic(10); operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + waitForTopicMetadataReady(topicName, 0); handleSplit(splitReader, topicName, 0); fetchedMessages(splitReader, 0, true); } @@ -225,6 +231,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseLatestStartCursorWithou String topicName = randomAlphabetic(10); operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + waitForTopicMetadataReady(topicName, 0); handleSplit(splitReader, topicName, 0, MessageId.latest); fetchedMessages(splitReader, 0, true); } @@ -236,6 +243,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseEarliestStartCursorWith String topicName = randomAlphabetic(10); operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10)); + waitForTopicMetadataReady(topicName, 0); handleSplit(splitReader, topicName, 0, MessageId.earliest); fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true); } @@ -247,6 +255,8 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou String topicName = randomAlphabetic(10); operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 20, false); + waitForTopicMetadataReady(topicName, 0); + MessageIdImpl lastMessageId = (MessageIdImpl) operator() @@ -272,6 +282,7 @@ void consumeBatchMessageFromRecover() throws Exception { int numRecords = 20; operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), numRecords, true); + waitForTopicMetadataReady(topicName, 0); MessageIdImpl lastMessageId = (MessageIdImpl) operator() @@ -310,8 +321,8 @@ private PulsarPartitionSplitReader splitReader() { private SourceConfiguration sourceConfig() { Configuration config = operator().config(); config.set(PULSAR_MAX_FETCH_RECORDS, 1); - config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000); - config.set(PULSAR_MAX_FETCH_TIME, 3000L); + config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2)); + config.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3)); config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10)); config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true); @@ -402,4 +413,21 @@ private List> fetchedMessages( return messages; } + + // Wait for topic metadata to stabilize after setupTopic + private void waitForTopicMetadataReady(String topicName, int partitionId) throws Exception { + String partitionTopicName = topicNameWithPartition(topicName, partitionId); + waitUtil( + () -> { + try { + MessageId id = + operator().admin().topics().getLastMessageId(partitionTopicName); + return id != null; + } catch (Exception e) { + return false; + } + }, + ofSeconds(30), + "Topic metadata not ready for " + partitionTopicName); + } } diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java index 54ed4b25..3d4619dd 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/PulsarSourceReaderTest.java @@ -228,8 +228,8 @@ private PulsarSourceReader sourceReader() throws Exception { Configuration configuration = operator().config(); configuration.set(PULSAR_MAX_FETCH_RECORDS, 1); - configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000); - configuration.set(PULSAR_MAX_FETCH_TIME, 3000L); + configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2)); + configuration.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3)); configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10)); PulsarDeserializationSchema deserializationSchema = diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java index de0a7298..ecdd820e 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/reader/deserializer/PulsarDeserializationSchemaTest.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.pulsar.source.reader.deserializer; +import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.Types; @@ -103,7 +104,7 @@ void createFromPulsarSchema() throws Exception { @Test void createFromFlinkTypeInformation() throws Exception { PulsarDeserializationSchema schema = - new PulsarTypeInformationWrapper<>(Types.STRING, null); + new PulsarTypeInformationWrapper<>(Types.STRING, new ExecutionConfig()); schema.open(new PulsarTestingDeserializationContext(), sourceConfig); assertThatCode(() -> InstantiationUtil.clone(schema)).doesNotThrowAnyException(); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java index c448a087..f95cb8f5 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/PulsarChangelogTableITCase.java @@ -23,6 +23,7 @@ import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatFactory; import org.apache.flink.table.api.TableConfig; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.config.AggregatePhaseStrategy; import org.apache.flink.table.api.config.ExecutionConfigOptions; import org.apache.flink.table.api.config.OptimizerConfigOptions; import org.apache.flink.test.junit5.MiniClusterExtension; @@ -53,7 +54,9 @@ void testPulsarDebeziumChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); + tableConf.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, + AggregatePhaseStrategy.TWO_PHASE); // ---------- Write the Debezium json into Pulsar ------------------- List lines = readLines("debezium-data-schema-exclude.txt"); @@ -182,7 +185,9 @@ public void testPulsarCanalChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); + tableConf.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, + AggregatePhaseStrategy.TWO_PHASE); // ---------- Write the Canal json into Pulsar ------------------- List lines = readLines("canal-data.txt"); @@ -323,7 +328,9 @@ public void testPulsarMaxwellChangelogSource() throws Exception { tableConf.set( ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1)); tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L); - tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE"); + tableConf.set( + OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, + AggregatePhaseStrategy.TWO_PHASE); // ---------- Write the Maxwell json into Pulsar ------------------- List lines = readLines("maxwell-data.txt"); diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java index fe29b4b8..8c89fa3e 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/table/testutils/PulsarTableTestUtils.java @@ -97,7 +97,7 @@ public static void waitingExpectedResults( Collections.sort(expected); CommonTestUtils.waitUtil( () -> { - List actual = TestValuesTableFactory.getResults(sinkName); + List actual = TestValuesTableFactory.getResultsAsStrings(sinkName); Collections.sort(actual); return expected.equals(actual); }, diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java index 097927ea..9bd22524 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java @@ -54,7 +54,7 @@ public class PulsarContainerRuntime implements PulsarRuntime { private static final String PULSAR_ADMIN_URL = String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_HTTP_PORT); - private static final String CURRENT_VERSION = "3.0.0"; + private static final String CURRENT_VERSION = "3.0.5"; private final PulsarContainer container; private final AtomicBoolean started; diff --git a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java index aece2846..76d058dc 100644 --- a/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java +++ b/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/source/PulsarSourceTestContext.java @@ -50,7 +50,7 @@ public abstract class PulsarSourceTestContext extends PulsarTestContext implements DataStreamSourceExternalContext { - private static final long DISCOVERY_INTERVAL = 1000L; + private static final long DISCOVERY_INTERVAL_MS = 1000L; private static final int BATCH_DATA_SIZE = 300; protected PulsarSourceTestContext(PulsarTestEnvironment environment) { @@ -65,7 +65,7 @@ protected PulsarSourceTestContext(PulsarTestEnvironment environment) { .setServiceUrl(operator.serviceUrl()) .setTopicPattern(topicPattern(), AllTopics) .setSubscriptionName(subscriptionName()) - .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, DISCOVERY_INTERVAL); + .setConfig(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS, DISCOVERY_INTERVAL_MS); // Set extra configuration for source builder. setSourceBuilder(builder); @@ -117,7 +117,7 @@ protected void setSourceBuilder(PulsarSourceBuilder builder) { /** * The topic pattern which is used in Pulsar topic auto discovery. It was discovered every - * {@link #DISCOVERY_INTERVAL} ms; + * {@link #DISCOVERY_INTERVAL_MS}; */ protected abstract String topicPattern(); diff --git a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE index d8f02d81..92433289 100644 --- a/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE +++ b/flink-sql-connector-pulsar/src/main/resources/META-INF/NOTICE @@ -7,9 +7,9 @@ The Apache Software Foundation (http://www.apache.org/). This project bundles the following dependencies under the Apache Software License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt) - com.fasterxml.jackson.core:jackson-annotations:2.13.4 -- org.apache.pulsar:pulsar-client-admin-api:3.0.2 -- org.apache.pulsar:pulsar-client-all:3.0.2 -- org.apache.pulsar:pulsar-client-api:3.0.2 +- org.apache.pulsar:pulsar-client-admin-api:3.0.5 +- org.apache.pulsar:pulsar-client-all:3.0.5 +- org.apache.pulsar:pulsar-client-api:3.0.5 This project bundles the following dependencies under the Bouncy Castle license. See bundled license files for details. diff --git a/pom.xml b/pom.xml index 210b074a..b31790af 100644 --- a/pom.xml +++ b/pom.xml @@ -51,9 +51,8 @@ under the License. - 1.18.0 - 1.18.0 - 3.0.2 + 1.20.3 + 3.0.5 2.12 1.69 @@ -81,6 +80,7 @@ under the License. 2.24.0 3.3 2.13.4.20221013 + 1.1.10.4 1.7.0 0.6.1 @@ -391,7 +391,7 @@ under the License. ${jackson-bom.version} - + org.apache.commons @@ -399,38 +399,30 @@ under the License. ${commons-compress.version} - org.apache.commons commons-lang3 ${commons-lang3.version} - commons-io commons-io ${commons-io.version} - - net.bytebuddy byte-buddy ${byte-buddy.version} - - com.esotericsoftware.kryo kryo ${kryo.version} - - org.objenesis objenesis @@ -443,7 +435,6 @@ under the License. ${protobuf.version} - org.scala-lang scala-reflect @@ -455,6 +446,14 @@ under the License. scala-library ${scala-library.version} + + + org.xerial.snappy + snappy-java + ${snappy.java.version} + + + @@ -492,7 +491,7 @@ under the License. org.apache.flink flink-ci-tools - ${flink-ci-tools.version} + ${flink.version}