@@ -851,32 +851,74 @@ public RestClusterClient<String> getClusterClient(Configuration conf) throws Exc
851851
852852 @ Override
853853 public JobExceptionsInfoWithHistory getJobExceptions (
854- AbstractFlinkResource resource , JobID jobId , Configuration observeConfig ) {
854+ AbstractFlinkResource resource , JobID jobId , Configuration observeConfig )
855+ throws IOException {
855856 JobExceptionsHeaders jobExceptionsHeaders = JobExceptionsHeaders .getInstance ();
857+ // port is specified at the same place for both session and application jobs
856858 int port = observeConfig .getInteger (RestOptions .PORT );
857- String host =
858- ObjectUtils .firstNonNull (
859- operatorConfig .getFlinkServiceHostOverride (),
860- ExternalServiceDecorator .getNamespacedExternalServiceName (
861- resource .getMetadata ().getName (),
862- resource .getMetadata ().getNamespace ()));
859+ String host ;
860+ // In case of session job, there is a single rest service for all the jobs, while in the
861+ // application mode,
862+ // the rest service is per job.
863+
864+ Configuration operatorRestConf = observeConfig ;
865+ if (SecurityOptions .isRestSSLEnabled (observeConfig )) {
866+ operatorRestConf = getOperatorRestConfig (observeConfig );
867+ }
868+
869+ if (resource instanceof FlinkSessionJob ) {
870+ // For session jobs, we need to use the deployment name from the spec
871+ FlinkSessionJob sessionJob = (FlinkSessionJob ) resource ;
872+ final String clusterId = observeConfig .get (KubernetesConfigOptions .CLUSTER_ID );
873+ final String namespace = observeConfig .get (KubernetesConfigOptions .NAMESPACE );
874+ host =
875+ ObjectUtils .firstNonNull (
876+ operatorConfig .getFlinkServiceHostOverride (),
877+ ExternalServiceDecorator .getNamespacedExternalServiceName (
878+ clusterId , namespace ));
879+ } else {
880+ // For application jobs (FlinkDeployment), use the resource name directly
881+ FlinkDeployment deployment = (FlinkDeployment ) resource ;
882+ host =
883+ ObjectUtils .firstNonNull (
884+ operatorConfig .getFlinkServiceHostOverride (),
885+ ExternalServiceDecorator .getNamespacedExternalServiceName (
886+ deployment .getMetadata ().getName (),
887+ resource .getMetadata ().getNamespace ()));
888+ }
889+
863890 JobExceptionsMessageParameters params = new JobExceptionsMessageParameters ();
864891 params .jobPathParameter .resolve (jobId );
865- try (var restClient = getRestClient (observeConfig )) {
866- return restClient
867- .sendRequest (
868- host ,
869- port ,
870- jobExceptionsHeaders ,
871- params ,
872- EmptyRequestBody .getInstance ())
873- .get (operatorConfig .getFlinkClientTimeout ().toSeconds (), TimeUnit .SECONDS );
874- } catch (Exception e ) {
875- LOG .warn (
876- String .format (
877- "Failed to fetch job exceptions from REST API for jobId %s" , jobId ),
878- e );
879- return null ;
892+
893+ if (resource instanceof FlinkSessionJob ) {
894+ try (var clusterClient = getClusterClient (observeConfig )) {
895+ return clusterClient
896+ .sendRequest (jobExceptionsHeaders , params , EmptyRequestBody .getInstance ())
897+ .get (operatorConfig .getFlinkClientTimeout ().toSeconds (), TimeUnit .SECONDS );
898+ } catch (Exception e ) {
899+ LOG .warn (
900+ String .format (
901+ "Failed to fetch job exceptions from REST API for jobId %s" , jobId ),
902+ e );
903+ return null ;
904+ }
905+ } else {
906+ try (var restClient = getRestClient (observeConfig )) {
907+ return restClient
908+ .sendRequest (
909+ host ,
910+ port ,
911+ jobExceptionsHeaders ,
912+ params ,
913+ EmptyRequestBody .getInstance ())
914+ .get (operatorConfig .getFlinkClientTimeout ().toSeconds (), TimeUnit .SECONDS );
915+ } catch (Exception e ) {
916+ LOG .warn (
917+ String .format (
918+ "Failed to fetch job exceptions from REST API for jobId %s" , jobId ),
919+ e );
920+ return null ;
921+ }
880922 }
881923 }
882924
0 commit comments