Skip to content

Commit e655f5c

Browse files
authored
Support log level override for direct runner (#37100)
1 parent 92913b7 commit e655f5c

File tree

2 files changed

+49
-1
lines changed

2 files changed

+49
-1
lines changed

runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.beam.sdk.metrics.MetricResults;
3838
import org.apache.beam.sdk.metrics.MetricsEnvironment;
3939
import org.apache.beam.sdk.options.PipelineOptions;
40+
import org.apache.beam.sdk.options.SdkHarnessOptions;
4041
import org.apache.beam.sdk.runners.PTransformOverride;
4142
import org.apache.beam.sdk.transforms.PTransform;
4243
import org.apache.beam.sdk.util.UserCodeException;
@@ -184,7 +185,7 @@ public DirectPipelineResult run(Pipeline pipeline) {
184185

185186
DisplayDataValidator.validatePipeline(pipeline);
186187
DisplayDataValidator.validateOptions(options);
187-
188+
SdkHarnessOptions.getConfiguredLoggerFromOptions(options.as(SdkHarnessOptions.class));
188189
ExecutorService metricsPool =
189190
Executors.newCachedThreadPool(
190191
new ThreadFactoryBuilder()

runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import java.util.concurrent.TimeoutException;
5050
import java.util.concurrent.atomic.AtomicInteger;
5151
import java.util.concurrent.atomic.AtomicLong;
52+
import java.util.logging.Level;
53+
import java.util.logging.LogManager;
5254
import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
5355
import org.apache.beam.sdk.Pipeline;
5456
import org.apache.beam.sdk.PipelineResult;
@@ -75,6 +77,7 @@
7577
import org.apache.beam.sdk.transforms.Create;
7678
import org.apache.beam.sdk.transforms.DoFn;
7779
import org.apache.beam.sdk.transforms.Flatten;
80+
import org.apache.beam.sdk.transforms.Impulse;
7881
import org.apache.beam.sdk.transforms.MapElements;
7982
import org.apache.beam.sdk.transforms.PTransform;
8083
import org.apache.beam.sdk.transforms.ParDo;
@@ -744,6 +747,50 @@ public interface TestSerializationOfOptions extends PipelineOptions {
744747
void setIgnoredField(String value);
745748
}
746749

750+
@Test
751+
public void testLogLevel() {
752+
PipelineOptions options =
753+
PipelineOptionsFactory.fromArgs(
754+
new String[] {
755+
"--runner=DirectRunner",
756+
"--defaultSdkHarnessLogLevel=ERROR",
757+
"--sdkHarnessLogLevelOverrides={\"org.apache.beam.runners.direct.DirectRunnerTest\":\"INFO\"}"
758+
})
759+
.create();
760+
Pipeline pipeline = Pipeline.create(options);
761+
762+
LogManager logManager = LogManager.getLogManager();
763+
// use full name to avoid conflicts with org.slf4j.Logger
764+
java.util.logging.Logger rootLogger = logManager.getLogger("");
765+
Level originalLevel = rootLogger.getLevel();
766+
767+
try {
768+
pipeline
769+
.apply(Impulse.create())
770+
.apply(
771+
ParDo.of(
772+
new DoFn<byte[], byte[]>() {
773+
@ProcessElement
774+
public void process(@Element byte[] element, OutputReceiver<byte[]> o) {
775+
LogManager logManager = LogManager.getLogManager();
776+
java.util.logging.Logger rootLogger = logManager.getLogger("");
777+
// check loglevel here. Whether actual logs are rendered depends on slf4j impl
778+
// and upstream configs.
779+
assertEquals(Level.SEVERE, rootLogger.getLevel());
780+
assertEquals(
781+
Level.INFO,
782+
java.util.logging.Logger.getLogger(
783+
"org.apache.beam.runners.direct.DirectRunnerTest")
784+
.getLevel());
785+
}
786+
}));
787+
pipeline.run();
788+
} finally {
789+
// resume original log level
790+
rootLogger.setLevel(originalLevel);
791+
}
792+
}
793+
747794
private static class LongNoDecodeCoder extends AtomicCoder<Long> {
748795
@Override
749796
public void encode(Long value, OutputStream outStream) throws IOException {}

0 commit comments

Comments
 (0)