File tree Expand file tree Collapse file tree 4 files changed +18
-0
lines changed
kafka-ui-api/src/main/java/com/provectus/kafka/ui/emitter
kafka-ui-react-app/src/components/Topics/Topic/Messages/Filters Expand file tree Collapse file tree 4 files changed +18
-0
lines changed Original file line number Diff line number Diff line change 1717import org .apache .kafka .clients .consumer .ConsumerRecord ;
1818import org .apache .kafka .clients .consumer .KafkaConsumer ;
1919import org .apache .kafka .common .TopicPartition ;
20+ import org .apache .kafka .common .errors .InterruptException ;
2021import org .apache .kafka .common .utils .Bytes ;
2122import reactor .core .publisher .FluxSink ;
2223
@@ -85,6 +86,9 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
8586 }
8687 sendFinishStatsAndCompleteSink (sink );
8788 log .debug ("Polling finished" );
89+ } catch (InterruptException kafkaInterruptException ) {
90+ log .debug ("Polling finished due to thread interruption" );
91+ sink .complete ();
8892 } catch (Exception e ) {
8993 log .error ("Error occurred while consuming records" , e );
9094 sink .error (e );
Original file line number Diff line number Diff line change 99import org .apache .kafka .clients .consumer .ConsumerRecord ;
1010import org .apache .kafka .clients .consumer .ConsumerRecords ;
1111import org .apache .kafka .clients .consumer .KafkaConsumer ;
12+ import org .apache .kafka .common .errors .InterruptException ;
1213import org .apache .kafka .common .utils .Bytes ;
1314import reactor .core .publisher .FluxSink ;
1415
@@ -59,6 +60,9 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
5960 }
6061 sendFinishStatsAndCompleteSink (sink );
6162 log .debug ("Polling finished" );
63+ } catch (InterruptException kafkaInterruptException ) {
64+ log .debug ("Polling finished due to thread interruption" );
65+ sink .complete ();
6266 } catch (Exception e ) {
6367 log .error ("Error occurred while consuming records" , e );
6468 sink .error (e );
Original file line number Diff line number Diff line change @@ -41,6 +41,7 @@ public void accept(FluxSink<TopicMessageEventDTO> sink) {
4141 sink .complete ();
4242 log .debug ("Tailing finished" );
4343 } catch (InterruptException kafkaInterruptException ) {
44+ log .debug ("Tailing finished due to thread interruption" );
4445 sink .complete ();
4546 } catch (Exception e ) {
4647 log .error ("Error consuming {}" , consumerPosition , e );
Original file line number Diff line number Diff line change @@ -219,6 +219,15 @@ const Filters: React.FC<FiltersProps> = ({
219219 default :
220220 props . seekType = currentSeekType ;
221221 }
222+
223+ if ( offset && currentSeekType === SeekType . OFFSET ) {
224+ props . seekType = SeekType . OFFSET ;
225+ }
226+
227+ if ( timestamp && currentSeekType === SeekType . TIMESTAMP ) {
228+ props . seekType = SeekType . TIMESTAMP ;
229+ }
230+
222231 props . seekTo = selectedPartitions . map ( ( { value } ) => {
223232 const offsetProperty =
224233 seekDirection === SeekDirection . FORWARD ? 'offsetMin' : 'offsetMax' ;
You can’t perform that action at this time.
0 commit comments