Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,17 @@
import io.opentelemetry.api.metrics.MeterProvider;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* Builder class for {@link BatchLogRecordProcessor}.
*
* @since 1.27.0
*/
public final class BatchLogRecordProcessorBuilder {
private static final Logger logger =
Logger.getLogger(BatchLogRecordProcessorBuilder.class.getName());

// Visible for testing
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 1000;
Expand Down Expand Up @@ -103,6 +107,9 @@ long getExporterTimeoutNanos() {
*/
public BatchLogRecordProcessorBuilder setMaxQueueSize(int maxQueueSize) {
checkArgument(maxQueueSize > 0, "maxQueueSize must be positive.");
if (maxExportBatchSize > maxQueueSize) {
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
}
this.maxQueueSize = maxQueueSize;
return this;
}
Expand All @@ -124,6 +131,9 @@ int getMaxQueueSize() {
*/
public BatchLogRecordProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive.");
if (maxExportBatchSize > maxQueueSize) {
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
}
Comment on lines +134 to +136
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this generate a false positive depending on the order the setters are called?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about change the log level in build() to warning, remove these or use fine level here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me. I think removing them from here would be ok in that case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

made the change in #7148

this.maxExportBatchSize = maxExportBatchSize;
return this;
}
Expand All @@ -150,6 +160,10 @@ int getMaxExportBatchSize() {
* @return a new {@link BatchLogRecordProcessor}.
*/
public BatchLogRecordProcessor build() {
if (maxExportBatchSize > maxQueueSize) {
maxExportBatchSize = maxQueueSize;
logger.log(Level.FINE, "Using maxExportBatchSize: {0}", maxExportBatchSize);
}
return new BatchLogRecordProcessor(
logRecordExporter,
meterProvider,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,21 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.api.internal.GuardedBy;
import io.opentelemetry.internal.testing.slf4j.SuppressLogger;
import io.opentelemetry.sdk.common.CompletableResultCode;
import io.opentelemetry.sdk.logs.SdkLoggerProvider;
import io.opentelemetry.sdk.logs.data.LogRecordData;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -117,6 +121,46 @@ void builderInvalidConfig() {
.hasMessage("maxQueueSize must be positive.");
}

@Test
void builderAdjustMaxBatchSize() {
LogRecordExporter dummyExporter = new CompletableLogRecordExporter();

BatchLogRecordProcessorBuilder builder =
BatchLogRecordProcessor.builder(dummyExporter)
.setMaxQueueSize(513)
.setMaxExportBatchSize(1000);
builder.build();

assertThat(builder.getMaxExportBatchSize()).isEqualTo(513);
assertThat(builder.getMaxQueueSize()).isEqualTo(513);
}

@Test
void maxExportBatchSizeExceedsQueueSize() throws InterruptedException {
// Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n =
// maxQueueSize logs are emitted, export is triggered and that the queue is fully drained and
// exported.
int maxQueueSize = 2048;
when(mockLogRecordExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess());
SdkLoggerProvider sdkLoggerProvider =
SdkLoggerProvider.builder()
.addLogRecordProcessor(
BatchLogRecordProcessor.builder(mockLogRecordExporter)
.setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE))
.setMaxExportBatchSize(2049)
.setMaxQueueSize(maxQueueSize)
.build())
.build();

for (int i = 0; i < maxQueueSize; i++) {
emitLog(sdkLoggerProvider, "log " + i);
}

Thread.sleep(10);

verify(mockLogRecordExporter, times(1)).export(any());
}

@Test
void emitMultipleLogs() {
WaitingLogRecordExporter waitingLogRecordExporter =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
import io.opentelemetry.api.metrics.MeterProvider;
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Builder class for {@link BatchSpanProcessor}. */
public final class BatchSpanProcessorBuilder {
private static final Logger logger = Logger.getLogger(BatchSpanProcessorBuilder.class.getName());

// Visible for testing
static final long DEFAULT_SCHEDULE_DELAY_MILLIS = 5000;
Expand Down Expand Up @@ -111,6 +114,9 @@ long getExporterTimeoutNanos() {
*/
public BatchSpanProcessorBuilder setMaxQueueSize(int maxQueueSize) {
checkArgument(maxQueueSize > 0, "maxQueueSize must be positive.");
if (maxExportBatchSize > maxQueueSize) {
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
}
this.maxQueueSize = maxQueueSize;
return this;
}
Expand All @@ -132,6 +138,9 @@ int getMaxQueueSize() {
*/
public BatchSpanProcessorBuilder setMaxExportBatchSize(int maxExportBatchSize) {
checkArgument(maxExportBatchSize > 0, "maxExportBatchSize must be positive.");
if (maxExportBatchSize > maxQueueSize) {
logger.log(Level.WARNING, "maxExportBatchSize should not exceed maxQueueSize.");
}
this.maxExportBatchSize = maxExportBatchSize;
return this;
}
Expand All @@ -158,6 +167,10 @@ int getMaxExportBatchSize() {
* @return a new {@link BatchSpanProcessor}.
*/
public BatchSpanProcessor build() {
if (maxExportBatchSize > maxQueueSize) {
maxExportBatchSize = maxQueueSize;
logger.log(Level.FINE, "Using maxExportBatchSize: {0}", maxExportBatchSize);
}
return new BatchSpanProcessor(
spanExporter,
exportUnsampledSpans,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.opentelemetry.api.internal.GuardedBy;
Expand All @@ -26,6 +28,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.samplers.Sampler;
import io.opentelemetry.sdk.trace.samplers.SamplingResult;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand Down Expand Up @@ -126,6 +129,44 @@ void builderInvalidConfig() {
.hasMessage("maxQueueSize must be positive.");
}

@Test
void builderAdjustMaxBatchSize() {
SpanExporter dummyExporter = new CompletableSpanExporter();

BatchSpanProcessorBuilder builder =
BatchSpanProcessor.builder(dummyExporter).setMaxQueueSize(513).setMaxExportBatchSize(1000);
builder.build();

assertThat(builder.getMaxExportBatchSize()).isEqualTo(513);
assertThat(builder.getMaxQueueSize()).isEqualTo(513);
}

@Test
void maxExportBatchSizeExceedsQueueSize() throws InterruptedException {
// Given a processor configured with a maxExportBatchSize > maxQueueSize, ensure that after n =
// maxQueueSize spans are ended, export is triggered and that the queue is fully drained and
// exported.
int maxQueueSize = 2048;
when(mockSpanExporter.export(any())).thenReturn(CompletableResultCode.ofSuccess());
sdkTracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(
BatchSpanProcessor.builder(mockSpanExporter)
.setScheduleDelay(Duration.ofSeconds(Integer.MAX_VALUE))
.setMaxExportBatchSize(2049)
.setMaxQueueSize(maxQueueSize)
.build())
.build();

for (int i = 0; i < maxQueueSize; i++) {
createEndedSpan("span " + i);
}

Thread.sleep(10);

verify(mockSpanExporter, times(1)).export(any());
}

@Test
void startEndRequirements() {
BatchSpanProcessor spansProcessor =
Expand Down
Loading