Skip to content

Commit 61ad2cd

Browse files
committed
[FLINK-37730][Review] Address comments
1 parent 6fdbf84 commit 61ad2cd

File tree

6 files changed

+47
-50
lines changed

6 files changed

+47
-50
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkDeploymentContext.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,7 @@
2929
import org.apache.flink.kubernetes.operator.service.FlinkService;
3030

3131
import io.javaoperatorsdk.operator.api.reconciler.Context;
32-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3332

34-
import java.util.Map;
3533
import java.util.function.Function;
3634

3735
/** Context for reconciling a Flink resource. */
@@ -43,15 +41,14 @@ public FlinkDeploymentContext(
4341
KubernetesResourceMetricGroup resourceMetricGroup,
4442
FlinkConfigManager configManager,
4543
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory,
46-
Map<ResourceID, FlinkResourceContextFactory.ExceptionCacheEntry>
47-
lastRecordedExceptionCache) {
44+
FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry) {
4845
super(
4946
resource,
5047
josdkContext,
5148
resourceMetricGroup,
5249
configManager,
5350
flinkServiceFactory,
54-
lastRecordedExceptionCache);
51+
exceptionCacheEntry);
5552
}
5653

5754
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkResourceContext.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,11 @@
3535

3636
import io.fabric8.kubernetes.client.KubernetesClient;
3737
import io.javaoperatorsdk.operator.api.reconciler.Context;
38-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3938
import lombok.Getter;
4039
import lombok.RequiredArgsConstructor;
4140

4241
import javax.annotation.Nullable;
4342

44-
import java.util.Map;
4543
import java.util.function.Function;
4644

4745
/** Context for reconciling a Flink resource. */
@@ -54,9 +52,7 @@ public abstract class FlinkResourceContext<CR extends AbstractFlinkResource<?, ?
5452
protected final FlinkConfigManager configManager;
5553
private final Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory;
5654

57-
@Getter
58-
private final Map<ResourceID, FlinkResourceContextFactory.ExceptionCacheEntry>
59-
lastRecordedExceptionCache;
55+
@Getter private final FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry;
6056

6157
private FlinkOperatorConfiguration operatorConfig;
6258
private Configuration observeConfig;

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/controller/FlinkSessionJobContext.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,7 @@
3030
import org.apache.flink.kubernetes.operator.service.FlinkService;
3131

3232
import io.javaoperatorsdk.operator.api.reconciler.Context;
33-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3433

35-
import java.util.Map;
3634
import java.util.function.Function;
3735

3836
import static org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler.sessionClusterReady;
@@ -48,15 +46,14 @@ public FlinkSessionJobContext(
4846
KubernetesResourceMetricGroup resourceMetricGroup,
4947
FlinkConfigManager configManager,
5048
Function<FlinkResourceContext<?>, FlinkService> flinkServiceFactory,
51-
Map<ResourceID, FlinkResourceContextFactory.ExceptionCacheEntry>
52-
lastRecordedExceptionCache) {
49+
FlinkResourceContextFactory.ExceptionCacheEntry exceptionCacheEntry) {
5350
super(
5451
resource,
5552
josdkContext,
5653
resourceMetricGroup,
5754
configManager,
5855
flinkServiceFactory,
59-
lastRecordedExceptionCache);
56+
exceptionCacheEntry);
6057
}
6158

6259
@Override

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserver.java

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,11 @@
2727
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
2828
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
2929
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
30-
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3130
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3231
import org.apache.flink.kubernetes.operator.utils.ExceptionUtils;
3332
import org.apache.flink.runtime.client.JobStatusMessage;
3433
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
3534

36-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
3735
import org.slf4j.Logger;
3836
import org.slf4j.LoggerFactory;
3937

