Skip to content

Commit 36f3ba0

Browse files
authored
Merge pull request #530 from nguyenquangminh0711/describe-configs-api
Describe topic API
2 parents 5aa6db0 + 6359bed commit 36f3ba0

File tree

8 files changed

+174
-2
lines changed

8 files changed

+174
-2
lines changed

.circleci/config.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,8 +58,6 @@ jobs:
5858
KAFKA_DELETE_TOPIC_ENABLE: true
5959
- image: wurstmeister/kafka:0.11.0.1
6060
environment:
61-
KAFKA_ADVERTISED_HOST_NAME: localhost
62-
KAFKA_ADVERTISED_PORT: 9093
6361
KAFKA_PORT: 9093
6462
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
6563
KAFKA_DELETE_TOPIC_ENABLE: true

lib/kafka/broker.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ def delete_topics(**options)
121121
send_request(request)
122122
end
123123

124+
def describe_configs(**options)
125+
request = Protocol::DescribeConfigsRequest.new(**options)
126+
127+
send_request(request)
128+
end
129+
124130
def create_partitions(**options)
125131
request = Protocol::CreatePartitionsRequest.new(**options)
126132

lib/kafka/client.rb

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -473,6 +473,15 @@ def delete_topic(name, timeout: 30)
473473
@cluster.delete_topic(name, timeout: timeout)
474474
end
475475

476+
# Describe configs of a topic.
477+
#
478+
# @param name [String] the name of the topic.
479+
# @param configs [Array<String>] array of desired config names.
480+
# @return [Hash<string, string>]
481+
def describe_topic(name, configs = [])
482+
@cluster.describe_topic(name, configs)
483+
end
484+
476485
# Create partitions for a topic.
477486
#
478487
# @param name [String] the name of the topic.

lib/kafka/cluster.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,25 @@ def delete_topic(name, timeout:)
210210
@logger.info "Topic `#{name}` was deleted"
211211
end
212212

213+
def describe_topic(name, configs = [])
214+
options = {
215+
resources: [[Kafka::Protocol::RESOURCE_TYPE_TOPIC, name, configs]]
216+
}
217+
broker = controller_broker
218+
219+
@logger.info "Fetching topic `#{name}`'s configs using controller broker #{broker}"
220+
221+
response = broker.describe_configs(**options)
222+
223+
response.resources.each do |resource|
224+
Protocol.handle_error(resource.error_code, resource.error_message)
225+
end
226+
topic_description = response.resources.first
227+
topic_description.configs.each_with_object({}) do |config, hash|
228+
hash[config.name] = config.value
229+
end
230+
end
231+
213232
def create_partitions_for(name, num_partitions:, timeout:)
214233
options = {
215234
topics: [[name, num_partitions, nil]],

lib/kafka/protocol.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ module Protocol
2727
API_VERSIONS_API = 18
2828
CREATE_TOPICS_API = 19
2929
DELETE_TOPICS_API = 20
30+
DESCRIBE_CONFIGS_API = 32
3031
CREATE_PARTITIONS_API = 37
3132

3233
# A mapping from numeric API keys to symbolic API names.
@@ -46,6 +47,7 @@ module Protocol
4647
API_VERSIONS_API => :api_versions,
4748
CREATE_TOPICS_API => :create_topics,
4849
DELETE_TOPICS_API => :delete_topics,
50+
DESCRIBE_CONFIGS_API => :describe_configs_api,
4951
CREATE_PARTITIONS_API => :create_partitions
5052
}
5153

@@ -91,6 +93,25 @@ module Protocol
9193
42 => InvalidRequest
9294
}
9395

