11package com .cloudera .cyber .restcli .controller ;
22
33import com .cloudera .cyber .restcli .configuration .AppWorkerConfig ;
4- import com .cloudera .cyber .restcli .service .JobService ;
54import com .cloudera .cyber .restcli .service .FilePipelineService ;
5+ import com .cloudera .cyber .restcli .service .JobService ;
66import com .cloudera .service .common .Utils ;
77import com .cloudera .service .common .request .RequestBody ;
88import com .cloudera .service .common .request .RequestType ;
99import com .cloudera .service .common .response .ClusterMeta ;
1010import com .cloudera .service .common .response .Job ;
1111import com .cloudera .service .common .response .ResponseBody ;
1212import com .cloudera .service .common .response .ResponseType ;
13+ import java .io .IOException ;
14+ import java .util .Collections ;
15+ import java .util .List ;
1316import lombok .RequiredArgsConstructor ;
1417import lombok .extern .slf4j .Slf4j ;
1518import org .springframework .kafka .annotation .KafkaListener ;
2225import org .springframework .messaging .support .MessageHeaderAccessor ;
2326import org .springframework .stereotype .Component ;
2427
25- import java .io .IOException ;
26- import java .util .Collections ;
27- import java .util .List ;
28-
2928@ Component
3029@ RequiredArgsConstructor
3130@ Slf4j
@@ -37,7 +36,9 @@ public class KafkaListenerController {
3736 //TODO: Rewrite to Spring events. Probably split the events into separate types, such as cluster event, job event, pipeline event, etc.
3837 @ KafkaListener (topics = "#{kafkaProperties.getRequestTopic()}" , containerFactory = "kafkaListenerContainerFactory" )
3938 @ SendTo ({"#{kafkaProperties.getReplyTopic()}" })
40- public Message <ResponseBody > handleMessage (RequestBody requestBody , @ Header (KafkaHeaders .RECEIVED_MESSAGE_KEY ) String key , @ Header (KafkaHeaders .REPLY_TOPIC ) byte [] replyTo ,
39+ public Message <ResponseBody > handleMessage (RequestBody requestBody ,
40+ @ Header (KafkaHeaders .RECEIVED_MESSAGE_KEY ) String key ,
41+ @ Header (KafkaHeaders .REPLY_TOPIC ) byte [] replyTo ,
4142 @ Header (KafkaHeaders .CORRELATION_ID ) byte [] correlationId ) {
4243 log .info ("Start processing message\n Message key: '{}' \n value: '{}'" , key , requestBody );
4344
@@ -53,18 +54,19 @@ public Message<ResponseBody> handleMessage(RequestBody requestBody, @Header(Kafk
5354 try {
5455 Job job = jobService .restartJob (requestBody .getJobIdHex ());
5556 ResponseBody responseBody = ResponseBody .builder ()
56- .jobs (Collections .singletonList (job ))
57- .build ();
58- return buildResponseMessage (responseBody , ResponseType .RESTART_JOB_RESPONSE , replyTo , correlationId );
57+ .jobs (Collections .singletonList (job ))
58+ .build ();
59+ return buildResponseMessage (responseBody , ResponseType .RESTART_JOB_RESPONSE , replyTo ,
60+ correlationId );
5961 } catch (IOException e ) {
6062 return handleErrorResponse (e , replyTo , correlationId );
6163 }
6264 case STOP_JOB_REQUEST :
6365 try {
6466 Job job = jobService .stopJob (requestBody .getJobIdHex ());
6567 ResponseBody responseBody = ResponseBody .builder ()
66- .jobs (Collections .singletonList (job ))
67- .build ();
68+ .jobs (Collections .singletonList (job ))
69+ .build ();
6870 return buildResponseMessage (responseBody , ResponseType .STOP_JOB_RESPONSE , replyTo , correlationId );
6971 } catch (IOException e ) {
7072 return handleErrorResponse (e , replyTo , correlationId );
@@ -75,46 +77,53 @@ public Message<ResponseBody> handleMessage(RequestBody requestBody, @Header(Kafk
7577 try {
7678 jobService .updateConfig (requestBody .getPayload ());
7779 final ResponseBody responseBody = ResponseBody .builder ().build ();
78- return buildResponseMessage (responseBody , ResponseType .UPDATE_JOB_CONFIG_RESPONSE , replyTo , correlationId );
80+ return buildResponseMessage (responseBody , ResponseType .UPDATE_JOB_CONFIG_RESPONSE , replyTo ,
81+ correlationId );
7982 } catch (IOException e ) {
8083 return handleErrorResponse (e , replyTo , correlationId );
8184 }
8285 case CREATE_EMPTY_PIPELINE :
8386 try {
8487 pipelineService .createEmptyPipeline (requestBody .getPipelineName (), requestBody .getBranch ());
8588 final ResponseBody responseBody = ResponseBody .builder ().build ();
86- return buildResponseMessage (responseBody , ResponseType .CREATE_EMPTY_PIPELINE_RESPONSE , replyTo , correlationId );
89+ return buildResponseMessage (responseBody , ResponseType .CREATE_EMPTY_PIPELINE_RESPONSE , replyTo ,
90+ correlationId );
8791 } catch (Exception e ) {
8892 return handleErrorResponse (e , replyTo , correlationId );
8993 }
9094 case START_PIPELINE :
9195 try {
92- pipelineService .extractPipeline (requestBody .getPayload (), requestBody .getPipelineName (), requestBody .getBranch ());
93- pipelineService .startPipelineJob (requestBody .getPipelineName (), requestBody .getBranch (), requestBody .getProfileName (), requestBody .getProfileName (), requestBody .getJobs ());
96+ pipelineService .extractPipeline (requestBody .getPayload (), requestBody .getPipelineName (),
97+ requestBody .getBranch ());
98+ pipelineService .startPipelineJob (requestBody .getPipelineName (), requestBody .getBranch (),
99+ requestBody .getProfileName (), requestBody .getProfileName (), requestBody .getJobs ());
94100 final ResponseBody responseBody = ResponseBody .builder ().build ();
95- return buildResponseMessage (responseBody , ResponseType .START_PIPELINE_RESPONSE , replyTo , correlationId );
101+ return buildResponseMessage (responseBody , ResponseType .START_PIPELINE_RESPONSE , replyTo ,
102+ correlationId );
96103 } catch (Exception e ) {
97104 log .error ("Exception while processing the Start All request {}" , e .getMessage ());
98105 return handleErrorResponse (e , replyTo , correlationId );
99106
100107 }
101-
108+ default :
109+ return null ;
102110 }
103111 return null ;
104112 }
105113
106- private Message <ResponseBody > getResponseBodyMessage (byte [] replyTo , byte [] correlationId , ResponseType responseType ) {
114+ private Message <ResponseBody > getResponseBodyMessage (byte [] replyTo , byte [] correlationId ,
115+ ResponseType responseType ) {
107116 try {
108117 List <Job > jobs = jobService .getJobs ();
109118 ResponseBody responseBody = ResponseBody .builder ()
110- .jobs (jobs )
111- .clusterMeta (ClusterMeta .builder ()
112- .name (config .getName ())
113- .clusterId (config .getId ())
114- .clusterStatus (config .getStatus ())
115- .version (config .getVersion ())
116- .build ())
117- .build ();
119+ .jobs (jobs )
120+ .clusterMeta (ClusterMeta .builder ()
121+ .name (config .getName ())
122+ .clusterId (config .getId ())
123+ .clusterStatus (config .getStatus ())
124+ .version (config .getVersion ())
125+ .build ())
126+ .build ();
118127 return buildResponseMessage (responseBody , responseType , replyTo , correlationId );
119128 } catch (IOException e ) {
120129 return handleErrorResponse (e , replyTo , correlationId );
@@ -123,12 +132,14 @@ private Message<ResponseBody> getResponseBodyMessage(byte[] replyTo, byte[] corr
123132
124133 private Message <ResponseBody > handleErrorResponse (Exception e , byte [] replyTo , byte [] correlationId ) {
125134 ResponseBody responseBody = ResponseBody .builder ()
126- .errorMessage (Collections .singletonMap (e .getClass ().toString (), e .getMessage ()))
127- .build ();
135+ .errorMessage (
136+ Collections .singletonMap (e .getClass ().toString (), e .getMessage ()))
137+ .build ();
128138 return buildResponseMessage (responseBody , ResponseType .ERROR_RESPONSE , replyTo , correlationId );
129139 }
130140
131- private Message <ResponseBody > buildResponseMessage (ResponseBody body , ResponseType responseType , byte [] replyTo , byte [] correlationId ) {
141+ private Message <ResponseBody > buildResponseMessage (ResponseBody body , ResponseType responseType , byte [] replyTo ,
142+ byte [] correlationId ) {
132143 MessageHeaderAccessor accessor = new MessageHeaderAccessor ();
133144 accessor .setHeader (KafkaHeaders .MESSAGE_KEY , responseType .name ());
134145 accessor .setHeader (KafkaHeaders .CORRELATION_ID , correlationId );
0 commit comments