Skip to content

Commit f1e9320

Browse files
committed
Addresses review comments
1 parent 61ad2cd commit f1e9320

File tree

7 files changed

+270
-18
lines changed

7 files changed

+270
-18
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
3030
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3131
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
32+
import org.apache.flink.kubernetes.operator.utils.K8sAnnotationsSanitizer;
3233
import org.apache.flink.runtime.client.JobStatusMessage;
3334
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
3435

@@ -40,6 +41,7 @@
4041
import java.util.HashMap;
4142
import java.util.Map;
4243
import java.util.concurrent.TimeoutException;
44+
import java.util.regex.Pattern;
4345

4446
import static org.apache.flink.kubernetes.operator.utils.FlinkResourceExceptionUtils.updateFlinkResourceException;
4547

@@ -49,6 +51,8 @@ public class JobStatusObserver<R extends AbstractFlinkResource<?, ?>> {
4951
private static final Logger LOG = LoggerFactory.getLogger(JobStatusObserver.class);
5052

5153
public static final String JOB_NOT_FOUND_ERR = "Job Not Found";
54+
private static final Pattern VALID_K8S_ANNOTATION_KEY_PATTERN =
55+
Pattern.compile("^[a-zA-Z0-9./-]{1,63}$");
5256

5357
protected final EventRecorder eventRecorder;
5458

@@ -85,11 +89,9 @@ public boolean observe(FlinkResourceContext<R> ctx) {
8589
var newJobStatus = newJobStatusOpt.get();
8690
updateJobStatus(ctx, newJobStatus);
8791
ReconciliationUtils.checkAndUpdateStableSpec(resource.getStatus());
88-
// now check if the job is in a terminal state. This might not be need as we have
89-
// already
90-
// verified that the REST API server is available
91-
// now check if the job is NOT in a terminal state
92-
if (!newJobStatus.getJobState().isGloballyTerminalState()) {
92+
// see if the JM server is up, try to get the exceptions
93+
// in case the new
94+
if (!previousJobStatus.isGloballyTerminalState()) {
9395
observeJobManagerExceptions(ctx);
9496
}
9597
return true;
@@ -124,9 +126,7 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
124126
// but the JobExceptionsMessageParameters does not expose the parameters and nor does
125127
// it have setters.
126128
var history =
127-
ctx.getFlinkService()
128-
.getJobExceptions(
129-
resource, jobId, ctx.getDeployConfig(resource.getSpec()));
129+
ctx.getFlinkService().getJobExceptions(resource, jobId, ctx.getObserveConfig());
130130

131131
if (history == null || history.getExceptionHistory() == null) {
132132
return;
@@ -241,7 +241,7 @@ private void emitJobManagerExceptionEvent(
241241
EventRecorder.Component.JobManagerDeployment,
242242
"jobmanager-exception-" + keyMessage.hashCode(),
243243
ctx.getKubernetesClient(),
244-
annotations);
244+
K8sAnnotationsSanitizer.sanitizeAnnotations(annotations));
245245
}
246246

247247
/**

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -851,9 +851,9 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
851851

852852
@Override
853853
public JobExceptionsInfoWithHistory getJobExceptions(
854-
AbstractFlinkResource resource, JobID jobId, Configuration deployConfig) {
854+
AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) {
855855
JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
856-
int port = deployConfig.getInteger(RestOptions.PORT);
856+
int port = observeConfig.getInteger(RestOptions.PORT);
857857
String host =
858858
ObjectUtils.firstNonNull(
859859
operatorConfig.getFlinkServiceHostOverride(),
@@ -862,7 +862,7 @@ public JobExceptionsInfoWithHistory getJobExceptions(
862862
resource.getMetadata().getNamespace()));
863863
JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
864864
params.jobPathParameter.resolve(jobId);
865-
try (var restClient = getRestClient(deployConfig)) {
865+
try (var restClient = getRestClient(observeConfig)) {
866866
return restClient
867867
.sendRequest(
868868
host,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,7 @@
3939

4040
import io.javaoperatorsdk.operator.api.reconciler.Context;
4141
import io.javaoperatorsdk.operator.processing.event.ResourceID;
42-
import lombok.Getter;
43-
import lombok.Setter;
42+
import lombok.Data;
4443
import org.slf4j.Logger;
4544
import org.slf4j.LoggerFactory;
4645

@@ -55,8 +54,7 @@ public class FlinkResourceContextFactory {
5554
private static final Logger LOG = LoggerFactory.getLogger(FlinkResourceContextFactory.class);
5655

5756
/** The cache entry for the last recorded exception timestamp for a JobID. */
58-
@Getter
59-
@Setter
57+
@Data
6058
public static final class ExceptionCacheEntry {
6159
private String jobId;
6260
private long lastTimestamp;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ Map<String, String> getMetrics(Configuration conf, String jobId, List<String> me
130130
RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;
131131

132132
JobExceptionsInfoWithHistory getJobExceptions(
133-
AbstractFlinkResource resource, JobID jobId, Configuration deployConfig)
133+
AbstractFlinkResource resource, JobID jobId, Configuration observeConfig)
134134
throws Exception;
135135

136136
/** Result of a cancel operation. */
Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.utils;
19+
20+
import org.apache.flink.annotation.VisibleForTesting;
21+
22+
import java.util.HashMap;
23+
import java.util.Map;
24+
import java.util.regex.Pattern;
25+
26+
/**
27+
* Utility class for sanitizing Kubernetes annotations as per k8s rules. See <a
28+
* href="https://kubernetes.io/docs/concepts/overview/working-with-objects/annotations/#syntax-and-character-set">Kubernetes
29+
* Annotations Syntax and Character Set</a>
30+
*/
31+
public class K8sAnnotationsSanitizer {
32+
33+
private static final int MAX_PREFIX_LENGTH = 253;
34+
private static final int MAX_NAME_LENGTH = 63;
35+
36+
// Name: starts and ends with alphanumeric, allows alphanum, '-', '_', '.' in middle
37+
private static final Pattern NAME_PATTERN =
38+
Pattern.compile("^[a-zA-Z0-9]([a-zA-Z0-9_.-]*[a-zA-Z0-9])?$");
39+
40+
// DNS label: alphanumeric, may have hyphens inside, length ≤ 63
41+
private static final Pattern DNS_LABEL_PATTERN =
42+
Pattern.compile("^[a-zA-Z0-9]([-a-zA-Z0-9]*[a-zA-Z0-9])?$");
43+
44+
/**
45+
* Sanitizes the input annotations by validating keys and cleaning values. Only includes entries
46+
* with valid keys and non-null values.
47+
*/
48+
public static Map<String, String> sanitizeAnnotations(Map<String, String> annotations) {
49+
Map<String, String> sanitized = new HashMap<>();
50+
if (annotations == null) {
51+
return sanitized;
52+
}
53+
54+
for (Map.Entry<String, String> entry : annotations.entrySet()) {
55+
String key = entry.getKey();
56+
String value = entry.getValue();
57+
58+
if (isValidAnnotationKey(key)) {
59+
String sanitizedValue = sanitizeAnnotationValue(value);
60+
if (sanitizedValue != null) {
61+
sanitized.put(key, sanitizedValue);
62+
}
63+
}
64+
}
65+
return sanitized;
66+
}
67+
68+
/** Validates the annotation key according to Kubernetes rules: Optional prefix + "/" + name. */
69+
@VisibleForTesting
70+
static boolean isValidAnnotationKey(String key) {
71+
if (key == null || key.isEmpty()) {
72+
return false;
73+
}
74+
75+
String[] parts = key.split("/", 2);
76+
if (parts.length == 2) {
77+
return isValidPrefix(parts[0]) && isValidName(parts[1]);
78+
} else {
79+
return isValidName(parts[0]);
80+
}
81+
}
82+
83+
/**
84+
* Validates the prefix as a DNS subdomain: series of DNS labels separated by dots, total length
85+
* ≤ 253.
86+
*/
87+
private static boolean isValidPrefix(String prefix) {
88+
if (prefix.length() > MAX_PREFIX_LENGTH) {
89+
return false;
90+
}
91+
if (prefix.endsWith(".")) {
92+
return false; // no trailing dot allowed
93+
}
94+
String[] labels = prefix.split("\\.");
95+
for (String label : labels) {
96+
if (label.isEmpty() || label.length() > 63) {
97+
return false;
98+
}
99+
if (!DNS_LABEL_PATTERN.matcher(label).matches()) {
100+
return false;
101+
}
102+
}
103+
return true;
104+
}
105+
106+
/** Validates the name part of the key. */
107+
private static boolean isValidName(String name) {
108+
return name != null
109+
&& name.length() <= MAX_NAME_LENGTH
110+
&& NAME_PATTERN.matcher(name).matches();
111+
}
112+
113+
/**
114+
* Sanitizes the annotation value by trimming and removing control characters, replacing
115+
* newlines and tabs with spaces. No length limit.
116+
*/
117+
@VisibleForTesting
118+
static String sanitizeAnnotationValue(String value) {
119+
if (value == null) {
120+
return null;
121+
}
122+
123+
// Trim whitespace, remove control chars except \r \n \t, replace those with space
124+
String sanitized =
125+
value.trim()
126+
.replaceAll("[\\p{Cntrl}&&[^\r\n\t]]", "")
127+
.replaceAll("[\\r\\n\\t]+", " ");
128+
129+
return sanitized;
130+
}
131+
}

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -315,7 +315,7 @@ public Optional<JobStatusMessage> getJobStatus(Configuration conf, JobID jobID)
315315

316316
@Override
317317
public JobExceptionsInfoWithHistory getJobExceptions(
318-
AbstractFlinkResource resource, JobID jobId, Configuration deployConfig) {
318+
AbstractFlinkResource resource, JobID jobId, Configuration observeConfig) {
319319
return jobExceptionsMap.getOrDefault(jobId, null);
320320
}
321321

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.kubernetes.operator.utils;
19+
20+
import org.junit.jupiter.api.Test;
21+
22+
import java.util.Map;
23+
24+
import static org.assertj.core.api.Assertions.assertThat;
25+
26+
/** Tests for {@link K8sAnnotationsSanitizer}. */
27+
public class K8sAnnotationsSanitizerTest {
28+
29+
@Test
30+
public void testValidKeysWithoutPrefix() {
31+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo")).isTrue();
32+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo-bar")).isTrue();
33+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo.bar")).isTrue();
34+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo_bar")).isTrue();
35+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("f")).isTrue();
36+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("f1")).isTrue();
37+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("1f")).isTrue();
38+
}
39+
40+
@Test
41+
public void testInvalidKeysWithoutPrefix() {
42+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("")).isFalse();
43+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(null)).isFalse();
44+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("-foo")).isFalse();
45+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo-")).isFalse();
46+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(".foo")).isFalse();
47+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo.")).isFalse();
48+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("_foo")).isFalse();
49+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo_")).isFalse();
50+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo@bar")).isFalse();
51+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("foo/bar/baz"))
52+
.isFalse(); // multiple slashes invalid
53+
}
54+
55+
@Test
56+
public void testValidKeysWithPrefix() {
57+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/foo")).isTrue();
58+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("a.b.c/foo-bar")).isTrue();
59+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("abc/foo_bar")).isTrue();
60+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("abc/foo.bar")).isTrue();
61+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("a/foo")).isTrue();
62+
}
63+
64+
@Test
65+
public void testInvalidPrefix() {
66+
String longPrefix = "a".repeat(254);
67+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(longPrefix + "/foo")).isFalse();
68+
69+
String longLabel = "a".repeat(64);
70+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(longLabel + ".com/foo")).isFalse();
71+
72+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("ex_ample.com/foo")).isFalse();
73+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("-example.com/foo")).isFalse();
74+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example-.com/foo")).isFalse();
75+
76+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey(".example.com/foo")).isFalse();
77+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example..com/foo")).isFalse();
78+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com./foo")).isFalse();
79+
}
80+
81+
@Test
82+
public void testInvalidNameWithPrefix() {
83+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/-foo")).isFalse();
84+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/foo-")).isFalse();
85+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/.foo")).isFalse();
86+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/foo.")).isFalse();
87+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/foo@bar")).isFalse();
88+
assertThat(K8sAnnotationsSanitizer.isValidAnnotationKey("example.com/")).isFalse();
89+
}
90+
91+
@Test
92+
public void testSanitizeAnnotationValue() {
93+
assertThat(K8sAnnotationsSanitizer.sanitizeAnnotationValue(null)).isNull();
94+
95+
assertThat(K8sAnnotationsSanitizer.sanitizeAnnotationValue(" value ")).isEqualTo("value");
96+
97+
String rawValue = "line1\nline2\r\nline3\tend\u0007"; // \u0007 is bell (control char)
98+
String expected = "line1 line2 line3 end";
99+
assertThat(K8sAnnotationsSanitizer.sanitizeAnnotationValue(rawValue)).isEqualTo(expected);
100+
101+
String unicode = "café résumé – test";
102+
assertThat(K8sAnnotationsSanitizer.sanitizeAnnotationValue(unicode)).isEqualTo(unicode);
103+
}
104+
105+
@Test
106+
public void testSanitizeAnnotations() {
107+
Map<String, String> input =
108+
Map.of(
109+
"valid-key", " some value ",
110+
"example.com/valid-name", "value\nwith\nnewlines",
111+
"invalid key", "value",
112+
"prefix_with_underscore/foo", "value",
113+
"validprefix/foo-", "value",
114+
"example.com/invalid_name@", "value");
115+
116+
Map<String, String> sanitized = K8sAnnotationsSanitizer.sanitizeAnnotations(input);
117+
118+
assertThat(sanitized).hasSize(2).containsKeys("valid-key", "example.com/valid-name");
119+
120+
assertThat(sanitized.get("valid-key")).isEqualTo("some value");
121+
assertThat(sanitized.get("example.com/valid-name")).isEqualTo("value with newlines");
122+
}
123+
}

0 commit comments

Comments
 (0)