Skip to content

Commit 8e5c182

Browse files
authored
Merge pull request #533 from nguyenquangminh0711/create-partitions-api
Increase topic partition count API
2 parents 2d7430f + 9228511 commit 8e5c182

File tree

9 files changed

+153
-4
lines changed

9 files changed

+153
-4
lines changed

lib/kafka/broker.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ def delete_topics(**options)
121121
send_request(request)
122122
end
123123

124+
def create_partitions(**options)
125+
request = Protocol::CreatePartitionsRequest.new(**options)
126+
127+
send_request(request)
128+
end
129+
124130
def api_versions
125131
request = Protocol::ApiVersionsRequest.new
126132

lib/kafka/client.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,18 @@ def delete_topic(name, timeout: 30)
473473
@cluster.delete_topic(name, timeout: timeout)
474474
end
475475

476+
# Create partitions for a topic.
477+
#
478+
# @param name [String] the name of the topic.
479+
# @param num_partitions [Integer] the number of desired partitions for
480+
# the topic
481+
# @param timeout [Integer] a duration of time to wait for the new
482+
# partitions to be added.
483+
# @return [nil]
484+
def create_partitions_for(name, num_partitions: 1, timeout: 30)
485+
@cluster.create_partitions_for(name, num_partitions: num_partitions, timeout: timeout)
486+
end
487+
476488
# Lists all topics in the cluster.
477489
#
478490
# @return [Array<String>] the list of topic names.
@@ -526,6 +538,15 @@ def last_offsets_for(*topics)
526538
}.to_h
527539
end
528540

541+
# Check whether current cluster supports a specific version or not
542+
#
543+
# @param api_key [Integer] API key.
544+
# @param version [Integer] API version.
545+
# @return [Boolean]
546+
def supports_api?(api_key, version = nil)
547+
@cluster.supports_api?(api_key, version)
548+
end
549+
529550
def apis
530551
@cluster.apis
531552
end

lib/kafka/cluster.rb

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,17 @@ def api_info(api_key)
5353
apis.find {|api| api.api_key == api_key }
5454
end
5555

56+
def supports_api?(api_key, version = nil)
57+
info = api_info(api_key)
58+
if info.nil?
59+
return false
60+
elsif version.nil?
61+
return true
62+
else
63+
return (info.min_version..info.max_version).include?(version)
64+
end
65+
end
66+
5667
def apis
5768
@apis ||=
5869
begin
@@ -199,6 +210,26 @@ def delete_topic(name, timeout:)
199210
@logger.info "Topic `#{name}` was deleted"
200211
end
201212

213+
def create_partitions_for(name, num_partitions:, timeout:)
214+
options = {
215+
topics: [[name, num_partitions, nil]],
216+
timeout: timeout
217+
}
218+
219+
broker = controller_broker
220+
221+
@logger.info "Creating #{num_partitions} partition(s) for topic `#{name}` using controller broker #{broker}"
222+
223+
response = broker.create_partitions(**options)
224+
225+
response.errors.each do |topic, error_code, error_message|
226+
Protocol.handle_error(error_code, error_message)
227+
end
228+
mark_as_stale!
229+
230+
@logger.info "Topic `#{name}` was updated"
231+
end
232+
202233
def resolve_offsets(topic, partitions, offset)
203234
add_target_topics([topic])
204235
refresh_metadata_if_necessary!

lib/kafka/protocol.rb

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ module Protocol
2727
API_VERSIONS_API = 18
2828
CREATE_TOPICS_API = 19
2929
DELETE_TOPICS_API = 20
30+
CREATE_PARTITIONS_API = 37
3031

3132
# A mapping from numeric API keys to symbolic API names.
3233
APIS = {
@@ -45,6 +46,7 @@ module Protocol
4546
API_VERSIONS_API => :api_versions,
4647
CREATE_TOPICS_API => :create_topics,
4748
DELETE_TOPICS_API => :delete_topics,
49+
CREATE_PARTITIONS_API => :create_partitions
4850
}
4951

