Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -103,26 +103,26 @@ private PulsarSourceOptions() {
"The source would use pulsar client's internal mechanism and commit cursor in a given interval.")
.build());

public static final ConfigOption<Long> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
public static final ConfigOption<Duration> PULSAR_AUTO_COMMIT_CURSOR_INTERVAL =
ConfigOptions.key(SOURCE_CONFIG_PREFIX + "autoCommitCursorInterval")
.longType()
.defaultValue(Duration.ofSeconds(5).toMillis())
.durationType()
.defaultValue(Duration.ofSeconds(5))
.withDescription(
Description.builder()
.text(
"This option is used only when the user disables the checkpoint and uses Exclusive or Failover subscription.")
.text(
" We would automatically commit the cursor using the given period (in ms).")
" We would automatically commit the cursor using the given duration.")
.build());

public static final ConfigOption<Integer> PULSAR_FETCH_ONE_MESSAGE_TIME =
public static final ConfigOption<Duration> PULSAR_FETCH_ONE_MESSAGE_TIME =
ConfigOptions.key(SOURCE_CONFIG_PREFIX + "fetchOneMessageTime")
.intType()
.durationType()
.noDefaultValue()
.withDescription(
Description.builder()
.text(
"The time (in ms) for fetching one message from Pulsar. If time exceed and no message returned from Pulsar.")
"The time to wait for fetching one message from Pulsar. If time exceeded and no message returned from Pulsar.")
.text(
" We would consider there is no record at the current topic partition and stop fetching until next switch.")
.linebreak()
Expand All @@ -134,13 +134,13 @@ private PulsarSourceOptions() {
" Add this option in source builder avoiding waiting too long.")
.build());

public static final ConfigOption<Long> PULSAR_MAX_FETCH_TIME =
public static final ConfigOption<Duration> PULSAR_MAX_FETCH_TIME =
ConfigOptions.key(SOURCE_CONFIG_PREFIX + "maxFetchTime")
.longType()
.defaultValue(Duration.ofSeconds(10).toMillis())
.durationType()
.defaultValue(Duration.ofSeconds(10))
.withDescription(
Description.builder()
.text("The maximum time (in ms) to wait when fetching records.")
.text("The maximum time to wait when fetching records.")
.text(" A longer time increases throughput but also latency.")
.text(
" A fetch batch might be finished earlier because of %s.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public class SourceConfiguration extends PulsarConfiguration {
private final long partitionDiscoveryIntervalMs;
private final boolean enableAutoAcknowledgeMessage;
private final long autoCommitCursorInterval;
private final int fetchOneMessageTime;
private final long fetchOneMessageTime;
private final Duration maxFetchTime;
private final int maxFetchRecords;
private final CursorVerification verifyInitialOffsets;
Expand All @@ -78,9 +78,10 @@ public SourceConfiguration(Configuration configuration) {
this.messageQueueCapacity = get(ELEMENT_QUEUE_CAPACITY);
this.partitionDiscoveryIntervalMs = get(PULSAR_PARTITION_DISCOVERY_INTERVAL_MS);
this.enableAutoAcknowledgeMessage = get(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE);
this.autoCommitCursorInterval = get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL);
this.fetchOneMessageTime = getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).orElse(0);
this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME, Duration::ofMillis);
this.autoCommitCursorInterval = get(PULSAR_AUTO_COMMIT_CURSOR_INTERVAL).toMillis();
this.fetchOneMessageTime =
getOptional(PULSAR_FETCH_ONE_MESSAGE_TIME).map(Duration::toMillis).orElse(0L);
this.maxFetchTime = get(PULSAR_MAX_FETCH_TIME);
this.maxFetchRecords = get(PULSAR_MAX_FETCH_RECORDS);
this.verifyInitialOffsets = get(PULSAR_VERIFY_INITIAL_OFFSETS);
this.subscriptionName = get(PULSAR_SUBSCRIPTION_NAME);
Expand Down Expand Up @@ -136,7 +137,7 @@ public long getAutoCommitCursorInterval() {
* messages in {@link RecordsWithSplitIds} when meet this timeout and no message consumed.
*/
public int getFetchOneMessageTime() {
return fetchOneMessageTime;
return (int) fetchOneMessageTime;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
Expand All @@ -65,7 +66,9 @@ public class MiniClusterTestEnvironment implements TestEnvironment, ClusterContr

public MiniClusterTestEnvironment() {
Configuration conf = new Configuration();
conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS);
conf.set(
METRIC_FETCHER_UPDATE_INTERVAL,
Duration.ofMillis(METRIC_FETCHER_UPDATE_INTERVAL_MS));
TaskExecutorResourceUtils.adjustForLocalExecution(conf);
this.miniCluster =
new MiniClusterWithClientResource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.connector.pulsar.sink.writer;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobInfo;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.serialization.SerializationSchema;
Expand All @@ -45,7 +47,7 @@
import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
import org.apache.flink.metrics.testutils.MetricListener;
import org.apache.flink.runtime.mailbox.SyncMailboxExecutor;
import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
import org.apache.flink.runtime.metrics.groups.MetricsGroupTestUtils;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.util.UserCodeClassLoader;
Expand Down Expand Up @@ -173,7 +175,8 @@ private MockInitContext() {
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
.getIOMetricGroup();
MetricGroup metricGroup = metricListener.getMetricGroup();
this.metricGroup = InternalSinkWriterMetricGroup.mock(metricGroup, ioMetricGroup);
this.metricGroup =
MetricsGroupTestUtils.mockWriterMetricGroup(metricGroup, ioMetricGroup);
this.timeService = new TestProcessingTimeService();
}

Expand Down Expand Up @@ -251,6 +254,16 @@ public UserCodeClassLoader getUserCodeClassLoader() {
}
};
}

