Skip to content

Commit 71f53d8

Browse files
Handle exception instead and add observer test
1 parent 9fc75ca commit 71f53d8

File tree

8 files changed

+68
-66
lines changed

8 files changed

+68
-66
lines changed

flink-autoscaler/src/main/java/org/apache/flink/runtime/rest/messages/job/JobDetailsInfo.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.api.common.JobStatus;
2424
import org.apache.flink.runtime.execution.ExecutionState;
2525
import org.apache.flink.runtime.instance.SlotSharingGroupId;
26-
import org.apache.flink.runtime.jobgraph.JobType;
2726
import org.apache.flink.runtime.jobgraph.JobVertexID;
2827
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
2928
import org.apache.flink.runtime.rest.messages.ResponseBody;
@@ -46,8 +45,8 @@
4645
import java.util.Map;
4746
import java.util.Objects;
4847

49-
/** Copied from Flink. */
50-
/** The difference compared to 1.20 is that slot sharing group and job type are optional here. */
48+
/** Copied from Flink. Should be removed once the client dependency is upgraded to 1.20. */
49+
/** The difference compared to 1.18 is that slot sharing group is optional here. */
5150
public class JobDetailsInfo implements ResponseBody {
5251

5352
public static final String FIELD_NAME_JOB_ID = "jid";
@@ -58,8 +57,6 @@ public class JobDetailsInfo implements ResponseBody {
5857

5958
public static final String FIELD_NAME_JOB_STATUS = "state";
6059

61-
public static final String FIELD_NAME_JOB_TYPE = "job-type";
62-
6360
public static final String FIELD_NAME_START_TIME = "start-time";
6461

6562
public static final String FIELD_NAME_END_TIME = "end-time";
@@ -92,9 +89,6 @@ public class JobDetailsInfo implements ResponseBody {
9289
@JsonProperty(FIELD_NAME_JOB_STATUS)
9390
private final JobStatus jobStatus;
9491

95-
@JsonProperty(FIELD_NAME_JOB_TYPE)
96-
private final JobType jobType;
97-
9892
@JsonProperty(FIELD_NAME_START_TIME)
9993
private final long startTime;
10094

@@ -129,7 +123,6 @@ public JobDetailsInfo(
129123
@JsonProperty(FIELD_NAME_JOB_NAME) String name,
130124
@JsonProperty(FIELD_NAME_IS_STOPPABLE) boolean isStoppable,
131125
@JsonProperty(FIELD_NAME_JOB_STATUS) JobStatus jobStatus,
132-
@JsonProperty(FIELD_NAME_JOB_TYPE) JobType jobType,
133126
@JsonProperty(FIELD_NAME_START_TIME) long startTime,
134127
@JsonProperty(FIELD_NAME_END_TIME) long endTime,
135128
@JsonProperty(FIELD_NAME_DURATION) long duration,
@@ -145,7 +138,6 @@ public JobDetailsInfo(
145138
this.name = Preconditions.checkNotNull(name);
146139
this.isStoppable = isStoppable;
147140
this.jobStatus = Preconditions.checkNotNull(jobStatus);
148-
this.jobType = jobType;
149141
this.startTime = startTime;
150142
this.endTime = endTime;
151143
this.duration = duration;
@@ -175,7 +167,6 @@ public boolean equals(Object o) {
175167
&& Objects.equals(jobId, that.jobId)
176168
&& Objects.equals(name, that.name)
177169
&& jobStatus == that.jobStatus
178-
&& jobType == that.jobType
179170
&& Objects.equals(timestamps, that.timestamps)
180171
&& Objects.equals(jobVertexInfos, that.jobVertexInfos)
181172
&& Objects.equals(jobVerticesPerState, that.jobVerticesPerState)
@@ -189,7 +180,6 @@ public int hashCode() {
189180
name,
190181
isStoppable,
191182
jobStatus,
192-
jobType,
193183
startTime,
194184
endTime,
195185
duration,
@@ -221,11 +211,6 @@ public JobStatus getJobStatus() {
221211
return jobStatus;
222212
}
223213

224-
@JsonIgnore
225-
public JobType getJobType() {
226-
return jobType;
227-
}
228-
229214
@JsonIgnore
230215
public long getStartTime() {
231216
return startTime;
@@ -276,7 +261,7 @@ public String getJsonPlan() {
276261
// ---------------------------------------------------
277262

278263
/** Detailed information about a job vertex. */
279-
// @Schema(name = "JobDetailsVertexInfo")
264+
// @Schema(name = "JobDetailsVertexInfo")
280265
public static final class JobVertexDetailsInfo {
281266

282267
public static final String FIELD_NAME_JOB_VERTEX_ID = "id";

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.flink.client.program.rest.RestClusterClient;
2929
import org.apache.flink.configuration.Configuration;
3030
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
31-
import org.apache.flink.runtime.jobgraph.JobType;
3231
import org.apache.flink.runtime.jobgraph.JobVertexID;
3332
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
3433
import org.apache.flink.runtime.rest.messages.MessageHeaders;
@@ -80,7 +79,6 @@ public void testJobTopologyParsingFromJobDetails() throws Exception {
8079
+ " \"name\": \"State machine job\",\n"
8180
+ " \"isStoppable\": false,\n"
8281
+ " \"state\": \"RUNNING\",\n"
83-
+ " \"job-type\": \"STREAMING\",\n"
8482
+ " \"start-time\": 1707893512027,\n"
8583
+ " \"end-time\": -1,\n"
8684
+ " \"duration\": 214716,\n"
@@ -239,7 +237,6 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception {
239237
+ " \"name\": \"State machine job\",\n"
240238
+ " \"isStoppable\": false,\n"
241239
+ " \"state\": \"RUNNING\",\n"
242-
+ " \"job-type\": \"STREAMING\",\n"
243240
+ " \"start-time\": 1707893512027,\n"
244241
+ " \"end-time\": -1,\n"
245242
+ " \"duration\": 214716,\n"
@@ -360,7 +357,7 @@ public void testJobTopologyParsingThrowsNotReadyException() throws Exception {
360357
@Test
361358
public void testJobTopologyParsingFromJobDetailsWithSlotSharingGroup() throws Exception {
362359
String s =
363-
"{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"job-type\":\"STREAMING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map<br/>+- Sink: Print to Std. Out<br/>\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source<br/>\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n";
360+
"{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1697114719143,\"end-time\":-1,\"duration\":60731,\"maxParallelism\":-1,\"now\":1697114779874,\"timestamps\":{\"CANCELLING\":0,\"INITIALIZING\":1697114719143,\"RUNNING\":1697114719743,\"CANCELED\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"FAILING\":0,\"CREATED\":1697114719343,\"SUSPENDED\":0,\"RECONCILING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Source: Events Generator Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724603,\"end-time\":-1,\"duration\":55271,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1985978,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":92037,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":78319,\"accumulated-busy-time\":13347.0}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"slotSharingGroupId\":\"a9c52ec4c7200ab4bd141cbae8022105\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1697114724639,\"end-time\":-1,\"duration\":55235,\"tasks\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"metrics\":{\"read-bytes\":2019044,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":91881,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":91352,\"accumulated-busy-time\":273.0}}],\"status-counts\":{\"FAILED\":0,\"CANCELED\":0,\"SCHEDULED\":0,\"FINISHED\":0,\"CREATED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"INITIALIZING\":0,\"RUNNING\":2},\"plan\":{\"jid\":\"a1b1b53c7c71e7199aa8c43bc703fe7f\",\"name\":\"basic-example\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map<br/>+- Sink: Print to Std. Out<br/>\",\"operator_metadata\":[{},{}],\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Events Generator Source<br/>\",\"operator_metadata\":[{}],\"optimizer_properties\":{}}]}}\n";
364361
JobDetailsInfo jobDetailsInfo = new ObjectMapper().readValue(s, JobDetailsInfo.class);
365362

366363
var metricsCollector = new RestApiMetricsCollector();
@@ -375,7 +372,6 @@ public void testJobUpdateTsLogic() {
375372
"",
376373
false,
377374
org.apache.flink.api.common.JobStatus.RUNNING,
378-
JobType.STREAMING,
379375
0,
380376
0,
381377
0,

flink-autoscaler/src/test/java/org/apache/flink/autoscaler/TestingAutoscalerUtils.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import org.apache.flink.configuration.Configuration;
2424
import org.apache.flink.configuration.MemorySize;
2525
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
26-
import org.apache.flink.runtime.jobgraph.JobType;
2726
import org.apache.flink.runtime.metrics.MetricRegistry;
2827
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
2928
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
@@ -97,7 +96,6 @@ public CompletableFuture<JobDetailsInfo> getJobDetails(JobID jobId) {
9796
"",
9897
false,
9998
JobStatus.RUNNING,
100-
JobType.STREAMING,
10199
0,
102100
0,
103101
0,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/observer/SnapshotObserver.java

Lines changed: 24 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
3838
import org.apache.flink.kubernetes.operator.utils.FlinkStateSnapshotUtils;
3939
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
40-
import org.apache.flink.runtime.jobgraph.JobType;
40+
import org.apache.flink.runtime.rest.util.RestClientException;
4141
import org.apache.flink.util.CollectionUtil;
42+
import org.apache.flink.util.ExceptionUtils;
4243

4344
import org.slf4j.Logger;
4445
import org.slf4j.LoggerFactory;
@@ -84,11 +85,6 @@ public void observeSavepointStatus(FlinkResourceContext<CR> ctx) {
8485
var jobStatus = resource.getStatus().getJobStatus();
8586
var jobId = jobStatus.getJobId();
8687

87-
if (isBatchJob(ctx, jobId)) {
88-
LOG.debug("Skipping checkpoint observation for BATCH job");
89-
return;
90-
}
91-
9288
// If any manual or periodic savepoint is in progress, observe it
9389
if (SnapshotUtils.savepointInProgress(jobStatus)) {
9490
observeTriggeredSavepoint(ctx, jobId);
@@ -447,33 +443,33 @@ private long getMaxCountForSnapshotType(
447443
}
448444

449445
private void observeLatestCheckpoint(FlinkResourceContext<CR> ctx, String jobId) {
450-
451446
var status = ctx.getResource().getStatus();
452447
var jobStatus = status.getJobStatus();
453448

454-
ctx.getFlinkService()
455-
.getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
456-
.ifPresentOrElse(
457-
snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
458-
() -> {
459-
if (ReconciliationUtils.isJobCancelled(status)) {
460-
// For cancelled jobs the observed savepoint is always definite,
461-
// so if empty we know the job doesn't have any
462-
// checkpoints/savepoints
463-
jobStatus.setUpgradeSavepointPath(null);
464-
}
465-
});
466-
}
467-
468-
private boolean isBatchJob(FlinkResourceContext<CR> ctx, String jobId) {
469449
try {
470-
var jobDetails =
471-
ctx.getFlinkService()
472-
.getJobDetails(JobID.fromHexString(jobId), ctx.getObserveConfig());
473-
return jobDetails.getJobType() == JobType.BATCH;
450+
ctx.getFlinkService()
451+
.getLastCheckpoint(JobID.fromHexString(jobId), ctx.getObserveConfig())
452+
.ifPresentOrElse(
453+
snapshot -> jobStatus.setUpgradeSavepointPath(snapshot.getLocation()),
454+
() -> {
455+
if (ReconciliationUtils.isJobCancelled(status)) {
456+
// For cancelled jobs the observed savepoint is always definite,
457+
// so if empty we know the job doesn't have any
458+
// checkpoints/savepoints
459+
jobStatus.setUpgradeSavepointPath(null);
460+
}
461+
});
474462
} catch (Exception e) {
475-
LOG.debug("Could not determine job type, assuming streaming job", e);
476-
return false;
463+
if (ExceptionUtils.findThrowable(e, RestClientException.class)
464+
.map(ex -> ex.getMessage().contains("Checkpointing has not been enabled"))
465+
.orElse(false)) {
466+
LOG.warn(
467+
"Checkpointing not enabled for job {}, skipping checkpoint observation",
468+
jobId,
469+
e);
470+
return;
471+
}
472+
throw e;
477473
}
478474
}
479475
}

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,6 @@
7575
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointTriggerRequestBody;
7676
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
7777
import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatisticsHeaders;
78-
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
7978
import org.apache.flink.runtime.rest.messages.job.metrics.JobMetricsHeaders;
8079
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalRequest;
8180
import org.apache.flink.runtime.rest.messages.job.savepoints.SavepointDisposalTriggerHeaders;
@@ -838,15 +837,6 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
838837
(c, e) -> new StandaloneClientHAServices(restServerAddress));
839838
}
840839

841-
@Override
842-
public JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception {
843-
try (var clusterClient = getClusterClient(conf)) {
844-
return clusterClient
845-
.getJobDetails(jobId)
846-
.get(operatorConfig.getFlinkClientTimeout().getSeconds(), TimeUnit.SECONDS);
847-
}
848-
}
849-
850840
@VisibleForTesting
851841
protected void runJar(
852842
JobSpec job,

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/FlinkService.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import org.apache.flink.kubernetes.operator.observer.SavepointFetchResult;
3434
import org.apache.flink.runtime.client.JobStatusMessage;
3535
import org.apache.flink.runtime.jobmaster.JobResult;
36-
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
3736

3837
import io.fabric8.kubernetes.api.model.ObjectMeta;
3938
import io.fabric8.kubernetes.api.model.PodList;
@@ -128,8 +127,6 @@ Map<String, String> getMetrics(Configuration conf, String jobId, List<String> me
128127

129128
RestClusterClient<String> getClusterClient(Configuration conf) throws Exception;
130129

131-
JobDetailsInfo getJobDetails(JobID jobId, Configuration conf) throws Exception;
132-
133130
/** Result of a cancel operation. */
134131
@AllArgsConstructor
135132
class CancelResult {

flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/TestingFlinkService.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
4444
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
4545
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
46+
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
4647
import org.apache.flink.kubernetes.operator.exception.UpgradeFailureException;
4748
import org.apache.flink.kubernetes.operator.observer.CheckpointFetchResult;
4849
import org.apache.flink.kubernetes.operator.observer.CheckpointStatsResult;
@@ -138,6 +139,7 @@ public class TestingFlinkService extends AbstractFlinkService {
138139
@Getter private final Map<String, Boolean> savepointTriggers = new HashMap<>();
139140
@Getter private final Map<String, Boolean> checkpointTriggers = new HashMap<>();
140141
private final Map<Long, String> checkpointStats = new HashMap<>();
142+
@Setter private boolean throwCheckpointingDisabledError = false;
141143

142144
@Getter private int desiredReplicas = 0;
143145
@Getter private int cancelJobCallCount = 0;
@@ -574,6 +576,13 @@ public void disposeSavepoint(String savepointPath, Configuration conf) throws Ex
574576

575577
@Override
576578
public Optional<Savepoint> getLastCheckpoint(JobID jobId, Configuration conf) {
579+
if (throwCheckpointingDisabledError) {
580+
throw new ReconciliationException(
581+
"Could not observe latest savepoint information",
582+
new RestClientException(
583+
"Checkpointing has not been enabled", HttpResponseStatus.BAD_REQUEST));
584+
}
585+
577586
jobs.stream()
578587
.filter(js -> js.f1.getJobId().equals(jobId))
579588
.findAny()

0 commit comments

Comments
 (0)