
Commit 806b644

Allow customization of logging configuration (#287)
1 parent fd709e3 commit 806b644

File tree

8 files changed: +213 -20 lines changed

api/v1beta1/flinkcluster_types.go

Lines changed: 5 additions & 0 deletions
@@ -426,6 +426,11 @@ type FlinkClusterSpec struct {
 	// Config for GCP.
 	GCPConfig *GCPConfig `json:"gcpConfig,omitempty"`
 
+	// The logging configuration, which should have keys 'log4j-console.properties' and 'logback-console.xml'.
+	// These will end up in the 'flink-config-volume' ConfigMap, which gets mounted at /opt/flink/conf.
+	// If not provided, defaults that log to console only will be used.
+	LogConfig map[string]string `json:"logConfig,omitempty"`
+
 	// The maximum number of revision history to keep, default: 10.
 	RevisionHistoryLimit *int32 `json:"revisionHistoryLimit,omitempty"`
 }
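For illustration only (not part of the commit), a minimal sketch of how a caller might populate the new field; the map literal mirrors the converter test added further below, and any key beyond the two well-known ones is passed through to the ConfigMap verbatim:

	// Hypothetical sketch: setting LogConfig on a FlinkClusterSpec.
	// Each key becomes a file name in the 'flink-config-volume' ConfigMap,
	// which is mounted at /opt/flink/conf in each container.
	spec := v1beta1.FlinkClusterSpec{
		LogConfig: map[string]string{
			"log4j-console.properties": "log4j.rootLogger=INFO, console",
			"logback-console.xml":      "<configuration>...</configuration>",
			"extra-file.txt":           "passed through to /opt/flink/conf as-is",
		},
	}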

api/v1beta1/zz_generated.deepcopy.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default.

config/crd/bases/flinkoperator.k8s.io_flinkclusters.yaml

Lines changed: 4 additions & 0 deletions
@@ -3161,6 +3161,10 @@ spec:
               required:
               - accessScope
               type: object
+            logConfig:
+              additionalProperties:
+                type: string
+              type: object
             revisionHistoryLimit:
               format: int32
               type: integer

controllers/flinkcluster_converter.go

Lines changed: 21 additions & 17 deletions
@@ -554,9 +554,9 @@ func getDesiredConfigMap(
 		}
 		flinkProps[k] = v
 	}
-	// TODO: Provide logging options: log4j-console.properties and log4j.properties
-	var log4jPropName = "log4j-console.properties"
-	var logbackXMLName = "logback-console.xml"
+	var configData = getLogConf(flinkCluster.Spec)
+	configData["flink-conf.yaml"] = getFlinkProperties(flinkProps)
+	configData["submit-job.sh"] = submitJobScript
 	var configMap = &corev1.ConfigMap{
 		ObjectMeta: metav1.ObjectMeta{
 			Namespace: clusterNamespace,
@@ -565,12 +565,7 @@ func getDesiredConfigMap(
 				ToOwnerReference(flinkCluster)},
 			Labels: labels,
 		},
-		Data: map[string]string{
-			"flink-conf.yaml": getFlinkProperties(flinkProps),
-			log4jPropName:     getLogConf()[log4jPropName],
-			logbackXMLName:    getLogConf()[logbackXMLName],
-			"submit-job.sh":   submitJobScript,
-		},
+		Data: configData,
 	}
 
 	return configMap
@@ -1051,10 +1046,8 @@ func mergeLabels(labels1 map[string]string, labels2 map[string]string) map[strin
 	return mergedLabels
 }
 
-// TODO: Wouldn't it be better to create a file, put it in an operator image, and read from them?.
-// Provide logging profiles
-func getLogConf() map[string]string {
-	var log4jConsoleProperties = `log4j.rootLogger=INFO, console
+const (
+	DefaultLog4jConfig = `log4j.rootLogger=INFO, console
 log4j.logger.akka=INFO
 log4j.logger.org.apache.kafka=INFO
 log4j.logger.org.apache.hadoop=INFO
@@ -1063,7 +1056,7 @@ log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
 log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, console`
-	var logbackConsoleXML = `<configuration>
+	DefaultLogbackConfig = `<configuration>
   <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
     <encoder>
       <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
@@ -1088,9 +1081,20 @@ log4j.logger.org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannel
     <appender-ref ref="console"/>
   </logger>
 </configuration>`
+)
 
-	return map[string]string{
-		"log4j-console.properties": log4jConsoleProperties,
-		"logback-console.xml":      logbackConsoleXML,
+// TODO: Wouldn't it be better to create a file, put it in an operator image, and read from them?.
+// Provide logging profiles
+func getLogConf(spec v1beta1.FlinkClusterSpec) map[string]string {
+	result := spec.LogConfig
+	if result == nil {
+		result = make(map[string]string, 2)
+	}
+	if _, isPresent := result["log4j-console.properties"]; !isPresent {
+		result["log4j-console.properties"] = DefaultLog4jConfig
+	}
+	if _, isPresent := result["logback-console.xml"]; !isPresent {
+		result["logback-console.xml"] = DefaultLogbackConfig
 	}
+	return result
 }
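To make the defaulting semantics concrete, here is a minimal usage sketch (not part of the commit) of what the new getLogConf returns for a partially populated spec; it is consistent with the table-driven tests added below:

	// Hypothetical sketch: user-supplied keys win, missing well-known keys
	// fall back to the package defaults, and extra keys pass through untouched.
	spec := v1beta1.FlinkClusterSpec{
		LogConfig: map[string]string{
			"log4j-console.properties": "my custom log4j config",
			"extra-file.txt":           "hello!",
		},
	}
	configData := getLogConf(spec)
	// configData["log4j-console.properties"] == "my custom log4j config"
	// configData["logback-console.xml"]      == DefaultLogbackConfig (filled in)
	// configData["extra-file.txt"]           == "hello!"

Note that when spec.LogConfig is non-nil, getLogConf writes the defaults into that same map rather than copying it, so the filled-in defaults also become visible through the spec.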

controllers/flinkcluster_converter_test.go

Lines changed: 79 additions & 2 deletions
@@ -17,6 +17,7 @@ limitations under the License.
 package controllers
 
 import (
+	"reflect"
 	"testing"
 	"time"
 
@@ -254,6 +255,11 @@ func TestGetDesiredClusterState(t *testing.T) {
 					MountPath: "/etc/gcp_service_account/",
 				},
 			},
+			LogConfig: map[string]string{
+				"extra-file.txt":           "hello!",
+				"log4j-console.properties": "foo",
+				"logback-console.xml":      "bar",
+			},
 		},
 		Status: v1beta1.FlinkClusterStatus{
 			NextRevision: "flinkjobcluster-sample-85dc8f749-1",
@@ -888,8 +894,9 @@ taskmanager.rpc.port: 6122
 		},
 		Data: map[string]string{
 			"flink-conf.yaml":          flinkConfYaml,
-			"log4j-console.properties": getLogConf()["log4j-console.properties"],
-			"logback-console.xml":      getLogConf()["logback-console.xml"],
+			"extra-file.txt":           "hello!",
+			"log4j-console.properties": "foo",
+			"logback-console.xml":      "bar",
 			"submit-job.sh":            submitJobScript,
 		},
 	}
@@ -969,3 +976,73 @@ func TestCalFlinkHeapSize(t *testing.T) {
 	flinkHeapSize = calFlinkHeapSize(cluster)
 	assert.Assert(t, len(flinkHeapSize) == 0)
 }
+
+func Test_getLogConf(t *testing.T) {
+	type args struct {
+		spec v1beta1.FlinkClusterSpec
+	}
+	tests := []struct {
+		name string
+		args args
+		want map[string]string
+	}{
+		{
+			name: "nil map uses defaults",
+			args: args{v1beta1.FlinkClusterSpec{LogConfig: nil}},
+			want: map[string]string{
+				"log4j-console.properties": DefaultLog4jConfig,
+				"logback-console.xml":      DefaultLogbackConfig,
+			},
+		},
+		{
+			name: "map missing log4j-console.properties uses default",
+			args: args{v1beta1.FlinkClusterSpec{LogConfig: map[string]string{
+				"logback-console.xml": "xyz",
+			}}},
+			want: map[string]string{
+				"log4j-console.properties": DefaultLog4jConfig,
+				"logback-console.xml":      "xyz",
+			},
+		},
+		{
+			name: "map missing logback-console.xml uses default",
+			args: args{v1beta1.FlinkClusterSpec{LogConfig: map[string]string{
+				"log4j-console.properties": "xyz",
+			}}},
+			want: map[string]string{
+				"log4j-console.properties": "xyz",
+				"logback-console.xml":      DefaultLogbackConfig,
+			},
+		},
+		{
+			name: "map with both keys overrides defaults",
+			args: args{v1beta1.FlinkClusterSpec{LogConfig: map[string]string{
+				"log4j-console.properties": "hello",
+				"logback-console.xml":      "world",
+			}}},
+			want: map[string]string{
+				"log4j-console.properties": "hello",
+				"logback-console.xml":      "world",
+			},
+		},
+		{
+			name: "extra keys preserved",
+			args: args{v1beta1.FlinkClusterSpec{LogConfig: map[string]string{
+				"log4j-console.properties": "abc",
+				"file.txt":                 "def",
+			}}},
+			want: map[string]string{
+				"log4j-console.properties": "abc",
+				"logback-console.xml":      DefaultLogbackConfig,
+				"file.txt":                 "def",
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := getLogConf(tt.args.spec); !reflect.DeepEqual(got, tt.want) {
+				t.Errorf("getLogConf() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}

docs/crd.md

Lines changed: 8 additions & 0 deletions
@@ -88,6 +88,7 @@ FlinkCluster
       |__ secretName
       |__ keyFile
       |__ mountPath
+  |__ logConfig
   |__ revisionHistoryLimit
 |__ status
   |__ state
@@ -279,6 +280,13 @@ FlinkCluster
     same namespace as the FlinkCluster.
   * **keyFile**: The name of the service account key file.
   * **mountPath**: The path where to mount the Volume of the Secret.
+* **logConfig** (optional): The logging configuration, a string-to-string map that becomes the ConfigMap mounted at
+  `/opt/flink/conf` in launched Flink pods. See below for some possible keys to include:
+  * **log4j-console.properties**: The contents of the log4j properties file to use. If not provided, a default that
+    logs only to stdout will be used.
+  * **logback-console.xml**: The contents of the logback XML file to use. If not provided, a default that logs only
+    to stdout will be used.
+  * Other arbitrary keys are also allowed, and will become part of the ConfigMap.
 * **revisionHistoryLimit** (optional): The maximum number of revision history to keep, default: 10.
 * **status**: Flink job or session cluster status.
   * **state**: The overall state of the Flink cluster.

docs/user_guide.md

Lines changed: 20 additions & 1 deletion
@@ -444,4 +444,23 @@ You can check the list of revisions and their contents like this:
 ```bash
 kubectl get controllerrevision
 kubectl get controllerrevision <REVISION-NAME> -o yaml
-```
+```
+
+### Control Logging Behavior
+
+The default logging configuration provided by the operator sends logs from the JobManager and TaskManager to `stdout`,
+so logging from Flink workloads running on Kubernetes behaves like that of any other Kubernetes pod: your Flink logs
+are stored wherever you generally expect to find your container logs in your environment.
+
+Sometimes, however, this is not a good fit. For example, you might want to customize logging behavior to restore the
+visibility of logs in the Flink JobManager web interface, to ship logs directly to a different sink, or to use a
+different formatter.
+
+You can use the `spec.logConfig` field to fully control the log4j and logback configuration. It is a string-to-string
+map whose keys and values become filenames and contents (respectively) in the folder `/opt/flink/conf` in each
+container. The default Flink docker entrypoint expects this directory to contain two files:
+`log4j-console.properties` and `logback-console.xml`.
+
+An example of using this parameter to make logs visible in both the Flink UI and on stdout
+[can be found here](../examples/log_config.yaml).

examples/log_config.yaml

Lines changed: 69 additions & 0 deletions
@@ -0,0 +1,69 @@
+apiVersion: flinkoperator.k8s.io/v1beta1
+kind: FlinkCluster
+metadata:
+  name: my-cool-cluster
+spec:
+  image:
+    name: flink:1.11.1
+  jobManager:
+    resources:
+      limits:
+        memory: 1Gi
+        cpu: "1.0"
+  taskManager:
+    replicas: 2
+    resources:
+      limits:
+        memory: 1Gi
+        cpu: "1.0"
+  flinkProperties:
+    taskmanager.numberOfTaskSlots: "1"
+  logConfig:
+    "log4j-console.properties": |
+      rootLogger.level = INFO
+      rootLogger.appenderRef.file.ref = LogFile
+      rootLogger.appenderRef.console.ref = LogConsole
+      appender.file.name = LogFile
+      appender.file.type = File
+      appender.file.append = false
+      appender.file.fileName = ${sys:log.file}
+      appender.file.layout.type = PatternLayout
+      appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+      appender.console.name = LogConsole
+      appender.console.type = CONSOLE
+      appender.console.layout.type = PatternLayout
+      appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
+      logger.akka.name = akka
+      logger.akka.level = INFO
+      logger.kafka.name = org.apache.kafka
+      logger.kafka.level = INFO
+      logger.hadoop.name = org.apache.hadoop
+      logger.hadoop.level = INFO
+      logger.zookeeper.name = org.apache.zookeeper
+      logger.zookeeper.level = INFO
+      logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
+      logger.netty.level = OFF
+    "logback-console.xml": |
+      <configuration>
+        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
+          <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+          </encoder>
+        </appender>
+        <appender name="file" class="ch.qos.logback.core.FileAppender">
+          <file>${log.file}</file>
+          <append>false</append>
+          <encoder>
+            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
+          </encoder>
+        </appender>
+        <root level="INFO">
+          <appender-ref ref="console"/>
+          <appender-ref ref="file"/>
+        </root>
+        <logger name="akka" level="INFO" />
+        <logger name="org.apache.kafka" level="INFO" />
+        <logger name="org.apache.hadoop" level="INFO" />
+        <logger name="org.apache.zookeeper" level="INFO" />
+        <logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="ERROR" />
+      </configuration>
