2424import lombok .extern .log4j .Log4j2 ;
2525import org .openapitools .jackson .nullable .JsonNullableModule ;
2626import org .testng .Assert ;
27+
2728import java .io .File ;
2829import java .io .IOException ;
2930import java .nio .file .Path ;
3435import java .util .Locale ;
3536import java .util .concurrent .ExecutionException ;
3637import java .util .stream .Collectors ;
38+
3739import static java .time .Duration .ofMinutes ;
3840import static lombok .Lombok .sneakyThrow ;
3941
@@ -44,7 +46,7 @@ public class CLI {
4446 private static final Duration DEFAULT_TIMEOUT = ofMinutes (3 );
4547
4648 private static final String CLUSTER_CAPACITY_EXHAUSTED_CODE = "KAFKAS-MGMT-24" ;
47-
49+
4850 private static final Locale LOCALE_EN = Locale .ENGLISH ;
4951
5052 private final String workdir ;
@@ -161,6 +163,15 @@ public KafkaRequestList listKafka() throws CliGenericException {
161163 .getObjectValue (KafkaRequestList ::createFromDiscriminatorValue );
162164 }
163165
166+ public void UpdateKafkaOwner (String userName , String instanceName ) throws CliGenericException {
167+ retry (() -> exec ("kafka" , "update" , "--owner" , userName , "--name" , instanceName , "-y" ));
168+ }
169+
170+ public void UpdateKafkaReauthentication (String newStatus , String instanceName ) throws CliGenericException {
171+ retry (() -> exec ("kafka" , "update" , "--reauthentication" , newStatus .toLowerCase (LOCALE_EN ), "--name" ,
172+ instanceName , "-y" ));
173+ }
174+
164175 public KafkaRequestList searchKafkaByName (String name ) throws CliGenericException {
165176 return retry (() -> exec ("kafka" , "list" , "--search" , name , "-o" , "json" ))
166177 .parseNodeFromProcessOutput ()
@@ -175,8 +186,8 @@ public ServiceAccountData describeServiceAccount(String id) throws CliGenericExc
175186
176187 public List <ServiceAccountData > listServiceAccount () throws CliGenericException {
177188 return retry (() -> exec ("service-account" , "list" , "-o" , "json" ))
178- .parseNodeFromProcessOutput ()
179- .getCollectionOfObjectValues (ServiceAccountData ::createFromDiscriminatorValue );
189+ .parseNodeFromProcessOutput ()
190+ .getCollectionOfObjectValues (ServiceAccountData ::createFromDiscriminatorValue );
180191 }
181192
182193 public void deleteServiceAccount (String id ) throws CliGenericException {
@@ -252,6 +263,7 @@ private ACLEntityType(String name, String flag) {
252263 this .flag = flag ;
253264 }
254265 }
266+
255267 //// kafka acl create
256268 public void createAcl (ACLEntityType aclEntityType , String entityIdentificator , AclOperation operation , AclPermissionType permission , String topic ) throws CliGenericException {
257269 retry (() -> exec ("kafka" , "acl" , "create" , "-y" , aclEntityType .flag , entityIdentificator , "--topic" , topic , "--permission" , permission .toString ().toLowerCase (LOCALE_EN ), "--operation" , operation .toString ().toLowerCase (LOCALE_EN )));
@@ -298,7 +310,7 @@ public Registry createServiceRegistry(String name) throws CliGenericException {
298310 .getObjectValue (Registry ::createFromDiscriminatorValue );
299311 }
300312
301- public Registry describeServiceRegistry (String id ) throws CliGenericException {
313+ public Registry describeServiceRegistry (String id ) throws CliGenericException {
302314 return retry (() -> exec ("service-registry" , "describe" , "--id" , id ))
303315 .parseNodeFromProcessOutput ()
304316 .getObjectValue (Registry ::createFromDiscriminatorValue );
@@ -326,43 +338,43 @@ public void deleteServiceRegistry(String name) throws CliGenericException {
326338
327339 public List <Record > consumeRecords (String topicName , String instanceId , int partition , int offset ) throws CliGenericException , JsonProcessingException {
328340 List <String > cmd = List .of ("kafka" , "topic" , "consume" ,
329- "--instance-id" , instanceId ,
330- "--name" , topicName ,
331- "--offset" , Integer .toString (offset ),
332- "--partition" , Integer .toString (partition ),
333- "--format" , "json"
341+ "--instance-id" , instanceId ,
342+ "--name" , topicName ,
343+ "--offset" , Integer .toString (offset ),
344+ "--partition" , Integer .toString (partition ),
345+ "--format" , "json"
334346 );
335347
336348 return consumeRecords (cmd );
337349 }
338350
339351 public List <Record > consumeRecords (String topicName , String instanceId , int partition ) throws CliGenericException , JsonProcessingException {
340352 List <String > cmd = List .of ("kafka" , "topic" , "consume" ,
341- "--instance-id" , instanceId ,
342- "--name" , topicName ,
343- "--partition" , Integer .toString (partition ),
344- "--format" , "json"
353+ "--instance-id" , instanceId ,
354+ "--name" , topicName ,
355+ "--partition" , Integer .toString (partition ),
356+ "--format" , "json"
345357 );
346358
347359 return consumeRecords (cmd );
348360 }
349361
350362 public Record produceRecords (String topicName , String instanceId , String message , int partition , String recordKey )
351- throws InterruptedException , ExecutionException , IOException {
363+ throws InterruptedException , ExecutionException , IOException {
352364 List <String > cmd = List .of ("kafka" , "topic" , "produce" ,
353- "--instance-id" , instanceId ,
354- "--name" , topicName ,
355- "--partition" , Integer .toString (partition ),
356- "--key" , recordKey
365+ "--instance-id" , instanceId ,
366+ "--name" , topicName ,
367+ "--partition" , Integer .toString (partition ),
368+ "--key" , recordKey
357369 );
358370 return produceRecords (message , cmd );
359371 }
360372
361373 public Record produceRecords (String topicName , String instanceId , String message )
362- throws IOException , ExecutionException , InterruptedException {
374+ throws IOException , ExecutionException , InterruptedException {
363375 List <String > cmd = List .of ("kafka" , "topic" , "produce" ,
364- "--instance-id" , instanceId ,
365- "--name" , topicName
376+ "--instance-id" , instanceId ,
377+ "--name" , topicName
366378 );
367379 return produceRecords (message , cmd );
368380 }
@@ -395,20 +407,20 @@ private List<Record> consumeRecords(List<String> cmd) throws CliGenericException
395407 if (output .isEmpty ()) {
396408 return new ArrayList <Record >();
397409 }
398-
410+
399411 // specific separated JSON objects \n}\n which is separator of multiple inline jsons
400412 String [] lines = output .split ("\n \\ }\n " );
401413 // append back '}' (i.e. curly bracket) so JSON objects will not miss this end symbol
402- List <String > messagesWithFixedFormat = Arrays .stream (lines ).map (in -> in + "}" ).collect (Collectors .toList ());
414+ List <String > messagesWithFixedFormat = Arrays .stream (lines ).map (in -> in + "}" ).collect (Collectors .toList ());
403415
404- var objectMapper = new ObjectMapper ()
405- .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false )
406- .registerModule (new JavaTimeModule ())
407- .registerModule (new JsonNullableModule ());
416+ var objectMapper = new ObjectMapper ()
417+ .configure (DeserializationFeature .FAIL_ON_UNKNOWN_PROPERTIES , false )
418+ .registerModule (new JavaTimeModule ())
419+ .registerModule (new JsonNullableModule ());
408420 List <Record > records = new ArrayList <>();
409421
410422 // each object is read as separated Record
411- for (String line : messagesWithFixedFormat ) {
423+ for (String line : messagesWithFixedFormat ) {
412424 Record record = objectMapper .readValue (line , Record .class );
413425 records .add (record );
414426 }
@@ -421,7 +433,7 @@ private <T, E extends Throwable> T retry(ThrowingSupplier<T, E> call) throws E {
421433
422434 private <T , E extends Throwable > T retryKafkaCreation (ThrowingSupplier <T , E > call ) throws E {
423435 return RetryUtils .retry (
424- 1 , null , call , CLI ::retryConditionKafkaCreation , 12 , Duration .ofSeconds (10 ));
436+ 1 , null , call , CLI ::retryConditionKafkaCreation , 12 , Duration .ofSeconds (10 ));
425437 }
426438
427439 private static boolean retryConditionKafkaCreation (Throwable t ) {
0 commit comments