Skip to content

Commit 9e01b9f

Browse files
authored
Merge pull request #83 from JiHyunSong/using-delegation-token
when using kerberos, add option for reusing delegation token
2 parents c4def11 + 8b2287d commit 9e01b9f

File tree

4 files changed

+110
-2
lines changed

4 files changed

+110
-2
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ With kerberos authentication:
146146
path /path/on/hdfs/access.log.%Y%m%d_%H.log
147147
kerberos true
148148
kerberos_keytab /path/to/keytab # if needed
149+
renew_kerberos_delegation_token true # if needed
149150
</match>
150151

151152
NOTE: You need to install `gssapi` gem for kerberos. See https://github.com/kzk/webhdfs#for-kerberos-authentication

fluent-plugin-webhdfs.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,5 +23,5 @@ Gem::Specification.new do |gem|
2323
gem.add_development_dependency "bzip2-ffi"
2424
gem.add_development_dependency "zstandard"
2525
gem.add_runtime_dependency "fluentd", '>= 0.14.22'
26-
gem.add_runtime_dependency "webhdfs", '>= 0.6.0'
26+
gem.add_runtime_dependency "webhdfs", '>= 0.10.0'
2727
end

lib/fluent/plugin/out_webhdfs.rb

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,10 @@ class Fluent::Plugin::WebHDFSOutput < Fluent::Plugin::Output
6666
config_param :kerberos, :bool, default: false
6767
desc 'kerberos keytab file'
6868
config_param :kerberos_keytab, :string, default: nil
69+
desc 'Use delegation token while upload webhdfs or not'
70+
config_param :renew_kerberos_delegation_token, :bool, default: false
71+
desc 'delegation token reuse timer (default 8h)'
72+
config_param :renew_kerberos_delegation_token_interval, :time, default: 8 * 60 * 60
6973

7074
SUPPORTED_COMPRESS = [:gzip, :bzip2, :snappy, :hadoop_snappy, :lzo_command, :zstd, :text]
7175
desc "Compression method (#{SUPPORTED_COMPRESS.join(',')})"
@@ -184,6 +188,14 @@ def configure(conf)
184188
raise Fluent::ConfigError, "Path on hdfs MUST starts with '/', but '#{@path}'"
185189
end
186190

191+
@renew_kerberos_delegation_token_interval_hour = nil
192+
if @renew_kerberos_delegation_token
193+
unless @username
194+
raise Fluent::ConfigError, "username is missing. If you want to reuse delegation token, follow with kerberos accounts"
195+
end
196+
@renew_kerberos_delegation_token_interval_hour = @renew_kerberos_delegation_token_interval / 60 / 60
197+
end
198+
187199
@client = prepare_client(@namenode_host, @namenode_port, @username)
188200
if @standby_namenode_host
189201
@client_standby = prepare_client(@standby_namenode_host, @standby_namenode_port, @username)
@@ -203,7 +215,7 @@ def multi_workers_ready?
203215
end
204216

205217
def prepare_client(host, port, username)
206-
client = WebHDFS::Client.new(host, port, username)
218+
client = WebHDFS::Client.new(host, port, username, nil, nil, nil, {}, @renew_kerberos_delegation_token_interval_hour)
207219
if @httpfs
208220
client.httpfs_mode = true
209221
end

test/plugin/test_out_webhdfs.rb

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -316,4 +316,99 @@ def test_time_key_without_buffer_section
316316
assert_equal "2017-01-24T20:10:30Z\ttest.now\t{\"message\":\"yay\",\"name\":\"tagomoris\"}\n", line
317317
end
318318
end
319+
320+
sub_test_case "kerberos config" do
321+
CONFIG_KERBEROS = config_element(
322+
"ROOT", "", {
323+
"namenode" => "server.local:14000",
324+
"path" => "/hdfs/path/file.%Y%m%d.%H%M.log",
325+
"username" => "hdfs_user",
326+
"kerberos" => true,
327+
"kerberos_keytab" => "/path/to/kerberos.keytab",
328+
})
329+
330+
test "renew_kerberos_delegation_token default" do
331+
mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, nil).once
332+
333+
d = create_driver(CONFIG_KERBEROS)
334+
335+
assert_equal(
336+
{
337+
kerberos: true,
338+
renew_kerberos_delegation_token: false,
339+
renew_kerberos_delegation_token_interval_hour: nil,
340+
},
341+
{
342+
kerberos: d.instance.kerberos,
343+
renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"),
344+
renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"),
345+
})
346+
end
347+
348+
test "default renew_kerberos_delegation_token_interval" do
349+
expected_hour = 8
350+
351+
mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour).once
352+
353+
d = create_driver(CONFIG_KERBEROS +
354+
config_element("", "", { "renew_kerberos_delegation_token" => true }))
355+
356+
assert_equal(
357+
{
358+
kerberos: true,
359+
renew_kerberos_delegation_token: true,
360+
renew_kerberos_delegation_token_interval: expected_hour * 60 * 60,
361+
renew_kerberos_delegation_token_interval_hour: expected_hour,
362+
},
363+
{
364+
kerberos: d.instance.kerberos,
365+
renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"),
366+
renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"),
367+
renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"),
368+
})
369+
end
370+
371+
test "renew_kerberos_delegation_token_interval" do
372+
expected_hour = 10
373+
374+
mock.proxy(WebHDFS::Client).new("server.local", 14000, "hdfs_user", nil, nil, nil, {}, expected_hour).once
375+
376+
d = create_driver(
377+
CONFIG_KERBEROS +
378+
config_element(
379+
"", "",
380+
{
381+
"renew_kerberos_delegation_token" => true,
382+
"renew_kerberos_delegation_token_interval" => "#{expected_hour}h",
383+
}))
384+
385+
assert_equal(
386+
{
387+
kerberos: true,
388+
renew_kerberos_delegation_token: true,
389+
renew_kerberos_delegation_token_interval: expected_hour * 60 * 60,
390+
renew_kerberos_delegation_token_interval_hour: expected_hour,
391+
},
392+
{
393+
kerberos: d.instance.kerberos,
394+
renew_kerberos_delegation_token: d.instance.instance_eval("@renew_kerberos_delegation_token"),
395+
renew_kerberos_delegation_token_interval: d.instance.instance_eval("@renew_kerberos_delegation_token_interval"),
396+
renew_kerberos_delegation_token_interval_hour: d.instance.instance_eval("@renew_kerberos_delegation_token_interval_hour"),
397+
})
398+
end
399+
400+
test "username is required for renew_kerberos_delegation_token" do
401+
conf = config_element(
402+
"ROOT", "", {
403+
"namenode" => "server.local:14000",
404+
"path" => "/hdfs/path/file.%Y%m%d.%H%M.log",
405+
"kerberos" => true,
406+
"renew_kerberos_delegation_token" => true,
407+
})
408+
409+
assert_raise(Fluent::ConfigError) do
410+
create_driver(conf)
411+
end
412+
end
413+
end
319414
end

0 commit comments

Comments
 (0)