@@ -125,22 +125,44 @@ newConsumer props (Subscription ts tp) = liftIO $ do
125125 _ <- setDefaultTopicConf kc tp'
126126 rdk <- newRdKafkaT RdKafkaConsumer kc'
127127 case rdk of
128- Left err -> return . Left $ KafkaError err
128+ Left err ->
129+ return $
130+ Left (KafkaError err)
131+
129132 Right rdk' -> do
133+ let
134+ kafka =
135+ KafkaConsumer (Kafka rdk') kc
136+
130137 when (cpCallbackPollMode props == CallbackPollModeAsync ) $ do
131- msgq <- rdKafkaQueueNew rdk'
132- writeIORef qref (Just msgq )
133- let kafka = KafkaConsumer ( Kafka rdk') kc
138+ messageQueue <- rdKafkaQueueNew rdk'
139+ writeIORef qref (Just messageQueue )
140+
134141 redErr <- redirectCallbacksPoll kafka
135142 case redErr of
136- Just err -> closeConsumer kafka >> return (Left err)
137- Nothing -> do
138- forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
139- sub <- subscribe kafka ts
140- case sub of
141- Nothing -> (when (cpCallbackPollMode props == CallbackPollModeAsync ) $
142- runConsumerLoop kafka (Just $ Timeout 100 )) >> return (Right kafka)
143- Just err -> closeConsumer kafka >> return (Left err)
143+ Just err -> do
144+ _ <- closeConsumer kafka
145+ return (Left err)
146+
147+ Nothing -> do
148+ forM_ (cpLogLevel cp) $
149+ setConsumerLogLevel kafka
150+
151+ subscribeError <-
152+ if Set. null ts then
153+ pure Nothing
154+ else
155+ subscribe kafka ts
156+
157+ case subscribeError of
158+ Nothing -> do
159+ when (cpCallbackPollMode props == CallbackPollModeAsync ) $
160+ runConsumerLoop kafka (Just $ Timeout 100 )
161+ return (Right kafka)
162+
163+ Just err -> do
164+ _ <- closeConsumer kafka
165+ return (Left err)
144166
145167-- | Polls a single message
146168pollMessage :: MonadIO m
0 commit comments