3434@ RequiredArgsConstructor
3535@ Slf4j
3636public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
37- private static final Set <ConnectorActionDTO > RESTART_ACTIONS = Set . of ( RESTART , RESTART_FAILED_TASKS ,
38- RESTART_ALL_TASKS );
37+ private static final Set <ConnectorActionDTO > RESTART_ACTIONS
38+ = Set . of ( RESTART , RESTART_FAILED_TASKS , RESTART_ALL_TASKS );
3939 private static final String CONNECTOR_NAME = "connectorName" ;
4040
4141 private final KafkaConnectService kafkaConnectService ;
4242
4343 @ Override
4444 public Mono <ResponseEntity <Flux <ConnectDTO >>> getConnects (String clusterName ,
45- ServerWebExchange exchange ) {
45+ ServerWebExchange exchange ) {
4646
4747 Flux <ConnectDTO > availableConnects = kafkaConnectService .getConnects (getCluster (clusterName ))
4848 .filterWhen (dto -> accessControlService .isConnectAccessible (dto , clusterName ));
@@ -52,7 +52,7 @@ public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
5252
5353 @ Override
5454 public Mono <ResponseEntity <Flux <String >>> getConnectors (String clusterName , String connectName ,
55- ServerWebExchange exchange ) {
55+ ServerWebExchange exchange ) {
5656
5757 var context = AccessContext .builder ()
5858 .cluster (clusterName )
@@ -61,16 +61,14 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
6161 .build ();
6262
6363 return validateAccess (context )
64- .thenReturn (
65- ResponseEntity .ok (kafkaConnectService .getConnectorNames (
66- getCluster (clusterName ), connectName )))
64+ .thenReturn (ResponseEntity .ok (kafkaConnectService .getConnectorNames (getCluster (clusterName ), connectName )))
6765 .doOnEach (sig -> audit (context , sig ));
6866 }
6967
7068 @ Override
7169 public Mono <ResponseEntity <ConnectorDTO >> createConnector (String clusterName , String connectName ,
72- @ Valid Mono <NewConnectorDTO > connector ,
73- ServerWebExchange exchange ) {
70+ @ Valid Mono <NewConnectorDTO > connector ,
71+ ServerWebExchange exchange ) {
7472
7573 var context = AccessContext .builder ()
7674 .cluster (clusterName )
@@ -80,14 +78,14 @@ public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, St
8078
8179 return validateAccess (context ).then (
8280 kafkaConnectService .createConnector (getCluster (clusterName ), connectName , connector )
83- .map (ResponseEntity ::ok ))
84- .doOnEach (sig -> audit (context , sig ));
81+ .map (ResponseEntity ::ok )
82+ ) .doOnEach (sig -> audit (context , sig ));
8583 }
8684
8785 @ Override
8886 public Mono <ResponseEntity <ConnectorDTO >> getConnector (String clusterName , String connectName ,
89- String connectorName ,
90- ServerWebExchange exchange ) {
87+ String connectorName ,
88+ ServerWebExchange exchange ) {
9189
9290 var context = AccessContext .builder ()
9391 .cluster (clusterName )
@@ -97,14 +95,14 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin
9795
9896 return validateAccess (context ).then (
9997 kafkaConnectService .getConnector (getCluster (clusterName ), connectName , connectorName )
100- .map (ResponseEntity ::ok ))
101- .doOnEach (sig -> audit (context , sig ));
98+ .map (ResponseEntity ::ok )
99+ ) .doOnEach (sig -> audit (context , sig ));
102100 }
103101
104102 @ Override
105103 public Mono <ResponseEntity <Void >> deleteConnector (String clusterName , String connectName ,
106- String connectorName ,
107- ServerWebExchange exchange ) {
104+ String connectorName ,
105+ ServerWebExchange exchange ) {
108106
109107 var context = AccessContext .builder ()
110108 .cluster (clusterName )
@@ -115,17 +113,19 @@ public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String con
115113
116114 return validateAccess (context ).then (
117115 kafkaConnectService .deleteConnector (getCluster (clusterName ), connectName , connectorName )
118- .map (ResponseEntity ::ok ))
119- .doOnEach (sig -> audit (context , sig ));
116+ .map (ResponseEntity ::ok )
117+ ) .doOnEach (sig -> audit (context , sig ));
120118 }
121119
120+
122121 @ Override
123122 public Mono <ResponseEntity <Flux <FullConnectorInfoDTO >>> getAllConnectors (
124123 String clusterName ,
125124 String search ,
126125 ConnectorColumnsToSortDTO orderBy ,
127126 SortOrderDTO sortOrder ,
128- ServerWebExchange exchange ) {
127+ ServerWebExchange exchange
128+ ) {
129129 var context = AccessContext .builder ()
130130 .cluster (clusterName )
131131 .operationName ("getAllConnectors" )
@@ -146,9 +146,9 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
146146
147147 @ Override
148148 public Mono <ResponseEntity <Map <String , Object >>> getConnectorConfig (String clusterName ,
149- String connectName ,
150- String connectorName ,
151- ServerWebExchange exchange ) {
149+ String connectName ,
150+ String connectorName ,
151+ ServerWebExchange exchange ) {
152152
153153 var context = AccessContext .builder ()
154154 .cluster (clusterName )
@@ -159,15 +159,15 @@ public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clust
159159 return validateAccess (context ).then (
160160 kafkaConnectService
161161 .getConnectorConfig (getCluster (clusterName ), connectName , connectorName )
162- .map (ResponseEntity ::ok ))
163- .doOnEach (sig -> audit (context , sig ));
162+ .map (ResponseEntity ::ok )
163+ ) .doOnEach (sig -> audit (context , sig ));
164164 }
165165
166166 @ Override
167167 public Mono <ResponseEntity <ConnectorDTO >> setConnectorConfig (String clusterName , String connectName ,
168- String connectorName ,
169- Mono <Map <String , Object >> requestBody ,
170- ServerWebExchange exchange ) {
168+ String connectorName ,
169+ Mono <Map <String , Object >> requestBody ,
170+ ServerWebExchange exchange ) {
171171
172172 var context = AccessContext .builder ()
173173 .cluster (clusterName )
@@ -177,23 +177,22 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
177177 .build ();
178178
179179 return validateAccess (context ).then (
180- kafkaConnectService
181- .setConnectorConfig (getCluster (clusterName ), connectName , connectorName ,
182- requestBody )
183- .map (ResponseEntity ::ok ))
180+ kafkaConnectService
181+ .setConnectorConfig (getCluster (clusterName ), connectName , connectorName , requestBody )
182+ .map (ResponseEntity ::ok ))
184183 .doOnEach (sig -> audit (context , sig ));
185184 }
186185
187186 @ Override
188187 public Mono <ResponseEntity <Void >> updateConnectorState (String clusterName , String connectName ,
189- String connectorName ,
190- ConnectorActionDTO action ,
191- ServerWebExchange exchange ) {
188+ String connectorName ,
189+ ConnectorActionDTO action ,
190+ ServerWebExchange exchange ) {
192191 ConnectAction [] connectActions ;
193192 if (RESTART_ACTIONS .contains (action )) {
194- connectActions = new ConnectAction [] { ConnectAction .VIEW , ConnectAction .RESTART };
193+ connectActions = new ConnectAction [] {ConnectAction .VIEW , ConnectAction .RESTART };
195194 } else {
196- connectActions = new ConnectAction [] { ConnectAction .VIEW , ConnectAction .EDIT };
195+ connectActions = new ConnectAction [] {ConnectAction .VIEW , ConnectAction .EDIT };
197196 }
198197
199198 var context = AccessContext .builder ()
@@ -205,17 +204,16 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin
205204
206205 return validateAccess (context ).then (
207206 kafkaConnectService
208- .updateConnectorState (getCluster (clusterName ), connectName ,
209- connectorName , action )
210- .map (ResponseEntity ::ok ))
211- .doOnEach (sig -> audit (context , sig ));
207+ .updateConnectorState (getCluster (clusterName ), connectName , connectorName , action )
208+ .map (ResponseEntity ::ok )
209+ ).doOnEach (sig -> audit (context , sig ));
212210 }
213211
214212 @ Override
215213 public Mono <ResponseEntity <Flux <TaskDTO >>> getConnectorTasks (String clusterName ,
216- String connectName ,
217- String connectorName ,
218- ServerWebExchange exchange ) {
214+ String connectName ,
215+ String connectorName ,
216+ ServerWebExchange exchange ) {
219217 var context = AccessContext .builder ()
220218 .cluster (clusterName )
221219 .connectActions (connectName , ConnectAction .VIEW )
@@ -226,15 +224,14 @@ public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
226224 return validateAccess (context ).thenReturn (
227225 ResponseEntity
228226 .ok (kafkaConnectService
229- .getConnectorTasks (getCluster (clusterName ), connectName ,
230- connectorName )))
231- .doOnEach (sig -> audit (context , sig ));
227+ .getConnectorTasks (getCluster (clusterName ), connectName , connectorName ))
228+ ).doOnEach (sig -> audit (context , sig ));
232229 }
233230
234231 @ Override
235232 public Mono <ResponseEntity <Void >> restartConnectorTask (String clusterName , String connectName ,
236- String connectorName , Integer taskId ,
237- ServerWebExchange exchange ) {
233+ String connectorName , Integer taskId ,
234+ ServerWebExchange exchange ) {
238235
239236 var context = AccessContext .builder ()
240237 .cluster (clusterName )
@@ -245,10 +242,9 @@ public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, Strin
245242
246243 return validateAccess (context ).then (
247244 kafkaConnectService
248- .restartConnectorTask (getCluster (clusterName ), connectName ,
249- connectorName , taskId )
250- .map (ResponseEntity ::ok ))
251- .doOnEach (sig -> audit (context , sig ));
245+ .restartConnectorTask (getCluster (clusterName ), connectName , connectorName , taskId )
246+ .map (ResponseEntity ::ok )
247+ ).doOnEach (sig -> audit (context , sig ));
252248 }
253249
254250 @ Override
@@ -264,9 +260,8 @@ public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
264260 return validateAccess (context ).then (
265261 Mono .just (
266262 ResponseEntity .ok (
267- kafkaConnectService .getConnectorPlugins (
268- getCluster (clusterName ), connectName ))))
269- .doOnEach (sig -> audit (context , sig ));
263+ kafkaConnectService .getConnectorPlugins (getCluster (clusterName ), connectName )))
264+ ).doOnEach (sig -> audit (context , sig ));
270265 }
271266
272267 @ Override
0 commit comments