@Override
public JobInfo getJobInfo() {
return null;
}

@Override
public TaskInfo getTaskInfo() {
return null;
}
}

private static class MockSinkWriterContext implements SinkWriter.Context {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.pulsar.client.api.Schema;
import org.junit.jupiter.api.Test;

import java.time.Duration;

import static java.util.Collections.singletonList;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
Expand Down Expand Up @@ -95,8 +97,8 @@ void publishTimeStopCursor() throws Exception {
private SourceConfiguration sourceConfig() {
Configuration config = operator().config();
config.set(PULSAR_MAX_FETCH_RECORDS, 1);
config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
config.set(PULSAR_MAX_FETCH_TIME, 3000L);
config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2));
config.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);
return new SourceConfiguration(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
Expand Down Expand Up @@ -107,6 +108,7 @@ void consumeMessageCreatedBeforeHandleSplitsChanges() throws Exception {
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
waitForTopicMetadataReady(topicName, 0);
seekStartPositionAndHandleSplit(splitReader, topicName, 0);
fetchedMessages(splitReader, 0, true);
}
Expand All @@ -118,6 +120,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToEarliestPosition()
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
waitForTopicMetadataReady(topicName, 0);
seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.earliest);
fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
}
Expand All @@ -128,6 +131,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndResetToLatestPosition() th
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
waitForTopicMetadataReady(topicName, 0);
seekStartPositionAndHandleSplit(splitReader, topicName, 0, MessageId.latest);
fetchedMessages(splitReader, 0, true);
}
Expand All @@ -139,6 +143,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageIdCurs
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
waitForTopicMetadataReady(topicName, 0);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
Expand Down Expand Up @@ -214,6 +219,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesWithoutSeek() throws Exceptio
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
waitForTopicMetadataReady(topicName, 0);
handleSplit(splitReader, topicName, 0);
fetchedMessages(splitReader, 0, true);
}
Expand All @@ -225,6 +231,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseLatestStartCursorWithou
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
waitForTopicMetadataReady(topicName, 0);
handleSplit(splitReader, topicName, 0, MessageId.latest);
fetchedMessages(splitReader, 0, true);
}
Expand All @@ -236,6 +243,7 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseEarliestStartCursorWith
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10));
waitForTopicMetadataReady(topicName, 0);
handleSplit(splitReader, topicName, 0, MessageId.earliest);
fetchedMessages(splitReader, NUM_RECORDS_PER_PARTITION, true);
}
Expand All @@ -247,6 +255,8 @@ void consumeMessageCreatedBeforeHandleSplitsChangesAndUseSecondLastMessageWithou
String topicName = randomAlphabetic(10);

operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), 20, false);
waitForTopicMetadataReady(topicName, 0);

MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
Expand All @@ -272,6 +282,7 @@ void consumeBatchMessageFromRecover() throws Exception {

int numRecords = 20;
operator().setupTopic(topicName, STRING, () -> randomAlphabetic(10), numRecords, true);
waitForTopicMetadataReady(topicName, 0);
MessageIdImpl lastMessageId =
(MessageIdImpl)
operator()
Expand Down Expand Up @@ -310,8 +321,8 @@ private PulsarPartitionSplitReader splitReader() {
private SourceConfiguration sourceConfig() {
Configuration config = operator().config();
config.set(PULSAR_MAX_FETCH_RECORDS, 1);
config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
config.set(PULSAR_MAX_FETCH_TIME, 3000L);
config.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2));
config.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
config.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));
config.set(PULSAR_ENABLE_AUTO_ACKNOWLEDGE_MESSAGE, true);

