Issues when running under cp-kafka-connect v8.0.0 #341

@wijnanjo

Description

When running the connector in Confluent Platform's Connect Docker image v8.0.0 (confluentinc/cp-kafka-connect:8.0.0), everything works fine until OpenSearch returns a bulk response containing error items. For example, OpenSearch may return 'version_conflict_engine_exception', which the connector handles as configured (e.g. log the error and continue). At that point things break (note that everything works up to the latest v7.x.x of cp-kafka-connect): after a few retries the connector stops with a generic 'flush timeout'.

I observe this behavior with connector version 3.1.1 and with the main branch.

After adding extra log statements inside callWithRetry of BulkProcessor.java, we get an explicit class initialization error. Here is the adapted code, followed by the logs:

return callWithRetry("bulk processing", () -> {
    try {
        final var response = client.bulk(new BulkRequest()
                .add(batch.stream().map(DocWriteWrapper::getDocWriteRequest).collect(Collectors.toList())),
                RequestOptions.DEFAULT);
        if (!response.hasFailures()) {
            // We only logged failures, so log the success immediately after a failure ...
            LOGGER.debug("Completed batch {} of {} records", batchId, batch.size());
            return response;
        }
        LOGGER.debug("V8: batch failed {}", batchId);
        for (final var itemResponse : response.getItems()) {
            LOGGER.debug("V8: have itemResponse for batch {}: {}", batchId, itemResponse);
            if (itemResponse.isFailed()) {
                LOGGER.debug("V8: itemResponse failed for batch {}", batchId);
                LOGGER.debug("accessing failure");
                // !!! java.lang.NoClassDefFoundError !!!
                var failure = itemResponse.getFailure();
                LOGGER.debug("V8: failure: {}", failure);
...
[2025-07-31 10:09:29,287] DEBUG request [POST https://query.icts-q-es-cloud-playground-sec.service.qsvcd:9200/_bulk?timeout=1m] returned [HTTP/1.1 200 OK] (org.opensearch.client.RestClient)
[2025-07-31 10:09:29,289] DEBUG V8: batch failed 6 (io.aiven.kafka.connect.opensearch.BulkProcessor)
[2025-07-31 10:09:29,289] DEBUG V8: have itemResponse for batch 6: org.opensearch.action.bulk.BulkItemResponse@23c4dcc9 (io.aiven.kafka.connect.opensearch.BulkProcessor)
[2025-07-31 10:09:29,289] DEBUG V8: itemResponse failed for batch 6 (io.aiven.kafka.connect.opensearch.BulkProcessor)
[2025-07-31 10:09:29,289] DEBUG accessing failure (io.aiven.kafka.connect.opensearch.BulkProcessor)
[2025-07-31 10:09:29,289] DEBUG V8: failure: [!!!org.opensearch.action.bulk.BulkItemResponse$Failure@7a940d28=>java.lang.NoClassDefFoundError:Could not initialize class org.opensearch.ExceptionsHelper!!!] (io.aiven.kafka.connect.opensearch.BulkProcessor)
[2025-07-31 10:09:29,290] DEBUG V8: itemResponse NOT aborted for batch 6: org.opensearch.action.bulk.BulkItemResponse@23c4dcc9 (io.aiven.kafka.connect.opensearch.BulkProcessor)

For some reason the class org.opensearch.ExceptionsHelper fails to initialize. This probably blocks the thread, preventing the batch from completing and ultimately stopping the connector.
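For context on the error message itself: the JVM wording "Could not initialize class ..." indicates that a static initializer for that class already failed once earlier (which would have thrown ExceptionInInitializerError, possibly swallowed or logged elsewhere); the class is then marked erroneous, and every subsequent touch throws NoClassDefFoundError. A minimal self-contained sketch (hypothetical demo class, not connector code) showing this two-phase behavior:

```java
// Demonstrates why a failed static initializer later surfaces as
// "NoClassDefFoundError: Could not initialize class ..." (JLS 12.4.2):
// the first access throws ExceptionInInitializerError; the JVM marks the
// class erroneous, so every later access throws NoClassDefFoundError.
public class InitFailureDemo {
    static class Broken {
        static final Object VALUE = init();

        static Object init() {
            // Simulated initializer failure, e.g. a missing or
            // incompatible dependency on the classpath.
            throw new RuntimeException("simulated init failure");
        }
    }

    // Touch Broken twice and record the simple name of each error thrown.
    static String[] touchTwice() {
        final String[] names = new String[2];
        for (int i = 0; i < 2; i++) {
            try {
                Object ignored = Broken.VALUE;
            } catch (Throwable t) {
                names[i] = t.getClass().getSimpleName();
            }
        }
        return names;
    }

    public static void main(String[] args) {
        final String[] names = touchTwice();
        System.out.println("first access:  " + names[0]); // ExceptionInInitializerError
        System.out.println("second access: " + names[1]); // NoClassDefFoundError
    }
}
```

If the connector only ever logs the later NoClassDefFoundError, the original ExceptionInInitializerError (with the real root cause) may appear only once, earlier in the logs, which is worth searching for. A likely suspect in this scenario is a classpath conflict between the OpenSearch client libraries bundled with the connector and jars shipped in the cp-kafka-connect 8.0.0 image, though that is an assumption, not something the logs above prove.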
