|
10 | 10 | import static org.assertj.core.api.Assertions.assertThatThrownBy; |
11 | 11 | import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; |
12 | 12 | import static org.awaitility.Awaitility.await; |
| 13 | +import static org.mockito.ArgumentMatchers.any; |
13 | 14 | import static org.mockito.ArgumentMatchers.anyList; |
14 | 15 | import static org.mockito.ArgumentMatchers.argThat; |
15 | 16 | import static org.mockito.Mockito.doThrow; |
16 | 17 | import static org.mockito.Mockito.reset; |
| 18 | +import static org.mockito.Mockito.times; |
| 19 | +import static org.mockito.Mockito.verify; |
17 | 20 | import static org.mockito.Mockito.when; |
18 | 21 |
|
19 | 22 | import io.opentelemetry.api.internal.GuardedBy; |
20 | 23 | import io.opentelemetry.internal.testing.slf4j.SuppressLogger; |
21 | 24 | import io.opentelemetry.sdk.common.CompletableResultCode; |
22 | 25 | import io.opentelemetry.sdk.logs.SdkLoggerProvider; |
23 | 26 | import io.opentelemetry.sdk.logs.data.LogRecordData; |
| 27 | +import java.time.Duration; |
24 | 28 | import java.util.ArrayList; |
25 | 29 | import java.util.Arrays; |
26 | 30 | import java.util.Collection; |
@@ -139,6 +143,32 @@ void builderAdjustMaxBatchSize() { |
139 | 143 | .isEqualTo(BatchLogRecordProcessorBuilder.DEFAULT_MAX_EXPORT_BATCH_SIZE); |
140 | 144 | } |
141 | 145 |
|
| 146 | + @Test |
| 147 | + void maxExportBatchSizeExceedsQueueSize() throws InterruptedException { |
| 148 | + // Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n = |
| 149 | + // maxQueueSize logs are emitted, export is triggered and that the queue is fully drained and |
| 150 | + // exported. |
| 151 | + int maxQueueSize = 2048; |
| 152 | + when(mockLogRecordExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess()); |
| 153 | + SdkLoggerProvider sdkLoggerProvider = |
| 154 | + SdkLoggerProvider.builder() |
| 155 | + .addLogRecordProcessor( |
| 156 | + BatchLogRecordProcessor.builder(mockLogRecordExporter) |
| 157 | + .setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE)) |
| 158 | + .setMaxExportBatchSize(2049) |
| 159 | + .setMaxQueueSize(maxQueueSize) |
| 160 | + .build()) |
| 161 | + .build(); |
| 162 | + |
| 163 | + for (int i = 0; i < maxQueueSize; i++) { |
| 164 | + emitLog(sdkLoggerProvider, "log " + i); |
| 165 | + } |
| 166 | + |
| 167 | + Thread.sleep(10); |
| 168 | + |
| 169 | + verify(mockLogRecordExporter, times(1)).export(any()); |
| 170 | + } |
| 171 | + |
142 | 172 | @Test |
143 | 173 | void emitMultipleLogs() { |
144 | 174 | WaitingLogRecordExporter waitingLogRecordExporter = |
|
0 commit comments