Skip to content

Commit b123388

Browse files
committed
Don't call subscribe when there's no topics to subscribe to.
This is important if one wants to use Consumer.assign instead of a subscriptions.
1 parent d4b32fd commit b123388

File tree

1 file changed

+34
-12
lines changed

1 file changed

+34
-12
lines changed

src/Kafka/Consumer.hs

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,22 +125,44 @@ newConsumer props (Subscription ts tp) = liftIO $ do
125125
_ <- setDefaultTopicConf kc tp'
126126
rdk <- newRdKafkaT RdKafkaConsumer kc'
127127
case rdk of
128-
Left err -> return . Left $ KafkaError err
128+
Left err ->
129+
return $
130+
Left (KafkaError err)
131+
129132
Right rdk' -> do
133+
let
134+
kafka =
135+
KafkaConsumer (Kafka rdk') kc
136+
130137
when (cpCallbackPollMode props == CallbackPollModeAsync) $ do
131-
msgq <- rdKafkaQueueNew rdk'
132-
writeIORef qref (Just msgq)
133-
let kafka = KafkaConsumer (Kafka rdk') kc
138+
messageQueue <- rdKafkaQueueNew rdk'
139+
writeIORef qref (Just messageQueue)
140+
134141
redErr <- redirectCallbacksPoll kafka
135142
case redErr of
136-
Just err -> closeConsumer kafka >> return (Left err)
137-
Nothing -> do
138-
forM_ (cpLogLevel cp) (setConsumerLogLevel kafka)
139-
sub <- subscribe kafka ts
140-
case sub of
141-
Nothing -> (when (cpCallbackPollMode props == CallbackPollModeAsync) $
142-
runConsumerLoop kafka (Just $ Timeout 100)) >> return (Right kafka)
143-
Just err -> closeConsumer kafka >> return (Left err)
143+
Just err -> do
144+
_ <- closeConsumer kafka
145+
return (Left err)
146+
147+
Nothing -> do
148+
forM_ (cpLogLevel cp) $
149+
setConsumerLogLevel kafka
150+
151+
subscribeError <-
152+
if Set.null ts then
153+
pure Nothing
154+
else
155+
subscribe kafka ts
156+
157+
case subscribeError of
158+
Nothing -> do
159+
when (cpCallbackPollMode props == CallbackPollModeAsync) $
160+
runConsumerLoop kafka (Just $ Timeout 100)
161+
return (Right kafka)
162+
163+
Just err -> do
164+
_ <- closeConsumer kafka
165+
return (Left err)
144166

145167
-- | Polls a single message
146168
pollMessage :: MonadIO m

0 commit comments

Comments
 (0)