Skip to content

Commit b8fe7bb

Browse files
committed
Use only cluster client
1 parent d8d25ea commit b8fe7bb

File tree

1 file changed

+10
-62
lines changed

1 file changed

+10
-62
lines changed

flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java

Lines changed: 10 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -854,71 +854,19 @@ public JobExceptionsInfoWithHistory getJobExceptions(
854854
AbstractFlinkResource resource, JobID jobId, Configuration observeConfig)
855855
throws IOException {
856856
JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders.getInstance();
857-
// port is specified at the same place for both session and application jobs
858-
int port = observeConfig.getInteger(RestOptions.PORT);
859-
String host;
860-
// In case of session job, there is a single rest service for all the jobs, while in the
861-
// application mode,
862-
// the rest service is per job.
863-
864-
Configuration operatorRestConf = observeConfig;
865-
if (SecurityOptions.isRestSSLEnabled(observeConfig)) {
866-
operatorRestConf = getOperatorRestConfig(observeConfig);
867-
}
868-
869-
if (resource instanceof FlinkSessionJob) {
870-
// For session jobs, we need to use the deployment name from the spec
871-
FlinkSessionJob sessionJob = (FlinkSessionJob) resource;
872-
final String clusterId = observeConfig.get(KubernetesConfigOptions.CLUSTER_ID);
873-
final String namespace = observeConfig.get(KubernetesConfigOptions.NAMESPACE);
874-
host =
875-
ObjectUtils.firstNonNull(
876-
operatorConfig.getFlinkServiceHostOverride(),
877-
ExternalServiceDecorator.getNamespacedExternalServiceName(
878-
clusterId, namespace));
879-
} else {
880-
// For application jobs (FlinkDeployment), use the resource name directly
881-
FlinkDeployment deployment = (FlinkDeployment) resource;
882-
host =
883-
ObjectUtils.firstNonNull(
884-
operatorConfig.getFlinkServiceHostOverride(),
885-
ExternalServiceDecorator.getNamespacedExternalServiceName(
886-
deployment.getMetadata().getName(),
887-
resource.getMetadata().getNamespace()));
888-
}
889-
890857
JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
891858
params.jobPathParameter.resolve(jobId);
892859

893-
if (resource instanceof FlinkSessionJob) {
894-
try (var clusterClient = getClusterClient(observeConfig)) {
895-
return clusterClient
896-
.sendRequest(jobExceptionsHeaders, params, EmptyRequestBody.getInstance())
897-
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
898-
} catch (Exception e) {
899-
LOG.warn(
900-
String.format(
901-
"Failed to fetch job exceptions from REST API for jobId %s", jobId),
902-
e);
903-
return null;
904-
}
905-
} else {
906-
try (var restClient = getRestClient(observeConfig)) {
907-
return restClient
908-
.sendRequest(
909-
host,
910-
port,
911-
jobExceptionsHeaders,
912-
params,
913-
EmptyRequestBody.getInstance())
914-
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
915-
} catch (Exception e) {
916-
LOG.warn(
917-
String.format(
918-
"Failed to fetch job exceptions from REST API for jobId %s", jobId),
919-
e);
920-
return null;
921-
}
860+
try (var clusterClient = getClusterClient(observeConfig)) {
861+
return clusterClient
862+
.sendRequest(jobExceptionsHeaders, params, EmptyRequestBody.getInstance())
863+
.get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
864+
} catch (Exception e) {
865+
LOG.warn(
866+
String.format(
867+
"Failed to fetch job exceptions from REST API for jobId %s", jobId),
868+
e);
869+
return null;
922870
}
923871
}
924872

0 commit comments

Comments
 (0)