@@ -48,13 +48,11 @@ def closed?
4848 @native_kafka . closed?
4949 end
5050
51- # Subscribe to one or more topics letting Kafka handle partition assignments.
51+ # Subscribes to one or more topics letting Kafka handle partition assignments.
5252 #
5353 # @param topics [Array<String>] One or more topic names
54- #
55- # @raise [RdkafkaError] When subscribing fails
56- #
5754 # @return [nil]
55+ # @raise [RdkafkaError] When subscribing fails
5856 def subscribe ( *topics )
5957 closed_consumer_check ( __method__ )
6058
@@ -78,9 +76,8 @@ def subscribe(*topics)
7876
7977 # Unsubscribe from all subscribed topics.
8078 #
81- # @raise [RdkafkaError] When unsubscribing fails
82- #
8379 # @return [nil]
80+ # @raise [RdkafkaError] When unsubscribing fails
8481 def unsubscribe
8582 closed_consumer_check ( __method__ )
8683
@@ -95,10 +92,8 @@ def unsubscribe
9592 # Pause producing or consumption for the provided list of partitions
9693 #
9794 # @param list [TopicPartitionList] The topic with partitions to pause
98- #
99- # @raise [RdkafkaTopicPartitionListError] When pausing subscription fails.
100- #
10195 # @return [nil]
96+ # @raise [RdkafkaTopicPartitionListError] When pausing subscription fails.
10297 def pause ( list )
10398 closed_consumer_check ( __method__ )
10499
@@ -122,13 +117,11 @@ def pause(list)
122117 end
123118 end
124119
125- # Resume producing consumption for the provided list of partitions
120+ # Resumes producing consumption for the provided list of partitions
126121 #
127122 # @param list [TopicPartitionList] The topic with partitions to pause
128- #
129- # @raise [RdkafkaError] When resume subscription fails.
130- #
131123 # @return [nil]
124+ # @raise [RdkafkaError] When resume subscription fails.
132125 def resume ( list )
133126 closed_consumer_check ( __method__ )
134127
@@ -150,11 +143,10 @@ def resume(list)
150143 end
151144 end
152145
153- # Return the current subscription to topics and partitions
154- #
155- # @raise [RdkafkaError] When getting the subscription fails.
146+ # Returns the current subscription to topics and partitions
156147 #
157148 # @return [TopicPartitionList]
149+ # @raise [RdkafkaError] When getting the subscription fails.
158150 def subscription
159151 closed_consumer_check ( __method__ )
160152
@@ -179,7 +171,6 @@ def subscription
179171 # Atomic assignment of partitions to consume
180172 #
181173 # @param list [TopicPartitionList] The topic with partitions to assign
182- #
183174 # @raise [RdkafkaError] When assigning fails
184175 def assign ( list )
185176 closed_consumer_check ( __method__ )
@@ -204,9 +195,8 @@ def assign(list)
204195
205196 # Returns the current partition assignment.
206197 #
207- # @raise [RdkafkaError] When getting the assignment fails.
208- #
209198 # @return [TopicPartitionList]
199+ # @raise [RdkafkaError] When getting the assignment fails.
210200 def assignment
211201 closed_consumer_check ( __method__ )
212202
@@ -232,14 +222,14 @@ def assignment
232222 end
233223
234224 # Return the current committed offset per partition for this consumer group.
235- # The offset field of each requested partition will either be set to stored offset or to -1001 in case there was no stored offset for that partition.
225+ # The offset field of each requested partition will either be set to stored offset or to -1001
226+ # in case there was no stored offset for that partition.
236227 #
237- # @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil to use the current subscription.
228+ # @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil
229+ # to use the current subscription.
238230 # @param timeout_ms [Integer] The timeout for fetching this information.
239- #
240- # @raise [RdkafkaError] When getting the committed positions fails.
241- #
242231 # @return [TopicPartitionList]
232+ # @raise [RdkafkaError] When getting the committed positions fails.
243233 def committed ( list = nil , timeout_ms = 1200 )
244234 closed_consumer_check ( __method__ )
245235
@@ -269,10 +259,8 @@ def committed(list=nil, timeout_ms=1200)
269259 # @param topic [String] The topic to query
270260 # @param partition [Integer] The partition to query
271261 # @param timeout_ms [Integer] The timeout for querying the broker
272- #
273- # @raise [RdkafkaError] When querying the broker fails.
274- #
275262 # @return [Integer] The low and high watermark
263+ # @raise [RdkafkaError] When querying the broker fails.
276264 def query_watermark_offsets ( topic , partition , timeout_ms = 200 )
277265 closed_consumer_check ( __method__ )
278266
@@ -306,10 +294,9 @@ def query_watermark_offsets(topic, partition, timeout_ms=200)
306294 #
307295 # @param topic_partition_list [TopicPartitionList] The list to calculate lag for.
308296 # @param watermark_timeout_ms [Integer] The timeout for each query watermark call.
309- #
297+ # @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag
298+ # per partition
310299 # @raise [RdkafkaError] When querying the broker fails.
311- #
312- # @return [Hash<String, Hash<Integer, Integer>>] A hash containing all topics with the lag per partition
313300 def lag ( topic_partition_list , watermark_timeout_ms = 100 )
314301 out = { }
315302
@@ -358,10 +345,8 @@ def member_id
358345 # When using this `enable.auto.offset.store` should be set to `false` in the config.
359346 #
360347 # @param message [Rdkafka::Consumer::Message] The message which offset will be stored
361- #
362- # @raise [RdkafkaError] When storing the offset fails
363- #
364348 # @return [nil]
349+ # @raise [RdkafkaError] When storing the offset fails
365350 def store_offset ( message )
366351 closed_consumer_check ( __method__ )
367352
@@ -392,10 +377,8 @@ def store_offset(message)
392377 # message at the given offset.
393378 #
394379 # @param message [Rdkafka::Consumer::Message] The message to which to seek
395- #
396- # @raise [RdkafkaError] When seeking fails
397- #
398380 # @return [nil]
381+ # @raise [RdkafkaError] When seeking fails
399382 def seek ( message )
400383 closed_consumer_check ( __method__ )
401384
@@ -434,10 +417,8 @@ def seek(message)
434417 #
435418 # @param list [TopicPartitionList,nil] The topic with partitions to commit
436419 # @param async [Boolean] Whether to commit async or wait for the commit to finish
437- #
438- # @raise [RdkafkaError] When committing fails
439- #
440420 # @return [nil]
421+ # @raise [RdkafkaError] When committing fails
441422 def commit ( list = nil , async = false )
442423 closed_consumer_check ( __method__ )
443424
@@ -462,10 +443,8 @@ def commit(list=nil, async=false)
462443 # Poll for the next message on one of the subscribed topics
463444 #
464445 # @param timeout_ms [Integer] Timeout of this poll
465- #
466- # @raise [RdkafkaError] When polling fails
467- #
468446 # @return [Message, nil] A message or nil if there was no new message within the timeout
447+ # @raise [RdkafkaError] When polling fails
469448 def poll ( timeout_ms )
470449 closed_consumer_check ( __method__ )
471450
@@ -494,14 +473,11 @@ def poll(timeout_ms)
494473 # Poll for new messages and yield for each received one. Iteration
495474 # will end when the consumer is closed.
496475 #
497- # If `enable.partition.eof` is turned on in the config this will raise an
498- # error when an eof is reached, so you probably want to disable that when
499- # using this method of iteration.
476+ # If `enable.partition.eof` is turned on in the config this will raise an error when an eof is
477+ # reached, so you probably want to disable that when using this method of iteration.
500478 #
501479 # @raise [RdkafkaError] When polling fails
502- #
503480 # @yieldparam message [Message] Received message
504- #
505481 # @return [nil]
506482 def each
507483 loop do
@@ -554,9 +530,7 @@ def each
554530 # that you may or may not see again.
555531 #
556532 # @param max_items [Integer] Maximum size of the yielded array of messages
557- #
558533 # @param bytes_threshold [Integer] Threshold number of total message bytes in the yielded array of messages
559- #
560534 # @param timeout_ms [Integer] max time to wait for up to max_items
561535 #
562536 # @raise [RdkafkaError] When polling fails
0 commit comments