@@ -54,21 +54,16 @@ class CSA extends AbstractConsumerSeekAware {
5454 var first = new AtomicBoolean (true );
5555 var map1 = new LinkedHashMap <>(Map .of (new TopicPartition ("foo" , 0 ), 0L , new TopicPartition ("foo" , 1 ), 0L ));
5656 var map2 = new LinkedHashMap <>(Map .of (new TopicPartition ("foo" , 2 ), 0L , new TopicPartition ("foo" , 3 ), 0L ));
57- var register = new Callable <Void >() {
58-
59- @ Override
60- public Void call () {
61- if (first .getAndSet (false )) {
62- csa .registerSeekCallback (cb1 );
63- csa .onPartitionsAssigned (map1 , null );
64- }
65- else {
66- csa .registerSeekCallback (cb2 );
67- csa .onPartitionsAssigned (map2 , null );
68- }
69- return null ;
57+ var register = (Callable <Void >) () -> {
58+ if (first .getAndSet (false )) {
59+ csa .registerSeekCallback (cb1 );
60+ csa .onPartitionsAssigned (map1 , null );
7061 }
71-
62+ else {
63+ csa .registerSeekCallback (cb2 );
64+ csa .onPartitionsAssigned (map2 , null );
65+ }
66+ return null ;
7267 };
7368 exec1 .submit (register ).get ();
7469 exec2 .submit (register ).get ();
@@ -81,19 +76,14 @@ public Void call() {
8176 csa .seekToTimestamp (42L );
8277 verify (cb1 ).seekToTimestamp (new LinkedList <>(map1 .keySet ()), 42L );
8378 verify (cb2 ).seekToTimestamp (new LinkedList <>(map2 .keySet ()), 42L );
84- var revoke1 = new Callable <Void >() {
85-
86- @ Override
87- public Void call () {
88- if (!first .getAndSet (true )) {
89- csa .onPartitionsRevoked (Collections .singletonList (map1 .keySet ().iterator ().next ()));
90- }
91- else {
92- csa .onPartitionsRevoked (Collections .singletonList (map2 .keySet ().iterator ().next ()));
93- }
94- return null ;
79+ var revoke1 = (Callable <Void >) () -> {
80+ if (!first .getAndSet (true )) {
81+ csa .onPartitionsRevoked (Collections .singletonList (map1 .keySet ().iterator ().next ()));
9582 }
96-
83+ else {
84+ csa .onPartitionsRevoked (Collections .singletonList (map2 .keySet ().iterator ().next ()));
85+ }
86+ return null ;
9787 };
9888 exec1 .submit (revoke1 ).get ();
9989 exec2 .submit (revoke1 ).get ();
@@ -102,33 +92,23 @@ public Void call() {
10292 csa .seekToTimestamp (43L );
10393 verify (cb1 ).seekToTimestamp (new LinkedList <>(map1 .keySet ()), 43L );
10494 verify (cb2 ).seekToTimestamp (new LinkedList <>(map2 .keySet ()), 43L );
105- var revoke2 = new Callable <Void >() {
106-
107- @ Override
108- public Void call () {
109- if (first .getAndSet (false )) {
110- csa .onPartitionsRevoked (Collections .singletonList (map1 .keySet ().iterator ().next ()));
111- }
112- else {
113- csa .onPartitionsRevoked (Collections .singletonList (map2 .keySet ().iterator ().next ()));
114- }
115- return null ;
95+ var revoke2 = (Callable <Void >) () -> {
96+ if (first .getAndSet (false )) {
97+ csa .onPartitionsRevoked (Collections .singletonList (map1 .keySet ().iterator ().next ()));
11698 }
117-
99+ else {
100+ csa .onPartitionsRevoked (Collections .singletonList (map2 .keySet ().iterator ().next ()));
101+ }
102+ return null ;
118103 };
119104 exec1 .submit (revoke2 ).get ();
120105 exec2 .submit (revoke2 ).get ();
121106 assertThat (KafkaTestUtils .getPropertyValue (csa , "callbacks" , Map .class )).isEmpty ();
122107 assertThat (KafkaTestUtils .getPropertyValue (csa , "callbacksToTopic" , Map .class )).isEmpty ();
123- var checkTL = new Callable <Void >() {
124-
125- @ Override
126- public Void call () throws Exception {
127- csa .unregisterSeekCallback ();
128- assertThat (KafkaTestUtils .getPropertyValue (csa , "callbackForThread" , ThreadLocal .class ).get ()).isNull ();
129- return null ;
130- }
131-
108+ var checkTL = (Callable <Void >) () -> {
109+ csa .unregisterSeekCallback ();
110+ assertThat (KafkaTestUtils .getPropertyValue (csa , "callbackForThread" , ThreadLocal .class ).get ()).isNull ();
111+ return null ;
132112 };
133113 exec1 .submit (checkTL ).get ();
134114 exec2 .submit (checkTL ).get ();
0 commit comments