Skip to content

Commit c0be7d5

Browse files
committed
[FLINK-36278] Reduce log size by avoiding container logs by default
Currently, container logs appear under an o.a.f logger and thus are visible on CI. This results in a compressed log size of more than 40 MB per run and often leads to download errors. This PR reroutes container logs to a dedicated container logger. It also uses a custom layout to significantly reduce the size of each log line. Container logs are disabled by default.
1 parent 52e7e58 commit c0be7d5

File tree

14 files changed

+125
-85
lines changed

14 files changed

+125
-85
lines changed

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientSchemaRegistryITCase.java

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import org.apache.flink.api.common.time.Deadline;
2222
import org.apache.flink.connector.kafka.testutils.DockerImageVersions;
23+
import org.apache.flink.connector.kafka.testutils.KafkaUtil;
2324
import org.apache.flink.connector.testframe.container.FlinkContainers;
2425
import org.apache.flink.connector.testframe.container.TestcontainersSettings;
2526
import org.apache.flink.test.resources.ResourceTestUtils;
@@ -40,11 +41,8 @@
4041
import org.junit.ClassRule;
4142
import org.junit.Test;
4243
import org.junit.rules.Timeout;
43-
import org.slf4j.Logger;
44-
import org.slf4j.LoggerFactory;
4544
import org.testcontainers.containers.KafkaContainer;
4645
import org.testcontainers.containers.Network;
47-
import org.testcontainers.containers.output.Slf4jLogConsumer;
4846
import org.testcontainers.utility.DockerImageName;
4947

5048
import java.nio.file.Path;
@@ -60,9 +58,6 @@
6058