Expand Down Expand Up @@ -402,4 +413,21 @@ private List<Message<byte[]>> fetchedMessages(

return messages;
}

// Wait for topic metadata to stabilize after setupTopic
private void waitForTopicMetadataReady(String topicName, int partitionId) throws Exception {
String partitionTopicName = topicNameWithPartition(topicName, partitionId);
waitUtil(
() -> {
try {
MessageId id =
operator().admin().topics().getLastMessageId(partitionTopicName);
return id != null;
} catch (Exception e) {
return false;
}
},
ofSeconds(30),
"Topic metadata not ready for " + partitionTopicName);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ private PulsarSourceReader<Integer> sourceReader() throws Exception {
Configuration configuration = operator().config();

configuration.set(PULSAR_MAX_FETCH_RECORDS, 1);
configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, 2000);
configuration.set(PULSAR_MAX_FETCH_TIME, 3000L);
configuration.set(PULSAR_FETCH_ONE_MESSAGE_TIME, Duration.ofSeconds(2));
configuration.set(PULSAR_MAX_FETCH_TIME, Duration.ofSeconds(3));
configuration.set(PULSAR_SUBSCRIPTION_NAME, randomAlphabetic(10));

PulsarDeserializationSchema<Integer> deserializationSchema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.pulsar.source.reader.deserializer;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
Expand Down Expand Up @@ -103,7 +104,7 @@ void createFromPulsarSchema() throws Exception {
@Test
void createFromFlinkTypeInformation() throws Exception {
PulsarDeserializationSchema<String> schema =
new PulsarTypeInformationWrapper<>(Types.STRING, null);
new PulsarTypeInformationWrapper<>(Types.STRING, new ExecutionConfig());
schema.open(new PulsarTestingDeserializationContext(), sourceConfig);
assertThatCode(() -> InstantiationUtil.clone(schema)).doesNotThrowAnyException();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.formats.json.maxwell.MaxwellJsonFormatFactory;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.config.AggregatePhaseStrategy;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.api.config.OptimizerConfigOptions;
import org.apache.flink.test.junit5.MiniClusterExtension;
Expand Down Expand Up @@ -53,7 +54,9 @@ void testPulsarDebeziumChangelogSource() throws Exception {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
tableConf.set(
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
AggregatePhaseStrategy.TWO_PHASE);

// ---------- Write the Debezium json into Pulsar -------------------
List<String> lines = readLines("debezium-data-schema-exclude.txt");
Expand Down Expand Up @@ -182,7 +185,9 @@ public void testPulsarCanalChangelogSource() throws Exception {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
tableConf.set(
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
AggregatePhaseStrategy.TWO_PHASE);

// ---------- Write the Canal json into Pulsar -------------------
List<String> lines = readLines("canal-data.txt");
Expand Down Expand Up @@ -323,7 +328,9 @@ public void testPulsarMaxwellChangelogSource() throws Exception {
tableConf.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ALLOW_LATENCY, Duration.ofSeconds(1));
tableConf.set(ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_SIZE, 5000L);
tableConf.set(OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY, "TWO_PHASE");
tableConf.set(
OptimizerConfigOptions.TABLE_OPTIMIZER_AGG_PHASE_STRATEGY,
AggregatePhaseStrategy.TWO_PHASE);

// ---------- Write the Maxwell json into Pulsar -------------------
List<String> lines = readLines("maxwell-data.txt");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ public static void waitingExpectedResults(
Collections.sort(expected);
CommonTestUtils.waitUtil(
() -> {
List<String> actual = TestValuesTableFactory.getResults(sinkName);
List<String> actual = TestValuesTableFactory.getResultsAsStrings(sinkName);
Collections.sort(actual);
return expected.equals(actual);
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class PulsarContainerRuntime implements PulsarRuntime {
private static final String PULSAR_ADMIN_URL =
String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_HTTP_PORT);

private static final String CURRENT_VERSION = "3.0.0";
private static final String CURRENT_VERSION = "3.0.5";

private final PulsarContainer container;
private final AtomicBoolean started;
Expand Down
Loading
Loading