@@ -178,6 +178,8 @@ public class S3SinkConnectorConfig extends StorageSinkConnectorConfig {

public static final String BEHAVIOR_ON_NULL_VALUES_CONFIG = "behavior.on.null.values";
public static final String BEHAVIOR_ON_NULL_VALUES_DEFAULT = OutputWriteBehavior.FAIL.toString();
public static final String IGNORE_NULL_OR_EMPTY_HEADERS_CONFIG = "ignore.null.or.empty.headers";
public static final Boolean IGNORE_NULL_OR_EMPTY_HEADERS_DEFAULT = false;

public static final String REPORT_NULL_RECORDS_TO_DLQ = "report.null.values.to.dlq";
public static final boolean REPORT_NULL_RECORDS_TO_DLQ_DEFAULT = true;
@@ -724,6 +726,23 @@ public static ConfigDef newConfigDef() {
"Behavior for null-valued records"
);

configDef.define(
IGNORE_NULL_OR_EMPTY_HEADERS_CONFIG,
Type.BOOLEAN,
IGNORE_NULL_OR_EMPTY_HEADERS_DEFAULT,
Importance.LOW,
"How to handle records with a null or empty headers when store headers is enabled."
+ " If true, the message will be saved even if headers are missing and store "
+ " headers is enabled."
+ " Else, an exception will be thrown if headers are missing and store headers is"
+ " enabled."
+ " This option has no effect if store headers is disabled.",
group,
++orderInGroup,
Width.SHORT,
"Whether to ignore null or empty headers when storing message headers is enabled."
);
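
For illustration, a minimal sketch of how this option might be wired up alongside header storage in a connector's properties follows. The bucket name and connector class value are placeholders, and "store.kafka.headers" is assumed to be the key behind STORE_KAFKA_HEADERS_CONFIG; none of this is asserted by the change itself.

import java.util.HashMap;
import java.util.Map;

// Hypothetical configuration sketch, not part of this change.
Map<String, String> props = new HashMap<>();
props.put("connector.class", "io.confluent.connect.s3.S3SinkConnector");
props.put("s3.bucket.name", "my-example-bucket");       // placeholder
props.put("store.kafka.headers", "true");               // header storage on
props.put("ignore.null.or.empty.headers", "true");      // records without headers are still written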

configDef.define(
REPORT_NULL_RECORDS_TO_DLQ,
Type.BOOLEAN,
@@ -1424,6 +1443,10 @@ public boolean shouldRotateOnPartitionChange() {
return getBoolean(ROTATE_FILE_ON_PARTITION_CHANGE);
}

public boolean ignoreNullOrEmptyHeaders() {
return getBoolean(IGNORE_NULL_OR_EMPTY_HEADERS_CONFIG);
}

public enum IgnoreOrFailBehavior {
IGNORE,
FAIL;
@@ -103,6 +103,7 @@ public void write(SinkRecord sinkRecord) {

// headerWriter.isPresent() means writing headers is turned on
if (headerWriter.isPresent()
&& !conf.ignoreNullOrEmptyHeaders()
&& (sinkRecord.headers() == null || sinkRecord.headers().isEmpty())) {
throw new DataException(
String.format("Headers cannot be null for SinkRecord: %s",
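
In effect, write() now rejects a record only when all three conditions hold: headers are being stored, the new option is off, and the record's headers are null or empty. A standalone restatement of that guard (the method and parameter names here are illustrative, not the connector's actual fields):

import org.apache.kafka.connect.header.Headers;

// Hypothetical helper mirroring the guard above.
static boolean shouldRejectRecord(boolean storingHeaders,
                                  boolean ignoreNullOrEmptyHeaders,
                                  Headers headers) {
    return storingHeaders                      // headerWriter.isPresent()
        && !ignoreNullOrEmptyHeaders           // the new option is off
        && (headers == null || headers.isEmpty());
}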
@@ -972,19 +972,7 @@ protected void verify(List<SinkRecord> sinkRecords, long[] validOffsets,
verifyFileListing(validOffsets, partitions, EXTENSION);
}

for (TopicPartition tp : partitions) {
for (int i = 1, j = 0; i < validOffsets.length; ++i) {
long startOffset = validOffsets[i - 1];
long size = validOffsets[i] - startOffset;

FileUtils.fileKeyToCommit(topicsDir, getDirectory(tp.topic(), tp.partition()), tp, startOffset, EXTENSION, ZERO_PAD_FMT);
Collection<Object> records = readRecords(topicsDir, getDirectory(tp.topic(), tp.partition()), tp, startOffset,
EXTENSION, ZERO_PAD_FMT, S3_TEST_BUCKET_NAME, s3);
assertEquals(size, records.size());
verifyContents(sinkRecords, j, records);
j += size;
}
}
verifyFileContents(partitions, validOffsets, sinkRecords);
}

protected void verifyOffsets(
@@ -1020,5 +1008,20 @@ protected void verifyRawOffsets(
}
assertEquals(actualOffsets, expectedOffsets);
}
}

protected void verifyFileContents(Set<TopicPartition> partitions, long[] validOffsets, List<SinkRecord> sinkRecords) throws IOException {
for (TopicPartition tp : partitions) {
for (int i = 1, j = 0; i < validOffsets.length; ++i) {
long startOffset = validOffsets[i - 1];
long size = validOffsets[i] - startOffset;

FileUtils.fileKeyToCommit(topicsDir, getDirectory(tp.topic(), tp.partition()), tp, startOffset, EXTENSION, ZERO_PAD_FMT);
Collection<Object> records = readRecords(topicsDir, getDirectory(tp.topic(), tp.partition()), tp, startOffset,
EXTENSION, ZERO_PAD_FMT, S3_TEST_BUCKET_NAME, s3);
assertEquals(size, records.size());
verifyContents(sinkRecords, j, records);
j += size;
}
}
}
}
@@ -16,8 +16,12 @@
package io.confluent.connect.s3;

import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
@@ -35,9 +39,8 @@
import org.powermock.modules.junit4.PowerMockRunner;

import java.io.ByteArrayInputStream;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.confluent.connect.s3.format.avro.AvroUtils;
import io.confluent.connect.s3.storage.S3Storage;
@@ -46,6 +49,7 @@
import io.confluent.connect.storage.partitioner.DefaultPartitioner;
import io.confluent.connect.storage.partitioner.PartitionerConfig;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -145,6 +149,57 @@ public void testWriteNullRecords() throws Exception {
Mockito.verify(reporter, times(2)).report(any(), any(DataException.class));
}

@Test
public void testIgnoreNullOrEmptyHeaders() throws Exception {
localProps.put(S3SinkConnectorConfig.IGNORE_NULL_OR_EMPTY_HEADERS_CONFIG, "true");
localProps.put(S3SinkConnectorConfig.STORE_KAFKA_HEADERS_CONFIG, "true");
setUp();

replayAll();
task = new S3SinkTask();
task.initialize(context);
task.start(properties);
verifyAll();

String key = "key";
Schema schema = createSchema();
Struct record = createRecord(schema);

Headers headers = new ConnectHeaders()
.addString("string", "string")
.addInt("int", 12)
.addBoolean("boolean", false);

List<SinkRecord> sinkRecords = createRecords(4, 0);
SinkRecord recordWithHeaders = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 4, null, null, headers);
SinkRecord recordWithNullHeaders = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 5, null, null, null);
SinkRecord recordWithEmptyHeaders = new SinkRecord(TOPIC, PARTITION, Schema.STRING_SCHEMA, key, schema, record, 6, null, null, Collections.emptyList());

sinkRecords.add(recordWithHeaders);
sinkRecords.add(recordWithNullHeaders);
sinkRecords.add(recordWithEmptyHeaders);

task.put(sinkRecords);

task.close(context.assignment());
task.stop();

Set<TopicPartition> partitions = Collections.singleton(new TopicPartition(TOPIC, PARTITION));
String valueExtension = ".avro";
String headersExtension = ".headers.avro";

long[] validOffsets = {0, 3, 6};

List<String> expectedFiles = new ArrayList<>();
for (TopicPartition tp : partitions) {
expectedFiles.addAll(getExpectedFiles(validOffsets, tp, valueExtension));
expectedFiles.addAll(getExpectedFiles(validOffsets, tp, headersExtension));
}

verifyFileListing(expectedFiles);
verifyFileContents(partitions, validOffsets, sinkRecords);
}
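
For reference, the listing this test expects should roughly follow the connector's usual "<topic>+<partition>+<startOffset><extension>" key pattern, with a headers file written alongside each value file. A sketch follows; the "topics/" prefix and the ten-digit zero padding are assumptions based on the connector's defaults, not something this test asserts directly.

// Hypothetical sketch of the expected object keys.
long[] commitStarts = {0, 3};  // start offsets derived from validOffsets {0, 3, 6}
for (long startOffset : commitStarts) {
    String base = String.format("topics/%s/partition=%d/%s+%d+%010d",
        TOPIC, PARTITION, TOPIC, PARTITION, startOffset);
    System.out.println(base + ".avro");          // value file
    System.out.println(base + ".headers.avro");  // companion headers file
}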

@Test
public void testWriteRecordWithPrimitives() throws Exception {
setUp();
@@ -260,6 +315,4 @@ public void testAclCannedConfig() throws Exception {
task.initialize(context);
task.start(properties);
}

}