Skip to content

Commit 9431e55

Browse files
dssjoblomDaniel Sjöblom
andauthored
Fix for "Core dump when providing extensions to oauthbearer_set_token" #719 (#733)
* Attempting to fix issue #719 * Tests * Optimization * Comments * Fix code review issue (empty but non-nil extensions) * Fix unsafe memory management * Fix for code review issue, close producer in tests --------- Co-authored-by: Daniel Sjöblom <[email protected]>
1 parent 531db13 commit 9431e55

File tree

4 files changed

+114
-36
lines changed

4 files changed

+114
-36
lines changed

lib/rdkafka/helpers/oauth.rb

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,18 @@ module OAuth
1212
# @return [Integer] 0 on success
1313
def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil)
1414
error_buffer = FFI::MemoryPointer.from_string(" " * 256)
15+
extensions_ptr, extensions_str_ptrs = map_extensions(extensions)
1516

16-
response = @native_kafka.with_inner do |inner|
17-
Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(
18-
inner, token, lifetime_ms, principal_name,
19-
flatten_extensions(extensions), extension_size(extensions), error_buffer, 256
20-
)
17+
begin
18+
response = @native_kafka.with_inner do |inner|
19+
Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(
20+
inner, token, lifetime_ms, principal_name,
21+
extensions_ptr, extension_size(extensions), error_buffer, 256
22+
)
23+
end
24+
ensure
25+
extensions_str_ptrs&.each { |ptr| ptr.free }
26+
extensions_ptr&.free
2127
end
2228

2329
return response if response.zero?
@@ -41,10 +47,31 @@ def oauthbearer_set_token_failure(reason)
4147

4248
private
4349

44-
# Flatten the extensions hash into a string according to the spec, https://datatracker.ietf.org/doc/html/rfc7628#section-3.1
45-
def flatten_extensions(extensions)
46-
return nil unless extensions
47-
"\x01#{extensions.map { |e| e.join("=") }.join("\x01")}"
50+
# Convert extensions hash to FFI::MemoryPointer (const char **).
51+
# Note: the returned pointers must be freed manually (autorelease = false).
52+
def map_extensions(extensions)
53+
return [nil, nil] if extensions.nil? || extensions.empty?
54+
55+
# https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_sasl_oauthbearer.c#L327-L347
56+
57+
# The method argument is const char **
58+
array_ptr = FFI::MemoryPointer.new(:pointer, extension_size(extensions))
59+
array_ptr.autorelease = false
60+
str_ptrs = []
61+
62+
# Element i is the key, i + 1 is the value.
63+
extensions.each_with_index do |(k, v), i|
64+
k_ptr = FFI::MemoryPointer.from_string(k.to_s)
65+
k_ptr.autorelease = false
66+
str_ptrs << k_ptr
67+
v_ptr = FFI::MemoryPointer.from_string(v.to_s)
68+
v_ptr.autorelease = false
69+
str_ptrs << v_ptr
70+
array_ptr[i * 2].put_pointer(0, k_ptr)
71+
array_ptr[i * 2 + 1].put_pointer(0, v_ptr)
72+
end
73+
74+
[array_ptr, str_ptrs]
4875
end
4976

5077
# extension_size is the number of keys + values which should be a non-negative even number

spec/lib/rdkafka/admin_spec.rb

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -953,14 +953,29 @@
953953
$admin_sasl.close
954954
end
955955

956-
it 'should succeed' do
956+
context 'without extensions' do
957+
it 'should succeed' do
958+
response = $admin_sasl.oauthbearer_set_token(
959+
token: "foo",
960+
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
961+
principal_name: "kafka-cluster"
962+
)
963+
expect(response).to eq(0)
964+
end
965+
end
957966

958-
response = $admin_sasl.oauthbearer_set_token(
959-
token: "foo",
960-
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
961-
principal_name: "kafka-cluster"
962-
)
963-
expect(response).to eq(0)
967+
context 'with extensions' do
968+
it 'should succeed' do
969+
response = $admin_sasl.oauthbearer_set_token(
970+
token: "foo",
971+
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
972+
principal_name: "kafka-cluster",
973+
extensions: {
974+
"foo" => "bar"
975+
}
976+
)
977+
expect(response).to eq(0)
978+
end
964979
end
965980
end
966981
end

spec/lib/rdkafka/consumer_spec.rb

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1230,14 +1230,29 @@ def collect(name, list)
12301230
$consumer_sasl.close
12311231
end
12321232

1233-
it 'should succeed' do
1233+
context 'without extensions' do
1234+
it 'should succeed' do
1235+
response = $consumer_sasl.oauthbearer_set_token(
1236+
token: "foo",
1237+
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
1238+
principal_name: "kafka-cluster"
1239+
)
1240+
expect(response).to eq(0)
1241+
end
1242+
end
12341243

1235-
response = $consumer_sasl.oauthbearer_set_token(
1236-
token: "foo",
1237-
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
1238-
principal_name: "kafka-cluster"
1239-
)
1240-
expect(response).to eq(0)
1244+
context 'with extensions' do
1245+
it 'should succeed' do
1246+
response = $consumer_sasl.oauthbearer_set_token(
1247+
token: "foo",
1248+
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
1249+
principal_name: "kafka-cluster",
1250+
extensions: {
1251+
"foo" => "bar"
1252+
}
1253+
)
1254+
expect(response).to eq(0)
1255+
end
12411256
end
12421257
end
12431258
end

spec/lib/rdkafka/producer_spec.rb

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -880,26 +880,47 @@ def call(_, handle)
880880
response = producer.oauthbearer_set_token(
881881
token: "foo",
882882
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
883-
principal_name: "kafka-cluster"
883+
principal_name: "kafka-cluster",
884884
)
885885
expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
886886
end
887887
end
888888

889889
context 'when sasl configured' do
890-
it 'should succeed' do
891-
producer_sasl = rdkafka_producer_config(
892-
{
893-
"security.protocol": "sasl_ssl",
894-
"sasl.mechanisms": 'OAUTHBEARER'
895-
}
890+
before do
891+
$producer_sasl = rdkafka_producer_config(
892+
"security.protocol": "sasl_ssl",
893+
"sasl.mechanisms": 'OAUTHBEARER'
896894
).producer
897-
response = producer_sasl.oauthbearer_set_token(
898-
token: "foo",
899-
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
900-
principal_name: "kafka-cluster"
901-
)
902-
expect(response).to eq(0)
895+
end
896+
897+
after do
898+
$producer_sasl.close
899+
end
900+
901+
context 'without extensions' do
902+
it 'should succeed' do
903+
response = $producer_sasl.oauthbearer_set_token(
904+
token: "foo",
905+
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
906+
principal_name: "kafka-cluster"
907+
)
908+
expect(response).to eq(0)
909+
end
910+
end
911+
912+
context 'with extensions' do
913+
it 'should succeed' do
914+
response = $producer_sasl.oauthbearer_set_token(
915+
token: "foo",
916+
lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
917+
principal_name: "kafka-cluster",
918+
extensions: {
919+
"foo" => "bar"
920+
}
921+
)
922+
expect(response).to eq(0)
923+
end
903924
end
904925
end
905926
end

0 commit comments

Comments
 (0)