6159
/** End-to-end test for SQL client using Avro Confluent Registry format. */
6260
public class SQLClientSchemaRegistryITCase {
63-
private static final Logger LOG = LoggerFactory.getLogger(SQLClientSchemaRegistryITCase.class);
64-
private static final Slf4jLogConsumer LOG_CONSUMER = new Slf4jLogConsumer(LOG);
65-
6661
public static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
6762
public static final String INTER_CONTAINER_REGISTRY_ALIAS = "registry";
6863
private static final Path sqlAvroJar = ResourceTestUtils.getResource(".*avro.jar");
@@ -78,10 +73,9 @@ public class SQLClientSchemaRegistryITCase {
7873

7974
@ClassRule
8075
public static final KafkaContainer KAFKA =
81-
new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
76+
KafkaUtil.createKafkaContainer(SQLClientSchemaRegistryITCase.class)
8277
.withNetwork(NETWORK)
83-
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS)
84-
.withLogConsumer(LOG_CONSUMER);
78+
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
8579

8680
@ClassRule
8781
public static final SchemaRegistryContainer REGISTRY =
@@ -92,7 +86,11 @@ public class SQLClientSchemaRegistryITCase {
9286
.dependsOn(KAFKA);
9387

9488
public final TestcontainersSettings testcontainersSettings =
95-
TestcontainersSettings.builder().network(NETWORK).logger(LOG).dependsOn(KAFKA).build();
89+
TestcontainersSettings.builder()
90+
.network(NETWORK)
91+
.logger(KafkaUtil.getLogger("flink", SQLClientSchemaRegistryITCase.class))
92+
.dependsOn(KAFKA)
93+
.build();
9694

9795
public final FlinkContainers flink =
9896
FlinkContainers.builder().withTestcontainersSettings(testcontainersSettings).build();

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SmokeKafkaITCase.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@
4545
import org.junit.jupiter.api.Test;
4646
import org.junit.jupiter.api.extension.ExtendWith;
4747
import org.junit.jupiter.api.extension.RegisterExtension;
48-
import org.slf4j.Logger;
49-
import org.slf4j.LoggerFactory;
5048
import org.testcontainers.containers.KafkaContainer;
5149
import org.testcontainers.containers.Network;
5250
import org.testcontainers.junit.jupiter.Container;
@@ -62,7 +60,6 @@
6260
import java.util.UUID;
6361
import java.util.stream.Collectors;
6462

65-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
6663
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
6764
import static org.assertj.core.api.Assertions.assertThat;
6865

@@ -71,20 +68,22 @@
7168
@Testcontainers
7269
class SmokeKafkaITCase {
7370

74-
private static final Logger LOG = LoggerFactory.getLogger(SmokeKafkaITCase.class);
7571
private static final String INTER_CONTAINER_KAFKA_ALIAS = "kafka";
7672
private static final Network NETWORK = Network.newNetwork();
7773
private static final String EXAMPLE_JAR_MATCHER = "flink-streaming-kafka-test.*";
7874

7975
@Container
8076
public static final KafkaContainer KAFKA_CONTAINER =
81-
createKafkaContainer(KAFKA, LOG)
77+
createKafkaContainer(SmokeKafkaITCase.class)
8278
.withEmbeddedZookeeper()
8379
.withNetwork(NETWORK)
8480
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
8581

8682
public static final TestcontainersSettings TESTCONTAINERS_SETTINGS =
87-
TestcontainersSettings.builder().logger(LOG).dependsOn(KAFKA_CONTAINER).build();
83+
TestcontainersSettings.builder()
84+
.logger(KafkaUtil.getLogger("flink", SmokeKafkaITCase.class))
85+
.dependsOn(KAFKA_CONTAINER)
86+
.build();
8887

8988
@RegisterExtension
9089
public static final FlinkContainers FLINK =

flink-connector-kafka-e2e-tests/flink-end-to-end-tests-common-kafka/src/test/resources/log4j2-test.properties

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,3 +32,27 @@ appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
3232
#logger.yarn.name = org.testcontainers.shaded.com.github.dockerjava.core
3333
#logger.yarn.level = WARN
3434
#logger.yarn.appenderRef.console.ref = TestLogger
35+
36+
# Logger configuration for containers, by default this is off
37+
# If you want to investigate test failures, overwrite the level as above
38+
logger.container.name = container
39+
logger.container.level = OFF
40+
# Disable additivity so container messages are not also logged by the root logger.
# (In .properties files, '#' only starts a comment at the beginning of a line —
# an inline comment would become part of the property value.)
logger.container.additivity = false
41+
logger.container.appenderRef.containerappender.ref = ContainerLogger
42+
43+
logger.kafkacontainer.name = container.kafka
44+
logger.kafkacontainer.level = OFF
45+
46+
logger.flinkcontainer.name = container.flink
47+
logger.flinkcontainer.level = OFF
48+
49+
logger.flinkenv.name = org.apache.flink.connector.testframe.container.FlinkContainerTestEnvironment
50+
logger.flinkenv.level = OFF
51+
# Disable additivity so these messages are not also logged by the root logger.
# (Inline '#' comments are not supported in .properties values.)
logger.flinkenv.additivity = false
52+
logger.flinkenv.appenderRef.containerappender.ref = ContainerLogger
53+
54+
appender.containerappender.name = ContainerLogger
55+
appender.containerappender.type = CONSOLE
56+
appender.containerappender.target = SYSTEM_ERR
57+
appender.containerappender.layout.type = PatternLayout
58+
appender.containerappender.layout.pattern = [%c{1}] %m%n

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,6 @@
3434
import org.junit.jupiter.api.extension.ExtendWith;
3535
import org.junit.jupiter.params.ParameterizedTest;
3636
import org.junit.jupiter.params.provider.MethodSource;
37-
import org.slf4j.Logger;
38-
import org.slf4j.LoggerFactory;
3937
import org.testcontainers.containers.KafkaContainer;
4038
import org.testcontainers.junit.jupiter.Container;
4139
import org.testcontainers.junit.jupiter.Testcontainers;
@@ -47,7 +45,6 @@
4745
import java.util.function.Consumer;
4846
import java.util.stream.Collectors;
4947

50-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
5148
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
5249
import static org.assertj.core.api.Assertions.assertThat;
5350
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -56,12 +53,9 @@
5653
@ExtendWith(TestLoggerExtension.class)
5754
class FlinkKafkaInternalProducerITCase {
5855

59-
private static final Logger LOG =
60-
LoggerFactory.getLogger(FlinkKafkaInternalProducerITCase.class);
61-
6256
@Container
6357
private static final KafkaContainer KAFKA_CONTAINER =
64-
createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
58+
createKafkaContainer(FlinkKafkaInternalProducerITCase.class).withEmbeddedZookeeper();
6559

6660
@Test
6761
void testInitTransactionId() {

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,6 @@
101101
import java.util.stream.Collectors;
102102
import java.util.stream.LongStream;
103103

104-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
105104
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
106105
import static org.assertj.core.api.Assertions.assertThat;
107106
import static org.assertj.core.api.Assertions.fail;
@@ -124,7 +123,7 @@ public class KafkaSinkITCase extends TestLogger {
124123

125124
@ClassRule
126125
public static final KafkaContainer KAFKA_CONTAINER =
127-
createKafkaContainer(KAFKA, LOG)
126+
createKafkaContainer(KafkaSinkITCase.class)
128127
.withEmbeddedZookeeper()
129128
.withNetwork(NETWORK)
130129
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@
4444
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.Ongoing;
4545
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareAbort;
4646
import static org.apache.flink.connector.kafka.sink.KafkaTransactionLog.TransactionState.PrepareCommit;
47-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
4847
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
4948
import static org.assertj.core.api.Assertions.assertThat;
5049

@@ -57,7 +56,7 @@ public class KafkaTransactionLogITCase extends TestLogger {
5756

5857
@ClassRule
5958
public static final KafkaContainer KAFKA_CONTAINER =
60-
createKafkaContainer(KAFKA, LOG).withEmbeddedZookeeper();
59+
createKafkaContainer(KafkaTransactionLogITCase.class).withEmbeddedZookeeper();
6160

6261
private final List<Producer<byte[], Integer>> openProducers = new ArrayList<>();
6362

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterTestBase.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import java.util.concurrent.ScheduledFuture;
5656
import java.util.function.Consumer;
5757

58-
import static org.apache.flink.connector.kafka.testutils.DockerImageVersions.KAFKA;
5958
import static org.apache.flink.connector.kafka.testutils.KafkaUtil.createKafkaContainer;
6059

6160
/** Test base for KafkaWriter. */
@@ -73,7 +72,7 @@ public abstract class KafkaWriterTestBase {
7372
protected TriggerTimeService timeService;
7473

7574
protected static final KafkaContainer KAFKA_CONTAINER =
76-
createKafkaContainer(KAFKA, LOG)
75+
createKafkaContainer(KafkaWriterTestBase.class)
7776
.withEmbeddedZookeeper()
7877
.withNetwork(NETWORK)
7978
.withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);

flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaUtil.java

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818

1919
package org.apache.flink.connector.kafka.testutils;
2020

21-
import org.apache.flink.util.StringUtils;
22-
2321
import org.apache.kafka.clients.consumer.ConsumerConfig;
2422
import org.apache.kafka.clients.consumer.ConsumerRecord;
2523
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -49,24 +47,34 @@ public class KafkaUtil {
4947

5048
private KafkaUtil() {}
5149

52-
/**
53-
* This method helps to set commonly used Kafka configurations and aligns the internal Kafka log
54-
* levels with the ones used by the capturing logger.
55-
*
56-
* @param dockerImageVersion describing the Kafka image
57-
* @param logger to derive the log level from
58-
* @return configured Kafka container
59-
*/
60-
public static KafkaContainer createKafkaContainer(String dockerImageVersion, Logger logger) {
61-
return createKafkaContainer(dockerImageVersion, logger, null);
50+
/** This method helps to set commonly used Kafka configurations and sets up the logger. */
51+
public static KafkaContainer createKafkaContainer(Class<?> testCase) {
52+
return createKafkaContainer(getContainerName("kafka", testCase));
6253
}
6354

64-
/**
65-
* This method helps to set commonly used Kafka configurations and aligns the internal Kafka log
66-
* levels with the ones used by the capturing logger, and set the prefix of logger.
67-
*/
68-
public static KafkaContainer createKafkaContainer(
69-
String dockerImageVersion, Logger logger, String loggerPrefix) {
55+
/** This method helps to set commonly used Kafka configurations and sets up the logger. */
56+
public static KafkaContainer createKafkaContainer(String containerName) {
57+
Logger logger = getLogger(containerName);
58+
59+
String logLevel = inferLogLevel(logger);
60+
61+
Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger, true);
62+
return new KafkaContainer(DockerImageName.parse(DockerImageVersions.KAFKA))
63+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
64+
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
65+
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
66+
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
67+
.withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel)
68+
.withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel)
69+
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
70+
.withEnv(
71+
"KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
72+
String.valueOf(Duration.ofHours(2).toMillis()))
73+
.withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel)
74+
.withLogConsumer(logConsumer);
75+
}
76+
77+
private static String inferLogLevel(Logger logger) {
7078
String logLevel;
7179
if (logger.isTraceEnabled()) {
7280
logLevel = "TRACE";
@@ -81,24 +89,19 @@ public static KafkaContainer createKafkaContainer(
8189
} else {
8290
logLevel = "OFF";
8391
}
92+
return logLevel;
93+
}
8494

85-
Slf4jLogConsumer logConsumer = new Slf4jLogConsumer(logger);
86-
if (!StringUtils.isNullOrWhitespaceOnly(loggerPrefix)) {
87-
logConsumer.withPrefix(loggerPrefix);
88-
}
89-
return new KafkaContainer(DockerImageName.parse(dockerImageVersion))
90-
.withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
91-
.withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
92-
.withEnv("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
93-
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
94-
.withEnv("KAFKA_LOG4J_ROOT_LOGLEVEL", logLevel)
95-
.withEnv("KAFKA_LOG4J_LOGGERS", "state.change.logger=" + logLevel)
96-
.withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
97-
.withEnv(
98-
"KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
99-
String.valueOf(Duration.ofHours(2).toMillis()))
100-
.withEnv("KAFKA_LOG4J_TOOLS_ROOT_LOGLEVEL", logLevel)
101-
.withLogConsumer(logConsumer);
95+
public static Logger getLogger(String containerName) {
96+
return LoggerFactory.getLogger("container." + containerName);
97+
}
98+
99+
public static Logger getLogger(String type, Class<?> testClass) {
100+
return getLogger(getContainerName(type, testClass));
101+
}
102+
103+
private static String getContainerName(String type, Class<?> testClass) {
104+
return type + "." + testClass.getSimpleName();
102105
}
103106

104107
/**

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerMigrationTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ protected Properties createProperties() {
5454
Properties properties = new Properties();
5555
properties.putAll(standardProps);
5656
properties.putAll(secureProps);
57-
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-client-id");
5857
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "producer-transaction-id");
5958
properties.put(FlinkKafkaProducer.KEY_DISABLE_METRICS, "true");
6059
return properties;

flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -434,7 +434,7 @@ private KafkaContainer createKafkaContainer(
434434
int brokerID, @Nullable GenericContainer<?> zookeeper) {
435435
String brokerName = String.format("Kafka-%d", brokerID);
436436
KafkaContainer broker =
437-
KafkaUtil.createKafkaContainer(DockerImageVersions.KAFKA, LOG, brokerName)
437+
KafkaUtil.createKafkaContainer(brokerName)
438438
.withNetworkAliases(brokerName)
439439
.withEnv("KAFKA_BROKER_ID", String.valueOf(brokerID))
440440
.withEnv("KAFKA_MESSAGE_MAX_BYTES", String.valueOf(50 * 1024 * 1024))

0 commit comments

Comments
 (0)