
Commit 9b5bb86

A bit of docs, code remarks and time stuff extraction (#330)
* A bit of docs, code remarks and time stuff extraction
* do not private registry for now
1 parent: 57b86e3 · commit: 9b5bb86


5 files changed · +60 −32 lines


lib/rdkafka.rb

Lines changed: 5 additions & 0 deletions
@@ -2,6 +2,7 @@
 
 require "rdkafka/version"
 
+require "rdkafka/helpers/time"
 require "rdkafka/abstract_handle"
 require "rdkafka/admin"
 require "rdkafka/admin/create_topic_handle"
@@ -22,3 +23,7 @@
 require "rdkafka/producer"
 require "rdkafka/producer/delivery_handle"
 require "rdkafka/producer/delivery_report"
+
+# Main Rdkafka namespace of this gem
+module Rdkafka
+end

lib/rdkafka/abstract_handle.rb

Lines changed: 37 additions & 22 deletions
@@ -3,26 +3,37 @@
 require "ffi"
 
 module Rdkafka
+  # This class serves as an abstract base class to represent handles within the Rdkafka module.
+  # As a subclass of `FFI::Struct`, this class provides a blueprint for other specific handle
+  # classes to inherit from, ensuring they adhere to a particular structure and behavior.
+  #
+  # Subclasses must define their own layout, and the layout must start with:
+  #
+  #     layout :pending, :bool,
+  #            :response, :int
   class AbstractHandle < FFI::Struct
-    # Subclasses must define their own layout, and the layout must start with:
-    #
-    #     layout :pending, :bool,
-    #            :response, :int
+    include Helpers::Time
 
+    # Registry for registering all the handles.
     REGISTRY = {}
 
-    CURRENT_TIME = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }.freeze
-
-    private_constant :CURRENT_TIME
+    class << self
+      # Adds handle to the register
+      #
+      # @param handle [AbstractHandle] any handle we want to register
+      def register(handle)
+        address = handle.to_ptr.address
+        REGISTRY[address] = handle
+      end
 
-    def self.register(handle)
-      address = handle.to_ptr.address
-      REGISTRY[address] = handle
+      # Removes handle from the register based on the handle address
+      #
+      # @param address [Integer] address of the registered handle we want to remove
+      def remove(address)
+        REGISTRY.delete(address)
+      end
     end
 
-    def self.remove(address)
-      REGISTRY.delete(address)
-    end
 
     # Whether the handle is still pending.
     #
@@ -32,26 +43,30 @@ def pending?
     end
 
     # Wait for the operation to complete or raise an error if this takes longer than the timeout.
-    # If there is a timeout this does not mean the operation failed, rdkafka might still be working on the operation.
-    # In this case it is possible to call wait again.
+    # If there is a timeout this does not mean the operation failed, rdkafka might still be working
+    # on the operation. In this case it is possible to call wait again.
     #
-    # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out. If this is nil it does not time out.
-    # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if the operation has completed
+    # @param max_wait_timeout [Numeric, nil] Amount of time to wait before timing out.
+    #   If this is nil it does not time out.
+    # @param wait_timeout [Numeric] Amount of time we should wait before we recheck if the
+    #   operation has completed
+    #
+    # @return [Object] Operation-specific result
     #
     # @raise [RdkafkaError] When the operation failed
    # @raise [WaitTimeoutError] When the timeout has been reached and the handle is still pending
-    #
-    # @return [Object] Operation-specific result
     def wait(max_wait_timeout: 60, wait_timeout: 0.1)
       timeout = if max_wait_timeout
-                  CURRENT_TIME.call + max_wait_timeout
+                  monotonic_now + max_wait_timeout
                 else
                   nil
                 end
       loop do
         if pending?
-          if timeout && timeout <= CURRENT_TIME.call
-            raise WaitTimeoutError.new("Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds")
+          if timeout && timeout <= monotonic_now
+            raise WaitTimeoutError.new(
+              "Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds"
+            )
           end
           sleep wait_timeout
         elsif self[:response] != 0
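
For context, here is a minimal sketch of how a concrete handle subclass is expected to plug into this contract. Only the layout requirement, the register/remove registry API and the documented wait behaviour come from the diff above; ExampleHandle, its operation_name string and the commented-out wait call are illustrative assumptions, not part of this commit.

require "rdkafka"

module Rdkafka
  # Hypothetical handle used only to illustrate the AbstractHandle contract.
  class ExampleHandle < AbstractHandle
    # The layout must start with :pending and :response, as documented above.
    layout :pending, :bool,
           :response, :int

    # Referenced by #wait when building the WaitTimeoutError message.
    def operation_name
      "example operation"
    end
  end
end

handle = Rdkafka::ExampleHandle.new
handle[:pending] = true

# Handles are keyed by their pointer address so they can be looked up later.
Rdkafka::AbstractHandle.register(handle)

# Per the docs above, #wait polls every wait_timeout seconds and either returns
# the operation-specific result, raises RdkafkaError on failure, or raises
# WaitTimeoutError once max_wait_timeout elapses (measured on the monotonic clock).
# handle.wait(max_wait_timeout: 5, wait_timeout: 0.1)

Rdkafka::AbstractHandle.remove(handle.to_ptr.address)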

lib/rdkafka/consumer.rb

Lines changed: 1 addition & 4 deletions
@@ -12,6 +12,7 @@ module Rdkafka
   # `each_slice` to consume batches of messages.
   class Consumer
     include Enumerable
+    include Helpers::Time
 
     # @private
     def initialize(native_kafka)
@@ -602,10 +603,6 @@ def each_batch(max_items: 100, bytes_threshold: Float::INFINITY, timeout_ms: 250
     end
 
     private
-    def monotonic_now
-      # needed because Time.now can go backwards
-      Process.clock_gettime(Process::CLOCK_MONOTONIC)
-    end
 
     def closed_consumer_check(method)
       raise Rdkafka::ClosedConsumerError.new(method) if closed?

lib/rdkafka/helpers/time.rb

Lines changed: 14 additions & 0 deletions
@@ -0,0 +1,14 @@
+# frozen_string_literal: true
+
+module Rdkafka
+  # Namespace for some small utilities used in multiple components
+  module Helpers
+    # Time related methods used across Karafka
+    module Time
+      # @return [Float] current monotonic time in seconds with microsecond precision
+      def monotonic_now
+        ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
+      end
+    end
+  end
+end
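
A usage sketch of the new helper: only Rdkafka::Helpers::Time#monotonic_now comes from this commit; the ElapsedTimer class below is a hypothetical consumer of it. The point of the mixin is that elapsed time is measured on a clock that, unlike Time.now, can never jump backwards (for example after an NTP adjustment).

require "rdkafka"

# Hypothetical class, shown purely to demonstrate the mixin in use.
class ElapsedTimer
  include Rdkafka::Helpers::Time

  # Returns the duration of the block in seconds, measured on the
  # monotonic clock so system clock adjustments cannot skew the result.
  def measure
    started_at = monotonic_now
    yield
    monotonic_now - started_at
  end
end

puts ElapsedTimer.new.measure { sleep 0.1 } # => roughly 0.1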

lib/rdkafka/producer.rb

Lines changed: 3 additions & 6 deletions
@@ -5,6 +5,8 @@
 module Rdkafka
   # A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
   class Producer
+    include Helpers::Time
+
     # Cache partitions count for 30 seconds
     PARTITIONS_COUNT_TTL = 30
 
@@ -116,7 +118,7 @@ def flush(timeout_ms=5_000)
     # This prevents us in case someone uses `partition_key` from querying for the count with
     # each message. Instead we query once every 30 seconds at most
     #
-    # @param topic [String] topic name
+    # @param [String] topic name
     # @return [Integer] partition count for a given topic
     def partition_count(topic)
       closed_producer_check(__method__)
@@ -247,11 +249,6 @@ def arity(callback)
 
     private
 
-    def monotonic_now
-      # needed because Time.now can go backwards
-      Process.clock_gettime(Process::CLOCK_MONOTONIC)
-    end
-
     def closed_producer_check(method)
       raise Rdkafka::ClosedProducerError.new(method) if closed?
     end
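
The PARTITIONS_COUNT_TTL constant and the comments above describe a time-based cache: even when partition_key forces a partition-count lookup for every message, the actual metadata query runs at most once every 30 seconds. Below is a minimal sketch of that pattern built on the new helper; PartitionCountCache and its fetcher block are illustrative assumptions, not the gem's actual implementation.

require "rdkafka"

# Hypothetical cache showing the TTL pattern described in the comments;
# the real Producer keeps equivalent state internally.
class PartitionCountCache
  include Rdkafka::Helpers::Time

  TTL = 30 # seconds, mirroring PARTITIONS_COUNT_TTL

  def initialize(&fetcher)
    @fetcher = fetcher
    @cache = {} # topic name => [fetched_at, partition count]
  end

  # Returns the cached count unless it is older than TTL seconds, in which
  # case the fetcher is called again and the cache entry refreshed.
  def count_for(topic)
    fetched_at, count = @cache[topic]

    if fetched_at.nil? || monotonic_now - fetched_at > TTL
      count = @fetcher.call(topic)
      @cache[topic] = [monotonic_now, count]
    end

    count
  end
end

# Usage: in real code the block would query cluster metadata.
cache = PartitionCountCache.new { |_topic| 12 }
cache.count_for("events") # => 12, re-queried at most once every 30 seconds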