@@ -118,16 +116,9 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
118116
var resource = ctx.getResource();
119117
var operatorConfig = ctx.getOperatorConfig();
120118
var jobStatus = resource.getStatus().getJobStatus();
121-
// Ideally should not happen
122-
if (jobStatus == null || jobStatus.getJobId() == null) {
123-
LOG.warn(
124-
"No jobId found for deployment '{}', skipping exception observation.",
125-
resource.getMetadata().getName());
126-
return;
127-
}
128119

129120
try {
130-
JobID jobId = JobID.fromHexString(jobStatus.getJobId());
121+
var jobId = JobID.fromHexString(jobStatus.getJobId());
131122
// TODO: Ideally the best way to restrict the number of events is to use the query param
132123
// `maxExceptions`
133124
// but the JobExceptionsMessageParameters does not expose the parameters and nor does
@@ -153,13 +144,16 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
153144
jobId);
154145
}
155146

156-
ResourceID resourceID = ResourceID.fromResource(resource);
157147
String currentJobId = jobStatus.getJobId();
158148
Instant lastRecorded = null; // first reconciliation
159149

160-
FlinkResourceContextFactory.ExceptionCacheEntry cacheEntry =
161-
ctx.getLastRecordedExceptionCache().get(resourceID);
162-
if (cacheEntry != null && cacheEntry.getJobId().equals(currentJobId)) {
150+
var cacheEntry = ctx.getExceptionCacheEntry();
151+
// a cache entry is created should always be present. The timestamp for the first
152+
// reconciliation would be
153+
// when the job was created. This check is still necessary because even though there
154+
// might be an entry,
155+
// the jobId could have changed since the job was first created.
156+
if (cacheEntry.getJobId().equals(currentJobId)) {
163157
lastRecorded = Instant.ofEpochMilli(cacheEntry.getLastTimestamp());
164158
}
165159

@@ -179,16 +173,10 @@ protected void observeJobManagerExceptions(FlinkResourceContext<R> ctx) {
179173
break;
180174
}
181175
}
182-
ctx.getLastRecordedExceptionCache()
183-
.put(
184-
resourceID,
185-
new FlinkResourceContextFactory.ExceptionCacheEntry(
186-
currentJobId, now.toEpochMilli()));
176+
ctx.getExceptionCacheEntry().setJobId(currentJobId);
177+
ctx.getExceptionCacheEntry().setLastTimestamp(now.toEpochMilli());
187178
} catch (Exception e) {
188-
LOG.warn(
189-
"Failed to fetch JobManager exception info for deployment '{}'.",
190-
resource.getMetadata().getName(),
191-
e);
179+
LOG.warn("Failed to fetch JobManager exception info.", e);
192180
}
193181
}
194182

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkResourceContextFactory.java

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import io.javaoperatorsdk.operator.api.reconciler.Context;
4141
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4242
import lombok.Getter;
43+
import lombok.Setter;
4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
4546

