Skip to content

Commit 9f12764

Browse files
committed
Add Kerberos authentication support to HadoopFileSystem #20719
1 parent d022338 commit 9f12764

File tree

4 files changed

+123
-3
lines changed

4 files changed

+123
-3
lines changed

CHANGES.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
## New Features / Improvements
7171

7272
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
73+
* HadoopFileSystem now supports Kerberos authentication via `--hdfs_client=KERBEROS` flag (Python) ([#20719](https://github.com/apache/beam/issues/20719)).
7374

7475
## Breaking Changes
7576

@@ -2349,4 +2350,4 @@ Schema Options, it will be removed in version `2.23.0`. ([BEAM-9704](https://iss
23492350

23502351
## Highlights
23512352

2352-
- For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).
2353+
- - For versions 2.19.0 and older release notes are available on [Apache Beam Blog](https://beam.apache.org/blog/).

sdks/python/apache_beam/io/hadoopfilesystem.py

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@
4040
except ImportError:
4141
hdfs = None
4242

43+
try:
44+
from hdfs.ext.kerberos import KerberosClient
45+
except ImportError:
46+
KerberosClient = None
47+
4348
__all__ = ['HadoopFileSystem']
4449

4550
_HDFS_PREFIX = 'hdfs:/'
@@ -123,11 +128,13 @@ def __init__(self, pipeline_options):
123128
hdfs_host = hdfs_options.hdfs_host
124129
hdfs_port = hdfs_options.hdfs_port
125130
hdfs_user = hdfs_options.hdfs_user
131+
hdfs_client = hdfs_options.hdfs_client
126132
self._full_urls = hdfs_options.hdfs_full_urls
127133
else:
128134
hdfs_host = pipeline_options.get('hdfs_host')
129135
hdfs_port = pipeline_options.get('hdfs_port')
130136
hdfs_user = pipeline_options.get('hdfs_user')
137+
hdfs_client = pipeline_options.get('hdfs_client', 'INSECURE')
131138
self._full_urls = pipeline_options.get('hdfs_full_urls', False)
132139

133140
if hdfs_host is None:
@@ -139,8 +146,25 @@ def __init__(self, pipeline_options):
139146
if not isinstance(self._full_urls, bool):
140147
raise ValueError(
141148
'hdfs_full_urls should be bool, got: %s', self._full_urls)
142-
self._hdfs_client = hdfs.InsecureClient(
143-
'http://%s:%s' % (hdfs_host, str(hdfs_port)), user=hdfs_user)
149+
150+
# Create HDFS client based on authentication type
151+
url = 'http://%s:%s' % (hdfs_host, str(hdfs_port))
152+
if hdfs_client == 'KERBEROS':
153+
if KerberosClient is None:
154+
raise ImportError(
155+
'Kerberos authentication requires the requests-kerberos library. '
156+
'Install it with: pip install requests-kerberos')
157+
_LOGGER.info('Using KerberosClient for HDFS authentication')
158+
try:
159+
self._hdfs_client = KerberosClient(url)
160+
except Exception as e:
161+
raise RuntimeError(
162+
'Failed to create KerberosClient. Ensure you have valid Kerberos '
163+
'credentials (run kinit) or have configured a keytab. '
164+
'Error: %s' % str(e))
165+
else:
166+
# Default to INSECURE for backward compatibility
167+
self._hdfs_client = hdfs.InsecureClient(url, user=hdfs_user)
144168

145169
@classmethod
146170
def scheme(cls):

sdks/python/apache_beam/io/hadoopfilesystem_test.py

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -668,6 +668,93 @@ def test_dict_options_full_urls(self):
668668
self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
669669
self.assertTrue(self.fs._full_urls)
670670

671+
def test_insecure_client_default(self):
672+
"""Test that InsecureClient is used by default."""
673+
pipeline_options = PipelineOptions()
674+
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
675+
hdfs_options.hdfs_host = 'localhost'
676+
hdfs_options.hdfs_port = 9870
677+
hdfs_options.hdfs_user = 'testuser'
678+
# hdfs_client not specified, should default to INSECURE
679+
680+
self.fs = hdfs.HadoopFileSystem(pipeline_options)
681+
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
682+
683+
def test_insecure_client_explicit(self):
684+
"""Test that InsecureClient is used when explicitly specified."""
685+
pipeline_options = PipelineOptions()
686+
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
687+
hdfs_options.hdfs_host = 'localhost'
688+
hdfs_options.hdfs_port = 9870
689+
hdfs_options.hdfs_user = 'testuser'
690+
hdfs_options.hdfs_client = 'INSECURE'
691+
692+
self.fs = hdfs.HadoopFileSystem(pipeline_options)
693+
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
694+
695+
def test_kerberos_client_missing_library(self):
696+
"""Test that Kerberos client fails gracefully when library not installed."""
697+
pipeline_options = PipelineOptions()
698+
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
699+
hdfs_options.hdfs_host = 'localhost'
700+
hdfs_options.hdfs_port = 9870
701+
hdfs_options.hdfs_user = 'testuser'
702+
hdfs_options.hdfs_client = 'KERBEROS'
703+
704+
# Temporarily set KerberosClient to None to simulate missing library
705+
original_kerberos_client = hdfs.KerberosClient
706+
hdfs.KerberosClient = None
707+
708+
try:
709+
with self.assertRaisesRegex(
710+
ImportError, r'requests-kerberos'):
711+
hdfs.HadoopFileSystem(pipeline_options)
712+
finally:
713+
hdfs.KerberosClient = original_kerberos_client
714+
715+
def test_kerberos_client_creation(self):
716+
"""Test that KerberosClient is created when specified."""
717+
pipeline_options = PipelineOptions()
718+
hdfs_options = pipeline_options.view_as(HadoopFileSystemOptions)
719+
hdfs_options.hdfs_host = 'localhost'
720+
hdfs_options.hdfs_port = 9870
721+
hdfs_options.hdfs_user = 'testuser'
722+
hdfs_options.hdfs_client = 'KERBEROS'
723+
724+
# Mock KerberosClient to return our FakeHdfs
725+
if hdfs.KerberosClient is not None:
726+
original_kerberos_client = hdfs.KerberosClient
727+
hdfs.KerberosClient = lambda *args, **kwargs: self._fake_hdfs
728+
729+
try:
730+
self.fs = hdfs.HadoopFileSystem(pipeline_options)
731+
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
732+
finally:
733+
hdfs.KerberosClient = original_kerberos_client
734+
735+
def test_dict_options_insecure_client(self):
736+
"""Test InsecureClient with dict-based pipeline options."""
737+
pipeline_options = {
738+
'hdfs_host': 'localhost',
739+
'hdfs_port': 9870,
740+
'hdfs_user': 'testuser',
741+
'hdfs_client': 'INSECURE',
742+
}
743+
744+
self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
745+
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
746+
747+
def test_dict_options_default_client(self):
748+
"""Test dict options default to INSECURE without hdfs_client."""
749+
pipeline_options = {
750+
'hdfs_host': 'localhost',
751+
'hdfs_port': 9870,
752+
'hdfs_user': 'testuser',
753+
}
754+
755+
self.fs = hdfs.HadoopFileSystem(pipeline_options=pipeline_options)
756+
self.assertIsInstance(self.fs._hdfs_client, FakeHdfs)
757+
671758

672759
if __name__ == '__main__':
673760
logging.getLogger().setLevel(logging.INFO)

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1328,6 +1328,14 @@ def _add_argparse_args(cls, parser):
13281328
'If set, URLs will be parsed as "hdfs://server/path/...", instead '
13291329
'of "hdfs://path/...". The "server" part will be unused (use '
13301330
'--hdfs_host and --hdfs_port).'))
1331+
parser.add_argument(
1332+
'--hdfs_client',
1333+
default='INSECURE',
1334+
choices=['INSECURE', 'KERBEROS'],
1335+
help=(
1336+
'HDFS client type for authentication. INSECURE uses simple '
1337+
'username-based authentication (default). KERBEROS uses Kerberos '
1338+
'authentication (requires kinit or keytab configuration).'))
13311339

13321340
def validate(self, validator):
13331341
errors = []

0 commit comments

Comments
 (0)