@@ -79,40 +79,53 @@ public static void Run_Consume(string brokerList, List<string> topics, Cancellat
7979 {
8080 consumer . Subscribe ( topics ) ;
8181
82- while ( ! cancellationToken . IsCancellationRequested )
82+ try
8383 {
84- try
84+ while ( true )
8585 {
86- var consumeResult = consumer . Consume ( cancellationToken ) ;
87-
88- if ( consumeResult . IsPartitionEOF )
86+ try
8987 {
90- Console . WriteLine (
91- $ "Reached end of topic { consumeResult . Topic } , partition { consumeResult . Partition } .") ;
92-
93- continue ;
88+ var consumeResult = consumer . Consume ( cancellationToken ) ;
89+
90+ if ( consumeResult . IsPartitionEOF )
91+ {
92+ Console . WriteLine (
93+ $ "Reached end of topic { consumeResult . Topic } , partition { consumeResult . Partition } , offset { consumeResult . Offset } .") ;
94+
95+ continue ;
96+ }
97+
98+ Console . WriteLine ( $ "Received message at { consumeResult . TopicPartitionOffset } : { consumeResult . Value } ") ;
99+
100+ if ( consumeResult . Offset % commitPeriod == 0 )
101+ {
102+ // The Commit method sends a "commit offsets" request to the Kafka
103+ // cluster and synchronously waits for the response. This is very
104+ // slow compared to the rate at which the consumer is capable of
105+ // consuming messages. A high performance application will typically
106+ // commit offsets relatively infrequently and be designed handle
107+ // duplicate messages in the event of failure.
108+ try
109+ {
110+ consumer . Commit ( consumeResult ) ;
111+ }
112+ catch ( KafkaException e )
113+ {
114+ Console . WriteLine ( $ "Commit error: { e . Error . Reason } ") ;
115+ }
116+ }
94117 }
95-
96- Console . WriteLine ( $ "Received message at { consumeResult . TopicPartitionOffset } : { consumeResult . Value } ") ;
97-
98- if ( consumeResult . Offset % commitPeriod == 0 )
118+ catch ( ConsumeException e )
99119 {
100- // The Commit method sends a "commit offsets" request to the Kafka
101- // cluster and synchronously waits for the response. This is very
102- // slow compared to the rate at which the consumer is capable of
103- // consuming messages. A high performance application will typically
104- // commit offsets relatively infrequently and be designed handle
105- // duplicate messages in the event of failure.
106- consumer . Commit ( consumeResult ) ;
120+ Console . WriteLine ( $ "Consume error: { e . Error . Reason } ") ;
107121 }
108122 }
109- catch ( ConsumeException e )
110- {
111- Console . WriteLine ( $ "Consume error: { e . Error } ") ;
112- }
113123 }
114-
115- consumer . Close ( ) ;
124+ catch ( OperationCanceledException )
125+ {
126+ Console . WriteLine ( "Closing consumer." ) ;
127+ consumer . Close ( ) ;
128+ }
116129 }
117130 }
118131
@@ -143,28 +156,34 @@ public static void Run_ManualAssign(string brokerList, List<string> topics, Canc
143156 {
144157 consumer . Assign ( topics . Select ( topic => new TopicPartitionOffset ( topic , 0 , Offset . Beginning ) ) . ToList ( ) ) ;
145158
146- while ( ! cancellationToken . IsCancellationRequested )
159+ try
147160 {
148- try
161+ while ( true )
149162 {
150- var consumeResult = consumer . Consume ( cancellationToken ) ;
151- // Note: End of partition notification has not been enabled, so
152- // it is guaranteed that the ConsumeResult instance corresponds
153- // to a Message, and not a PartitionEOF event.
154- Console . WriteLine ( $ "Received message at { consumeResult . TopicPartitionOffset } : ${ consumeResult . Value } ") ;
155- }
156- catch ( ConsumeException e )
157- {
158- Console . WriteLine ( $ "Consume error: { e . Error } ") ;
163+ try
164+ {
165+ var consumeResult = consumer . Consume ( cancellationToken ) ;
166+ // Note: End of partition notification has not been enabled, so
167+ // it is guaranteed that the ConsumeResult instance corresponds
168+ // to a Message, and not a PartitionEOF event.
169+ Console . WriteLine ( $ "Received message at { consumeResult . TopicPartitionOffset } : ${ consumeResult . Value } ") ;
170+ }
171+ catch ( ConsumeException e )
172+ {
173+ Console . WriteLine ( $ "Consume error: { e . Error . Reason } ") ;
174+ }
159175 }
160176 }
161-
162- consumer . Close ( ) ;
177+ catch ( OperationCanceledException )
178+ {
179+ Console . WriteLine ( "Closing consumer." ) ;
180+ consumer . Close ( ) ;
181+ }
163182 }
164183 }
165184
166185 private static void PrintUsage ( )
167- => Console . WriteLine ( "Usage: .. <poll|consume |manual> <broker,broker,..> <topic> [topic..]" ) ;
186+ => Console . WriteLine ( "Usage: .. <subscribe |manual> <broker,broker,..> <topic> [topic..]" ) ;
168187
169188 public static void Main ( string [ ] args )
170189 {
@@ -188,7 +207,7 @@ public static void Main(string[] args)
188207
189208 switch ( mode )
190209 {
191- case "consume " :
210+ case "subscribe " :
192211 Run_Consume ( brokerList , topics , cts . Token ) ;
193212 break ;
194213 case "manual" :
0 commit comments