Skip to content

Commit ef09103

Browse files
committed
Address comments.
1 parent 2d9f260 commit ef09103

File tree

2 files changed

+2
-3
lines changed

2 files changed

+2
-3
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/main/java/org/apache/flink/cdc/connectors/kafka/utils/KafkaSinkUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public static Map<Selectors, String> parseSelectorsToTopicMap(String mappingRule
4141
String[] selectorsAndTopic = mapping.split(DELIMITER_SELECTOR_TOPIC);
4242
Preconditions.checkArgument(
4343
selectorsAndTopic.length == 2,
44-
"Please check you configuration of "
44+
"Please check your configuration of "
4545
+ KafkaDataSinkOptions.SINK_TABLE_ID_TO_TOPIC_MAPPING);
4646
Selectors selectors =
4747
new Selectors.SelectorsBuilder().includeTables(selectorsAndTopic[0]).build();

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-kafka/src/test/java/org/apache/flink/cdc/connectors/kafka/sink/KafkaDataSinkITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,8 +324,7 @@ void testCanalJsonFormat() throws Exception {
324324

325325
final List<ConsumerRecord<byte[], byte[]>> collectedRecords =
326326
drainAllRecordsFromTopic(topic, false, 0);
327-
final long recordsCount = 5;
328-
assertThat(recordsCount).isEqualTo(collectedRecords.size());
327+
assertThat(collectedRecords).hasSize(5);
329328
for (ConsumerRecord<byte[], byte[]> consumerRecord : collectedRecords) {
330329
assertThat(
331330
consumerRecord

0 commit comments

Comments
 (0)