[BUG] DLQ failing to write on shutdown #6321

@dlvenable

Description

Describe the bug

When Data Prepper is shutting down, the opensearch sink can fail to write to the DLQ and report errors such as the following:

2025-11-21T23:36:24.393 [s3-pipeline-sink-worker-2-thread-2] ERROR org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink - Failed to write a document to the DLQ
java.io.IOException: Failed to write to S3 dlq.
        at org.opensearch.dataprepper.plugins.dlq.s3.S3DlqWriter.putObject(S3DlqWriter.java:140)
        at org.opensearch.dataprepper.plugins.dlq.s3.S3DlqWriter.lambda$timedPutObject$0(S3DlqWriter.java:126)
        at io.micrometer.core.instrument.composite.CompositeTimer.recordCallable(CompositeTimer.java:129)
        at org.opensearch.dataprepper.plugins.dlq.s3.S3DlqWriter.timedPutObject(S3DlqWriter.java:126)
        at org.opensearch.dataprepper.plugins.dlq.s3.S3DlqWriter.doWrite(S3DlqWriter.java:114)
        at org.opensearch.dataprepper.plugins.dlq.s3.S3DlqWriter.write(S3DlqWriter.java:95)
        at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.logFailureForDlqObjects(OpenSearchSink.java:640)
        at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.logFailureForBulkRequests(OpenSearchSink.java:601)
        at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleFailures(BulkRetryStrategy.java:470)
        at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleFailures(BulkRetryStrategy.java:332)
        at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetriesAndFailures(BulkRetryStrategy.java:312)
        at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.handleRetry(BulkRetryStrategy.java:351)
        at org.opensearch.dataprepper.plugins.sink.opensearch.BulkRetryStrategy.execute(BulkRetryStrategy.java:217)
        at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.lambda$flushBatch$17(OpenSearchSink.java:585)
        at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
        at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.flushBatch(OpenSearchSink.java:582)
        at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.flushBatch(OpenSearchSink.java:755)
        at org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSink.doOutput(OpenSearchSink.java:537)
        at org.opensearch.dataprepper.model.sink.AbstractSink.lambda$output$0(AbstractSink.java:71)
        at io.micrometer.core.instrument.composite.CompositeTimer.record(CompositeTimer.java:141)
        at org.opensearch.dataprepper.model.sink.AbstractSink.output(AbstractSink.java:71)
        at org.opensearch.dataprepper.core.pipeline.Pipeline.lambda$publishToSinks$8(Pipeline.java:407)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.lang.IllegalStateException: Interrupted waiting to refresh a cached value.
        at software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:224)
        at software.amazon.awssdk.utils.cache.CachedSupplier.get(CachedSupplier.java:135)
        at software.amazon.awssdk.services.sts.auth.StsCredentialsProvider.resolveCredentials(StsCredentialsProvider.java:106)
        at software.amazon.awssdk.auth.credentials.AwsCredentialsProvider.resolveIdentity(AwsCredentialsProvider.java:54)
        at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.lambda$trySelectAuthScheme$6(S3AuthSchemeInterceptor.java:169)
        at software.amazon.awssdk.core.internal.util.MetricUtils.reportDuration(MetricUtils.java:81)
        at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.trySelectAuthScheme(S3AuthSchemeInterceptor.java:169)
        at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.selectAuthScheme(S3AuthSchemeInterceptor.java:87)
        at software.amazon.awssdk.services.s3.auth.scheme.internal.S3AuthSchemeInterceptor.beforeExecution(S3AuthSchemeInterceptor.java:67)
        at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.lambda$beforeExecution$1(ExecutionInterceptorChain.java:59)
        at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
        at software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain.beforeExecution(ExecutionInterceptorChain.java:59)
        at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.runInitialInterceptors(AwsExecutionContextBuilder.java:319)
        at software.amazon.awssdk.awscore.internal.AwsExecutionContextBuilder.invokeInterceptorsAndCreateExecutionContext(AwsExecutionContextBuilder.java:155)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.invokeInterceptorsAndCreateExecutionContext(AwsSyncClientHandler.java:67)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:76)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:182)
        at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:74)
        at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
        at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:53)
        at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:11202)
        at org.opensearch.dataprepper.plugins.dlq.s3.S3DlqWriter.putObject(S3DlqWriter.java:137)
        ... 26 more
Caused by: java.lang.InterruptedException
        at java.base/java.util.concurrent.locks.ReentrantLock$Sync.tryLockNanos(ReentrantLock.java:167)
        at java.base/java.util.concurrent.locks.ReentrantLock.tryLock(ReentrantLock.java:479)
        at software.amazon.awssdk.utils.cache.CachedSupplier.refreshCache(CachedSupplier.java:194)
        ... 47 more

Relevant code in the opensearch sink for shutting down:

  1. shutdown calls closeFiles.
  2. closeFiles calls close on the DlqWriter.
  3. S3DlqWriter closes the AWS SDK S3 client:

    @Override
    public void close() throws IOException {
        s3Client.close();
    }

I believe that the issue here may be even broader than the DLQ. The sinks generally have their own buffers, which hold custom data for each sink. However, the Data Prepper shutdown process has no mechanism to know whether those buffers still hold data. The data in these buffers has already been checkpointed in the pipeline buffer, so anything still buffered when a sink shuts down is lost.
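
To make the buffering concrete, here is a simplified, hypothetical sink (not the actual OpenSearchSink code): records are accepted into a sink-local batch and only flushed once a threshold is reached, so anything still sitting in the batch when shutdown() runs never reaches the downstream system even though it was already checkpointed upstream.

    import java.util.ArrayList;
    import java.util.List;

    class BatchingSink {
        private static final int FLUSH_THRESHOLD = 100;
        private final List<String> batch = new ArrayList<>();

        void output(List<String> records) {
            // records were already checkpointed in the pipeline buffer before arriving here
            batch.addAll(records);
            if (batch.size() >= FLUSH_THRESHOLD) {
                flush();
            }
        }

        void flush() {
            // send the current batch downstream (OpenSearch, S3, ...) and clear it
            batch.clear();
        }

        void shutdown() {
            // closes clients immediately; whatever is still in `batch` is dropped
        }
    }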

There may be a couple of different ways to handle this.
Option 1: Use the end-to-end acknowledgement framework to track in-flight data rather than the buffers.
Option 2: Provide a way to flush sinks before shutting down the sinks.
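
A minimal sketch of what Option 2 could look like, assuming a new flush-style hook on the sink API. The method name flushBeforeShutdown and the pipeline wiring here are hypothetical, not existing Data Prepper interfaces:

    import java.util.List;

    interface FlushableSink {
        void output(List<String> records);

        // invoked by the pipeline after the last output() call and before shutdown()
        default void flushBeforeShutdown() { }

        void shutdown();
    }

    class PipelineShutdown {
        static void stopSinks(List<FlushableSink> sinks) {
            for (FlushableSink sink : sinks) {
                sink.flushBeforeShutdown(); // drain sink-local buffers, including pending DLQ writes
                sink.shutdown();            // only then close clients such as the S3 client behind the DLQ
            }
        }
    }

For the S3 DLQ case specifically, this ordering would let S3DlqWriter finish any pending putObject calls before close() shuts down the underlying S3 client.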

To Reproduce
N/A

Expected behavior

Data Prepper sinks should flush their data before shutting down.

Screenshots

N/A

Environment (please complete the following information):

Data Prepper 2.12

Additional context

N/A
