1818import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BATCH_SIZE_CONFIG;
1919import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_MALFORMED_DOCS_CONFIG;
2020import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BEHAVIOR_ON_NULL_VALUES_CONFIG;
21- import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_PASSWORD_CONFIG;
2221import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_URL_CONFIG;
23- import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.CONNECTION_USERNAME_CONFIG;
2422import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_DATASET_CONFIG;
2523import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.DATA_STREAM_TYPE_CONFIG;
2624import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.IGNORE_KEY_CONFIG;
2927import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_IN_FLIGHT_REQUESTS_CONFIG;
3028import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.MAX_RETRIES_CONFIG;
3129import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.RETRY_BACKOFF_MS_CONFIG;
32- import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SECURITY_PROTOCOL_CONFIG;
33- import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SSL_CONFIG_PREFIX;
3430import static io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WRITE_METHOD_CONFIG;
3531import static org.junit.Assert.assertEquals;
3632import static org.junit.Assert.assertFalse;
4642
4743import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnMalformedDoc;
4844import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.BehaviorOnNullValues;
49- import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.SecurityProtocol;
5045import io.confluent.connect.elasticsearch.ElasticsearchSinkConnectorConfig.WriteMethod;
5146import io.confluent.connect.elasticsearch.helper.ElasticsearchContainer;
5247import io.confluent.connect.elasticsearch.helper.ElasticsearchHelperClient;
5853import java.util.Map;
5954import java.util.concurrent.CompletableFuture;
6055import java.util.concurrent.TimeUnit;
61- import org.apache.kafka.common.config.SslConfigs;
62- import org.apache.kafka.common.record.TimestampType;
6356import org.apache.kafka.connect.data.Schema;
6457import org.apache.kafka.connect.data.SchemaBuilder;
6558import org.apache.kafka.connect.data.Struct;
6659import org.apache.kafka.connect.errors.ConnectException;
6760import org.apache.kafka.connect.sink.ErrantRecordReporter;
6861import org.apache.kafka.connect.sink.SinkRecord;
69- import org.apache.kafka.test.TestUtils;
70- import org.elasticsearch.ElasticsearchStatusException;
7162import org.elasticsearch.action.DocWriteRequest;
7263import org.elasticsearch.action.bulk.BulkItemResponse;
7364import org.elasticsearch.index.VersionType;
7465import org.elasticsearch.search.SearchHit;
7566import org.junit.After;
76- import org.junit.AfterClass;
7767import org.junit.Before;
7868import org.junit.BeforeClass;
7969import org.junit.Test;
8070
81- public class ElasticsearchClientTest {
82-
83- private static final String INDEX = "index";
84- private static final String ELASTIC_SUPERUSER_NAME = "elastic";
85- private static final String ELASTIC_SUPERUSER_PASSWORD = "elastic";
86- private static final String TOPIC = "index";
87- private static final String DATA_STREAM_TYPE = "logs";
88- private static final String DATA_STREAM_DATASET = "dataset";
89-
90- private static ElasticsearchContainer container;
91-
92- private DataConverter converter;
93- private ElasticsearchHelperClient helperClient;
94- private ElasticsearchSinkConnectorConfig config;
95- private Map<String, String> props;
96- private String index;
97- private OffsetTracker offsetTracker;
71+ public class ElasticsearchClientTest extends ElasticsearchClientTestBase {
9872
9973 @BeforeClass
10074 public static void setupBeforeAll() {
10175 container = ElasticsearchContainer.fromSystemProperties();
10276 container.start();
10377 }
10478
105- @AfterClass
106- public static void cleanupAfterAll() {
107- container.close();
108- }
109-
11079 @Before
11180 public void setup() {
11281 index = TOPIC;
@@ -747,43 +716,6 @@ public void testNoVersionConflict() throws Exception {
747716 client2.close();
748717 }
749718
750- @Test
751- public void testSsl() throws Exception {
752- container.close();
753- container = ElasticsearchContainer.fromSystemProperties().withSslEnabled(true);
754- container.start();
755-
756- String address = container.getConnectionUrl(false);
757- props.put(CONNECTION_URL_CONFIG, address);
758- props.put(CONNECTION_USERNAME_CONFIG, ELASTIC_SUPERUSER_NAME);
759- props.put(CONNECTION_PASSWORD_CONFIG, ELASTIC_SUPERUSER_PASSWORD);
760- props.put(SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SSL.name());
761- props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, container.getKeystorePath());
762- props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, container.getKeystorePassword());
763- props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, container.getTruststorePath());
764- props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, container.getTruststorePassword());
765- props.put(SSL_CONFIG_PREFIX + SslConfigs.SSL_KEY_PASSWORD_CONFIG, container.getKeyPassword());
766- config = new ElasticsearchSinkConnectorConfig(props);
767- converter = new DataConverter(config);
768-
769- ElasticsearchClient client = new ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets());
770- helperClient = new ElasticsearchHelperClient(address, config,
771- container.shouldStartClientInCompatibilityMode());
772- client.createIndexOrDataStream(index);
773-
774- writeRecord(sinkRecord(0), client);
775- client.flush();
776-
777- waitUntilRecordsInES(1);
778- assertEquals(1, helperClient.getDocCount(index));
779- client.close();
780- helperClient = null;
781-
782- container.close();
783- container = ElasticsearchContainer.fromSystemProperties();
784- container.start();
785- }
786-
787719 @Test
788720 public void testWriteDataStreamInjectTimestamp() throws Exception {
789721 props.put(DATA_STREAM_TYPE_CONFIG, DATA_STREAM_TYPE);
@@ -805,72 +737,11 @@ public void testWriteDataStreamInjectTimestamp() throws Exception {
805737 client.close();
806738 }
807739
808- private String createIndexName(String name) {
809- return config.isDataStream()
810- ? String.format("%s-%s-%s", DATA_STREAM_TYPE, DATA_STREAM_DATASET, name)
811- : name;
812- }
813-
814740 @Test
815741 public void testConnectionUrlExtraSlash() {
816742 props.put(CONNECTION_URL_CONFIG, container.getConnectionUrl() + "/");
817743 config = new ElasticsearchSinkConnectorConfig(props);
818744 ElasticsearchClient client = new ElasticsearchClient(config, null, () -> offsetTracker.updateOffsets());
819745 client.close();
820746 }
821-
822- private static Schema schema() {
823- return SchemaBuilder
824- .struct()
825- .name("record")
826- .field("offset", SchemaBuilder.int32().defaultValue(0).build())
827- .field("another", SchemaBuilder.int32().defaultValue(0).build())
828- .build();
829- }
830-
831- private static SinkRecord sinkRecord(int offset) {
832- return sinkRecord("key", offset);
833- }
834-
835- private static SinkRecord sinkRecord(String key, int offset) {
836- Struct value = new Struct(schema()).put("offset", offset).put("another", offset + 1);
837- return sinkRecord(key, schema(), value, offset);
838- }
839-
840- private static SinkRecord sinkRecord(String key, Schema schema, Struct value, int offset) {
841- return new SinkRecord(
842- TOPIC,
843- 0,
844- Schema.STRING_SCHEMA,
845- key,
846- schema,
847- value,
848- offset,
849- System.currentTimeMillis(),
850- TimestampType.CREATE_TIME
851- );
852- }
853-
854- private void waitUntilRecordsInES(int expectedRecords) throws InterruptedException {
855- TestUtils.waitForCondition(
856- () -> {
857- try {
858- return helperClient.getDocCount(index) == expectedRecords;
859- } catch (ElasticsearchStatusException e) {
860- if (e.getMessage().contains("index_not_found_exception")) {
861- return false;
862- }
863-
864- throw e;
865- }
866- },
867- TimeUnit.MINUTES.toMillis(1),
868- String.format("Could not find expected documents (%d) in time.", expectedRecords)
869- );
870- }
871-
872- private void writeRecord(SinkRecord record, ElasticsearchClient client) {
873- client.index(record, converter.convertRecord(record, createIndexName(record.topic())),
874- new AsyncOffsetTracker.AsyncOffsetState(record.kafkaOffset()));
875- }
876747}
0 commit comments