@@ -55,9 +56,10 @@ public class FlinkResourceContextFactory {
5556

5657
/** The cache entry for the last recorded exception timestamp for a JobID. */
5758
@Getter
59+
@Setter
5860
public static final class ExceptionCacheEntry {
59-
final String jobId;
60-
final long lastTimestamp;
61+
private String jobId;
62+
private long lastTimestamp;
6163

6264
public ExceptionCacheEntry(String jobId, long lastTimestamp) {
6365
this.jobId = jobId;
@@ -110,26 +112,44 @@ public FlinkStateSnapshotContext getFlinkStateSnapshotContext(
110112
r ->
111113
OperatorMetricUtils.createResourceMetricGroup(
112114
operatorMetricGroup, configManager, resource));
113-
115+
String jobId = null;
116+
if (resource.getStatus() != null) {
117+
if (resource.getStatus().getJobStatus() != null) {
118+
jobId = resource.getStatus().getJobStatus().getJobId();
119+
}
120+
}
114121
if (resource instanceof FlinkDeployment) {
115122
var flinkDep = (FlinkDeployment) resource;
123+
var resourceId = ResourceID.fromResource(flinkDep);
124+
var flinkDepJobId = jobId;
116125
return (FlinkResourceContext<CR>)
117126
new FlinkDeploymentContext(
118127
flinkDep,
119128
josdkContext,
120129
resMg,
121130
configManager,
122131
this::getFlinkService,
123-
lastRecordedExceptionCache);
132+
lastRecordedExceptionCache.computeIfAbsent(
133+
resourceId,
134+
id ->
135+
new ExceptionCacheEntry(
136+
flinkDepJobId, System.currentTimeMillis())));
124137
} else if (resource instanceof FlinkSessionJob) {
138+
var resourceId = ResourceID.fromResource(resource);
139+
var flinkSessionJobId = jobId;
125140
return (FlinkResourceContext<CR>)
126141
new FlinkSessionJobContext(
127142
(FlinkSessionJob) resource,
128143
josdkContext,
129144
resMg,
130145
configManager,
131146
this::getFlinkService,
132-
lastRecordedExceptionCache);
147+
lastRecordedExceptionCache.computeIfAbsent(
148+
resourceId,
149+
id ->
150+
new ExceptionCacheEntry(
151+
flinkSessionJobId,
152+
System.currentTimeMillis())));
133153
} else {
134154
throw new IllegalArgumentException(
135155
"Unknown resource type " + resource.getClass().getSimpleName());

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/observer/JobStatusObserverTest.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,12 @@
3131
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
3232
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
3333
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
34-
import org.apache.flink.kubernetes.operator.service.FlinkResourceContextFactory;
3534
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3635
import org.apache.flink.util.SerializedThrowable;
3736

3837
import io.fabric8.kubernetes.api.model.MicroTime;
3938
import io.fabric8.kubernetes.client.KubernetesClient;
4039
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
41-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
4240
import lombok.Getter;
4341
import org.junit.jupiter.api.Test;
4442
import org.junit.jupiter.params.ParameterizedTest;
@@ -174,6 +172,8 @@ public void testExceptionLimitConfig() throws Exception {
174172
getResourceContext(deployment, operatorConfig); // set a non-terminal state
175173

176174
var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
175+
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
176+
ctx.getExceptionCacheEntry().setLastTimestamp(500L);
177177

178178
flinkService.submitApplicationCluster(
179179
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
@@ -214,6 +214,8 @@ public void testStackTraceTruncationConfig() throws Exception {
214214
flinkService.submitApplicationCluster(
215215
deployment.getSpec().getJob(), ctx.getDeployConfig(deployment.getSpec()), false);
216216
ReconciliationUtils.updateStatusForDeployedSpec(deployment, new Configuration());
217+
ctx.getExceptionCacheEntry().setJobId(jobId.toHexString());
218+
ctx.getExceptionCacheEntry().setLastTimestamp(3000L);
217219

218220
long exceptionTime = 4000L;
219221
String longTrace = "line1\nline2\nline3\nline4";
@@ -251,11 +253,8 @@ public void testIgnoreOldExceptions() throws Exception {
251253
jobStatus.setState(JobStatus.RUNNING); // set a non-terminal state
252254

253255
FlinkResourceContext<AbstractFlinkResource<?, ?>> ctx = getResourceContext(deployment);
254-
ctx.getLastRecordedExceptionCache()
255-
.put(
256-
ResourceID.fromResource(deployment),
257-
new FlinkResourceContextFactory.ExceptionCacheEntry(
258-
deployment.getStatus().getJobStatus().getJobId(), 2500L));
256+
ctx.getExceptionCacheEntry().setJobId(deployment.getStatus().getJobStatus().getJobId());
257+
ctx.getExceptionCacheEntry().setLastTimestamp(2500L);
259258

260259
var jobId = JobID.fromHexString(deployment.getStatus().getJobStatus().getJobId());
261260
flinkService.submitApplicationCluster(

0 commit comments

Comments
 (0)