3434@ RequiredArgsConstructor
3535@ Slf4j
3636public class KafkaConnectController extends AbstractController implements KafkaConnectApi {
37- private static final Set <ConnectorActionDTO > RESTART_ACTIONS
38- = Set . of ( RESTART , RESTART_FAILED_TASKS , RESTART_ALL_TASKS );
37+ private static final Set <ConnectorActionDTO > RESTART_ACTIONS = Set . of ( RESTART , RESTART_FAILED_TASKS ,
38+ RESTART_ALL_TASKS );
3939 private static final String CONNECTOR_NAME = "connectorName" ;
4040
4141 private final KafkaConnectService kafkaConnectService ;
4242
4343 @ Override
4444 public Mono <ResponseEntity <Flux <ConnectDTO >>> getConnects (String clusterName ,
45- ServerWebExchange exchange ) {
45+ ServerWebExchange exchange ) {
4646
4747 Flux <ConnectDTO > availableConnects = kafkaConnectService .getConnects (getCluster (clusterName ))
4848 .filterWhen (dto -> accessControlService .isConnectAccessible (dto , clusterName ));
@@ -52,7 +52,7 @@ public Mono<ResponseEntity<Flux<ConnectDTO>>> getConnects(String clusterName,
5252
5353 @ Override
5454 public Mono <ResponseEntity <Flux <String >>> getConnectors (String clusterName , String connectName ,
55- ServerWebExchange exchange ) {
55+ ServerWebExchange exchange ) {
5656
5757 var context = AccessContext .builder ()
5858 .cluster (clusterName )
@@ -61,14 +61,15 @@ public Mono<ResponseEntity<Flux<String>>> getConnectors(String clusterName, Stri
6161 .build ();
6262
6363 return validateAccess (context )
64- .thenReturn (ResponseEntity .ok (kafkaConnectService .getConnectorNames (getCluster (clusterName ), connectName )))
64+ .thenReturn (
65+ ResponseEntity .ok (kafkaConnectService .getConnectorNames (getCluster (clusterName ), connectName )))
6566 .doOnEach (sig -> audit (context , sig ));
6667 }
6768
6869 @ Override
6970 public Mono <ResponseEntity <ConnectorDTO >> createConnector (String clusterName , String connectName ,
70- @ Valid Mono <NewConnectorDTO > connector ,
71- ServerWebExchange exchange ) {
71+ @ Valid Mono <NewConnectorDTO > connector ,
72+ ServerWebExchange exchange ) {
7273
7374 var context = AccessContext .builder ()
7475 .cluster (clusterName )
@@ -78,14 +79,14 @@ public Mono<ResponseEntity<ConnectorDTO>> createConnector(String clusterName, St
7879
7980 return validateAccess (context ).then (
8081 kafkaConnectService .createConnector (getCluster (clusterName ), connectName , connector )
81- .map (ResponseEntity ::ok )
82- ) .doOnEach (sig -> audit (context , sig ));
82+ .map (ResponseEntity ::ok ))
83+ .doOnEach (sig -> audit (context , sig ));
8384 }
8485
8586 @ Override
8687 public Mono <ResponseEntity <ConnectorDTO >> getConnector (String clusterName , String connectName ,
87- String connectorName ,
88- ServerWebExchange exchange ) {
88+ String connectorName ,
89+ ServerWebExchange exchange ) {
8990
9091 var context = AccessContext .builder ()
9192 .cluster (clusterName )
@@ -95,14 +96,14 @@ public Mono<ResponseEntity<ConnectorDTO>> getConnector(String clusterName, Strin
9596
9697 return validateAccess (context ).then (
9798 kafkaConnectService .getConnector (getCluster (clusterName ), connectName , connectorName )
98- .map (ResponseEntity ::ok )
99- ) .doOnEach (sig -> audit (context , sig ));
99+ .map (ResponseEntity ::ok ))
100+ .doOnEach (sig -> audit (context , sig ));
100101 }
101102
102103 @ Override
103104 public Mono <ResponseEntity <Void >> deleteConnector (String clusterName , String connectName ,
104- String connectorName ,
105- ServerWebExchange exchange ) {
105+ String connectorName ,
106+ ServerWebExchange exchange ) {
106107
107108 var context = AccessContext .builder ()
108109 .cluster (clusterName )
@@ -113,19 +114,17 @@ public Mono<ResponseEntity<Void>> deleteConnector(String clusterName, String con
113114
114115 return validateAccess (context ).then (
115116 kafkaConnectService .deleteConnector (getCluster (clusterName ), connectName , connectorName )
116- .map (ResponseEntity ::ok )
117- ) .doOnEach (sig -> audit (context , sig ));
117+ .map (ResponseEntity ::ok ))
118+ .doOnEach (sig -> audit (context , sig ));
118119 }
119120
120-
121121 @ Override
122122 public Mono <ResponseEntity <Flux <FullConnectorInfoDTO >>> getAllConnectors (
123123 String clusterName ,
124124 String search ,
125125 ConnectorColumnsToSortDTO orderBy ,
126126 SortOrderDTO sortOrder ,
127- ServerWebExchange exchange
128- ) {
127+ ServerWebExchange exchange ) {
129128 var context = AccessContext .builder ()
130129 .cluster (clusterName )
131130 .operationName ("getAllConnectors" )
@@ -145,9 +144,9 @@ public Mono<ResponseEntity<Flux<FullConnectorInfoDTO>>> getAllConnectors(
145144
146145 @ Override
147146 public Mono <ResponseEntity <Map <String , Object >>> getConnectorConfig (String clusterName ,
148- String connectName ,
149- String connectorName ,
150- ServerWebExchange exchange ) {
147+ String connectName ,
148+ String connectorName ,
149+ ServerWebExchange exchange ) {
151150
152151 var context = AccessContext .builder ()
153152 .cluster (clusterName )
@@ -158,15 +157,15 @@ public Mono<ResponseEntity<Map<String, Object>>> getConnectorConfig(String clust
158157 return validateAccess (context ).then (
159158 kafkaConnectService
160159 .getConnectorConfig (getCluster (clusterName ), connectName , connectorName )
161- .map (ResponseEntity ::ok )
162- ) .doOnEach (sig -> audit (context , sig ));
160+ .map (ResponseEntity ::ok ))
161+ .doOnEach (sig -> audit (context , sig ));
163162 }
164163
165164 @ Override
166165 public Mono <ResponseEntity <ConnectorDTO >> setConnectorConfig (String clusterName , String connectName ,
167- String connectorName ,
168- Mono <Map <String , Object >> requestBody ,
169- ServerWebExchange exchange ) {
166+ String connectorName ,
167+ Mono <Map <String , Object >> requestBody ,
168+ ServerWebExchange exchange ) {
170169
171170 var context = AccessContext .builder ()
172171 .cluster (clusterName )
@@ -176,22 +175,22 @@ public Mono<ResponseEntity<ConnectorDTO>> setConnectorConfig(String clusterName,
176175 .build ();
177176
178177 return validateAccess (context ).then (
179- kafkaConnectService
180- .setConnectorConfig (getCluster (clusterName ), connectName , connectorName , requestBody )
181- .map (ResponseEntity ::ok ))
178+ kafkaConnectService
179+ .setConnectorConfig (getCluster (clusterName ), connectName , connectorName , requestBody )
180+ .map (ResponseEntity ::ok ))
182181 .doOnEach (sig -> audit (context , sig ));
183182 }
184183
185184 @ Override
186185 public Mono <ResponseEntity <Void >> updateConnectorState (String clusterName , String connectName ,
187- String connectorName ,
188- ConnectorActionDTO action ,
189- ServerWebExchange exchange ) {
186+ String connectorName ,
187+ ConnectorActionDTO action ,
188+ ServerWebExchange exchange ) {
190189 ConnectAction [] connectActions ;
191190 if (RESTART_ACTIONS .contains (action )) {
192- connectActions = new ConnectAction [] {ConnectAction .VIEW , ConnectAction .RESTART };
191+ connectActions = new ConnectAction [] { ConnectAction .VIEW , ConnectAction .RESTART };
193192 } else {
194- connectActions = new ConnectAction [] {ConnectAction .VIEW , ConnectAction .EDIT };
193+ connectActions = new ConnectAction [] { ConnectAction .VIEW , ConnectAction .EDIT };
195194 }
196195
197196 var context = AccessContext .builder ()
@@ -204,15 +203,15 @@ public Mono<ResponseEntity<Void>> updateConnectorState(String clusterName, Strin
204203 return validateAccess (context ).then (
205204 kafkaConnectService
206205 .updateConnectorState (getCluster (clusterName ), connectName , connectorName , action )
207- .map (ResponseEntity ::ok )
208- ) .doOnEach (sig -> audit (context , sig ));
206+ .map (ResponseEntity ::ok ))
207+ .doOnEach (sig -> audit (context , sig ));
209208 }
210209
211210 @ Override
212211 public Mono <ResponseEntity <Flux <TaskDTO >>> getConnectorTasks (String clusterName ,
213- String connectName ,
214- String connectorName ,
215- ServerWebExchange exchange ) {
212+ String connectName ,
213+ String connectorName ,
214+ ServerWebExchange exchange ) {
216215 var context = AccessContext .builder ()
217216 .cluster (clusterName )
218217 .connectActions (connectName , ConnectAction .VIEW )
@@ -223,14 +222,14 @@ public Mono<ResponseEntity<Flux<TaskDTO>>> getConnectorTasks(String clusterName,
223222 return validateAccess (context ).thenReturn (
224223 ResponseEntity
225224 .ok (kafkaConnectService
226- .getConnectorTasks (getCluster (clusterName ), connectName , connectorName ))
227- ) .doOnEach (sig -> audit (context , sig ));
225+ .getConnectorTasks (getCluster (clusterName ), connectName , connectorName )))
226+ .doOnEach (sig -> audit (context , sig ));
228227 }
229228
230229 @ Override
231230 public Mono <ResponseEntity <Void >> restartConnectorTask (String clusterName , String connectName ,
232- String connectorName , Integer taskId ,
233- ServerWebExchange exchange ) {
231+ String connectorName , Integer taskId ,
232+ ServerWebExchange exchange ) {
234233
235234 var context = AccessContext .builder ()
236235 .cluster (clusterName )
@@ -242,8 +241,8 @@ public Mono<ResponseEntity<Void>> restartConnectorTask(String clusterName, Strin
242241 return validateAccess (context ).then (
243242 kafkaConnectService
244243 .restartConnectorTask (getCluster (clusterName ), connectName , connectorName , taskId )
245- .map (ResponseEntity ::ok )
246- ) .doOnEach (sig -> audit (context , sig ));
244+ .map (ResponseEntity ::ok ))
245+ .doOnEach (sig -> audit (context , sig ));
247246 }
248247
249248 @ Override
@@ -259,8 +258,8 @@ public Mono<ResponseEntity<Flux<ConnectorPluginDTO>>> getConnectorPlugins(
259258 return validateAccess (context ).then (
260259 Mono .just (
261260 ResponseEntity .ok (
262- kafkaConnectService .getConnectorPlugins (getCluster (clusterName ), connectName )))
263- ) .doOnEach (sig -> audit (context , sig ));
261+ kafkaConnectService .getConnectorPlugins (getCluster (clusterName ), connectName ))))
262+ .doOnEach (sig -> audit (context , sig ));
264263 }
265264
266265 @ Override
@@ -285,4 +284,26 @@ private Comparator<FullConnectorInfoDTO> getConnectorsComparator(ConnectorColumn
285284 default -> defaultComparator ;
286285 };
287286 }
287+
288+ @ Override
289+ public Mono <ResponseEntity <Void >> resetConnectorOffsets (String clusterName , String connectName ,
290+ String connectorName ,
291+ ServerWebExchange exchange ) {
292+ ConnectAction [] connectActions ;
293+
294+ connectActions = new ConnectAction [] { ConnectAction .VIEW , ConnectAction .RESET_OFFSETS };
295+
296+ var context = AccessContext .builder ()
297+ .cluster (clusterName )
298+ .connectActions (connectName , connectActions )
299+ .operationName ("resetConnectorOffsets" )
300+ .operationParams (Map .of (CONNECTOR_NAME , connectorName ))
301+ .build ();
302+
303+ return validateAccess (context ).then (
304+ kafkaConnectService
305+ .resetConnectorOffsets (getCluster (clusterName ), connectName , connectorName )
306+ .map (ResponseEntity ::ok ))
307+ .doOnEach (sig -> audit (context , sig ));
308+ }
288309}
0 commit comments