96+
# A mapping from int to corresponding resource type in symbol.
97+
# https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
98+
RESOURCE_TYPE_UNKNOWN = 0
99+
RESOURCE_TYPE_ANY = 1
100+
RESOURCE_TYPE_TOPIC = 2
101+
RESOURCE_TYPE_GROUP = 3
102+
RESOURCE_TYPE_CLUSTER = 4
103+
RESOURCE_TYPE_TRANSACTIONAL_ID = 5
104+
RESOURCE_TYPE_DELEGATION_TOKEN = 6
105+
RESOURCE_TYPES = {
106+
RESOURCE_TYPE_UNKNOWN => :unknown,
107+
RESOURCE_TYPE_ANY => :any,
108+
RESOURCE_TYPE_TOPIC => :topic,
109+
RESOURCE_TYPE_GROUP => :group,
110+
RESOURCE_TYPE_CLUSTER => :cluster,
111+
RESOURCE_TYPE_TRANSACTIONAL_ID => :transactional_id,
112+
RESOURCE_TYPE_DELEGATION_TOKEN => :delegation_token,
113+
}
114+
94115
# Handles an error code by either doing nothing (if there was no error) or
95116
# by raising an appropriate exception.
96117
#
@@ -147,5 +168,7 @@ def self.api_name(api_key)
147168
require "kafka/protocol/create_topics_response"
148169
require "kafka/protocol/delete_topics_request"
149170
require "kafka/protocol/delete_topics_response"
171+
require "kafka/protocol/describe_configs_request"
172+
require "kafka/protocol/describe_configs_response"
150173
require "kafka/protocol/create_partitions_request"
151174
require "kafka/protocol/create_partitions_response"
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
module Kafka
2+
module Protocol
3+
4+
class DescribeConfigsRequest
5+
def initialize(resources:)
6+
@resources = resources
7+
end
8+
9+
def api_key
10+
DESCRIBE_CONFIGS_API
11+
end
12+
13+
def api_version
14+
0
15+
end
16+
17+
def response_class
18+
Protocol::DescribeConfigsResponse
19+
end
20+
21+
def encode(encoder)
22+
encoder.write_array(@resources) do |type, name, configs|
23+
encoder.write_int8(type)
24+
encoder.write_string(name)
25+
encoder.write_array(configs) do |config|
26+
encoder.write_string(config)
27+
end
28+
end
29+
end
30+
end
31+
32+
end
33+
end
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
module Kafka
2+
module Protocol
3+
class DescribeConfigsResponse
4+
class ResourceDescription
5+
attr_reader :name, :type, :error_code, :error_message, :configs
6+
7+
def initialize(name:, type:, error_code:, error_message:, configs:)
8+
@name = name
9+
@type = type
10+
@error_code = error_code
11+
@error_message = error_message
12+
@configs = configs
13+
end
14+
end
15+
16+
class ConfigEntry
17+
attr_reader :name, :value, :read_only, :is_default, :is_sensitive
18+
19+
def initialize(name:, value:, read_only:, is_default:, is_sensitive:)
20+
@name = name
21+
@value = value
22+
@read_only = read_only
23+
@is_default = is_default
24+
@is_sensitive = is_sensitive
25+
end
26+
end
27+
28+
attr_reader :resources
29+
30+
def initialize(throttle_time_ms:, resources:)
31+
@throttle_time_ms = throttle_time_ms
32+
@resources = resources
33+
end
34+
35+
def self.decode(decoder)
36+
throttle_time_ms = decoder.int32
37+
resources = decoder.array do
38+
error_code = decoder.int16
39+
error_message = decoder.string
40+
41+
resource_type = decoder.int8
42+
if Kafka::Protocol::RESOURCE_TYPES[resource_type].nil?
43+
raise Kafka::ProtocolError, "Resource type not supported: #{resource_type}"
44+
end
45+
resource_name = decoder.string
46+
47+
configs = decoder.array do
48+
ConfigEntry.new(
49+
name: decoder.string,
50+
value: decoder.string,
51+
read_only: decoder.boolean,
52+
is_default: decoder.boolean,
53+
is_sensitive: decoder.boolean,
54+
)
55+
end
56+
57+
ResourceDescription.new(
58+
type: RESOURCE_TYPES[resource_type],
59+
name: resource_name,
60+
error_code: error_code,
61+
error_message: error_message,
62+
configs: configs
63+
)
64+
end
65+
66+
new(throttle_time_ms: throttle_time_ms, resources: resources)
67+
end
68+
end
69+
70+
end
71+
end

spec/functional/topic_management_spec.rb

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,17 @@
3232
kafka.create_partitions_for(topic, num_partitions: 10)
3333
expect(kafka.partitions_for(topic)).to eq 10
3434
end
35+
36+
example "describe a topic" do
37+
unless kafka.supports_api?(Kafka::Protocol::DESCRIBE_CONFIGS_API)
38+
skip("This Kafka version not support ")
39+
end
40+
41+
topic = generate_topic_name
42+
configs = kafka.describe_topic(topic, %w(retention.ms retention.bytes non_exists))
43+
44+
expect(configs.keys).to eql(%w(retention.ms retention.bytes))
45+
expect(configs['retention.ms']).to be_a(String)
46+
expect(configs['retention.bytes']).to be_a(String)
47+
end
3548
end

0 commit comments

Comments
 (0)