5052
# A mapping from numeric error codes to exception classes.
@@ -95,13 +97,13 @@ module Protocol
9597
# @param error_code Integer
9698
# @raise [ProtocolError]
9799
# @return [nil]
98-
def self.handle_error(error_code)
100+
def self.handle_error(error_code, error_message = nil)
99101
if error_code == 0
100102
# No errors, yay!
101103
elsif error = ERRORS[error_code]
102-
raise error
104+
raise error, error_message
103105
else
104-
raise UnknownError, "Unknown error with code #{error_code}"
106+
raise UnknownError, "Unknown error with code #{error_code} #{error_message}"
105107
end
106108
end
107109

@@ -145,3 +147,5 @@ def self.api_name(api_key)
145147
require "kafka/protocol/create_topics_response"
146148
require "kafka/protocol/delete_topics_request"
147149
require "kafka/protocol/delete_topics_response"
150+
require "kafka/protocol/create_partitions_request"
151+
require "kafka/protocol/create_partitions_response"
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
module Kafka
2+
module Protocol
3+
4+
class CreatePartitionsRequest
5+
def initialize(topics:, timeout:)
6+
@topics, @timeout = topics, timeout
7+
end
8+
9+
def api_key
10+
CREATE_PARTITIONS_API
11+
end
12+
13+
def api_version
14+
0
15+
end
16+
17+
def response_class
18+
Protocol::CreatePartitionsResponse
19+
end
20+
21+
def encode(encoder)
22+
encoder.write_array(@topics) do |topic, count, assignments|
23+
encoder.write_string(topic)
24+
encoder.write_int32(count)
25+
encoder.write_array(assignments) do |assignment|
26+
encoder.write_array(assignment) do |broker|
27+
encoder.write_int32(broker)
28+
end
29+
end
30+
end
31+
# Timeout is in ms.
32+
encoder.write_int32(@timeout * 1000)
33+
# validate_only. There isn't any use case for this in real life. So
34+
# let's ignore it for now
35+
encoder.write_boolean(false)
36+
end
37+
end
38+
39+
end
40+
end
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
module Kafka
2+
module Protocol
3+
4+
class CreatePartitionsResponse
5+
attr_reader :errors
6+
7+
def initialize(throttle_time_ms:, errors:)
8+
@throttle_time_ms = throttle_time_ms
9+
@errors = errors
10+
end
11+
12+
def self.decode(decoder)
13+
throttle_time_ms = decoder.int32
14+
errors = decoder.array do
15+
topic = decoder.string
16+
error_code = decoder.int16
17+
error_message = decoder.string
18+
[topic, error_code, error_message]
19+
end
20+
21+
new(throttle_time_ms: throttle_time_ms, errors: errors)
22+
end
23+
end
24+
25+
end
26+
end

lib/kafka/protocol/encoder.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ def write(bytes)
3030
# @param boolean [Boolean]
3131
# @return [nil]
3232
def write_boolean(boolean)
33-
write(boolean ? 0x1 : 0x0)
33+
boolean ? write_int8(1) : write_int8(0)
3434
end
3535

3636
# Writes an 8-bit integer to the IO object.

spec/functional/api_versions_spec.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,12 @@
55
expect(produce_api.min_version).to eq 0
66
expect(produce_api.max_version).to be >= 2
77
end
8+
9+
example "checks cluster API support" do
10+
expect(kafka.supports_api?(Kafka::Protocol::PRODUCE_API)).to eql(true)
11+
expect(kafka.supports_api?(Kafka::Protocol::PRODUCE_API, 0)).to eql(true)
12+
expect(kafka.supports_api?(Kafka::Protocol::PRODUCE_API, 100)).to eql(false)
13+
expect(kafka.supports_api?(100)).to eql(false)
14+
expect(kafka.supports_api?(100, 100)).to eql(false)
15+
end
816
end

spec/functional/topic_management_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,4 +19,17 @@
1919
kafka.delete_topic(topic)
2020
expect(kafka.has_topic?(topic)).to eql(false)
2121
end
22+
23+
example "create partitions" do
24+
unless kafka.supports_api?(Kafka::Protocol::CREATE_PARTITIONS_API)
25+
skip("This Kafka version not support ")
26+
end
27+
topic = generate_topic_name
28+
29+
kafka.create_topic(topic, num_partitions: 3)
30+
expect(kafka.partitions_for(topic)).to eq 3
31+
32+
kafka.create_partitions_for(topic, num_partitions: 10)
33+
expect(kafka.partitions_for(topic)).to eq 10
34+
end
2235
end

0 commit comments

Comments
 (0)