File tree Expand file tree Collapse file tree 1 file changed +19
-13
lines changed Expand file tree Collapse file tree 1 file changed +19
-13
lines changed Original file line number Diff line number Diff line change @@ -40,20 +40,26 @@ async function consumerStart() {
40
40
41
41
await consumer .subscribe ({ topics: [ " topic" ] });
42
42
43
- consumer .run ({
44
- eachMessage: async ({ topic, partition, message }) => {
45
- console .log ({
46
- topic,
47
- partition,
48
- offset: message .offset ,
49
- key: message .key ? .toString (),
50
- value: message .value .toString (),
51
- });
52
- },
53
- });
43
+ let stopped = false ;
44
+ while (! stopped) {
45
+ const message = await consumer .consume (1000 );
46
+ if (! message) {
47
+ continue ;
48
+ }
49
+ console .log ({
50
+ topic: message .topic ,
51
+ partition: message .partition ,
52
+ offset: message .offset ,
53
+ key: message .key ? .toString (),
54
+ value: message .value .toString (),
55
+ });
56
+
57
+ // Update stopped whenever we're done consuming.
58
+ // stopped = true;
59
+ }
54
60
55
- // When done consuming
56
- // await consumer.disconnect();
61
+ // Disconnect and clean up.
62
+ await consumer .disconnect ();
57
63
}
58
64
59
65
consumerStart ();
You can’t perform that action at this time.
0 commit comments