Skip to content
15 changes: 15 additions & 0 deletions kombu/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ class Connection:
keyword argument at the same time.
:keyword userid: Default user name if not provided in the URL.
:keyword password: Default password if not provided in the URL.
Can be a string or a callable returning a string.
If a callable is provided, it will be invoked on each
connection/reconnection attempt, enabling credential refresh
(e.g., for expiring tokens).
Note: callable passwords are currently resolved by the
``pyamqp`` transport only. Other transports (e.g.
``librabbitmq``) will not invoke the callable.
Callable passwords are only supported via programmatic
configuration, not via URL-based configuration.
The callable must be thread-safe if used in a multi-threaded
context, and picklable if using the ``spawn`` start method.
:keyword virtual_host: Default virtual host if not provided in the URL.
:keyword port: Default port if not provided in the URL.
"""
Expand Down Expand Up @@ -738,6 +749,10 @@ def as_uri(self, include_password=False, mask='**',
return connection_as_uri
fields = self.info()
port, userid, password, vhost, transport = getfields(fields)
if not include_password:
password = mask
elif callable(password):
password = password()

return as_url(
transport, hostname, port, userid, password, quote(vhost),
Expand Down
7 changes: 6 additions & 1 deletion kombu/transport/pyamqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,15 @@ def establish_connection(self):
'server_hostname' in conninfo.ssl and \
conninfo.ssl['server_hostname'] is None:
conninfo.ssl['server_hostname'] = conninfo.hostname
# Resolve callable password to support credential refresh
# on each connection/reconnection attempt.
password = conninfo.password
if callable(password):
password = password()
opts = dict({
'host': conninfo.host,
'userid': conninfo.userid,
'password': conninfo.password,
'password': password,
'login_method': conninfo.login_method,
'virtual_host': conninfo.virtual_host,
'insist': conninfo.insist,
Expand Down
35 changes: 35 additions & 0 deletions t/unit/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,6 +839,41 @@ def test_connection_timeout_with_callback(self):
callback.assert_called()


class test_Connection_callable_password:

def test_connection_preserves_callable_password(self):
"""Callable password is stored without coercion."""
password_func = Mock(return_value='secret')
conn = Connection(port=5672, transport=Transport,
password=password_func)
assert conn.password is password_func

def test_clone_preserves_callable_password(self):
"""Cloned connection preserves callable password."""
password_func = Mock(return_value='secret')
conn = Connection(port=5672, transport=Transport,
password=password_func)
cloned = conn.clone()
assert cloned.password is password_func

def test_as_uri_with_callable_password(self):
"""as_uri() resolves callable password without crashing."""
password_func = Mock(return_value='secret_token')
conn = Connection(
'amqp://user@localhost:5672//',
password=password_func,
transport=Transport,
)
# Without include_password, password is masked and callable NOT invoked
uri = conn.as_uri()
assert '**' in uri
password_func.assert_not_called()
# With include_password, the resolved password appears
uri_with_pass = conn.as_uri(include_password=True)
assert 'secret_token' in uri_with_pass
password_func.assert_called_once()


class test_Connection_with_transport_options:

transport_options = {'pool_recycler': 3600, 'echo': True}
Expand Down
59 changes: 59 additions & 0 deletions t/unit/transport/test_pyamqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,3 +230,62 @@ def test_get_manager(self):
t = pyamqp.Transport(Mock())
t.get_manager(1, kw=2)
get_manager.assert_called_with(t.client, 1, kw=2)

def test_establish_connection_callable_password(self):
"""Callable password is invoked and resolved string passed to
amqp.Connection."""
class Transport(pyamqp.Transport):
Connection = MagicMock()

password_func = Mock(return_value='fresh_token')
conn = Connection(
'amqp://user@localhost/',
password=password_func,
transport=Transport,
)
conn.connect()
password_func.assert_called_once()
_, kwargs = Transport.Connection.call_args
assert kwargs['password'] == 'fresh_token'

def test_establish_connection_string_password_unchanged(self):
"""Static string password still works identically."""
class Transport(pyamqp.Transport):
Connection = MagicMock()

conn = Connection(
'amqp://user@localhost/',
password='static_pass',
transport=Transport,
)
conn.connect()
_, kwargs = Transport.Connection.call_args
assert kwargs['password'] == 'static_pass'

def test_reconnection_invokes_callable_each_time(self):
"""Each reconnection invokes the callable for a fresh password."""
call_count = 0

def password_func():
nonlocal call_count
call_count += 1
return f'token_{call_count}'

class Transport(pyamqp.Transport):
Connection = MockConnection

conn = Connection(
'amqp://user@localhost/',
password=password_func,
transport=Transport,
)
# First connection
c1 = conn.connect()
assert c1['password'] == 'token_1'
assert call_count == 1

# Simulate disconnect + reconnect
conn.close()
c2 = conn.connect()
assert c2['password'] == 'token_2'
assert call_count == 2
Loading