Skip to content

Commit 426d227

Browse files
author
DanielePalaia
committed
adding oauth configuration
1 parent 055c0e1 commit 426d227

File tree

3 files changed

+46
-15
lines changed

3 files changed

+46
-15
lines changed

rabbitmq_amqp_python_client/connection.py

Lines changed: 38 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,11 @@
1414

1515
from .address_helper import validate_address
1616
from .consumer import Consumer
17-
from .entities import RecoveryConfiguration, StreamOptions
17+
from .entities import (
18+
OAuth2Options,
19+
RecoveryConfiguration,
20+
StreamOptions,
21+
)
1822
from .exceptions import (
1923
ArgumentOutOfRangeException,
2024
ValidationCodeException,
@@ -60,6 +64,7 @@ def __init__(
6064
ssl_context: Union[
6165
PosixSslConfigurationContext, WinSslConfigurationContext, None
6266
] = None,
67+
oauth2_options: Optional[OAuth2Options] = None,
6368
recovery_configuration: RecoveryConfiguration = RecoveryConfiguration(),
6469
):
6570
"""
@@ -93,6 +98,7 @@ def __init__(
9398
self._index: int = -1
9499
self._publishers: list[Publisher] = []
95100
self._consumers: list[Consumer] = []
101+
self._oauth2_options = oauth2_options
96102

97103
# Some recovery_configuration validation
98104
if recovery_configuration.back_off_reconnect_interval < timedelta(seconds=1):
@@ -109,19 +115,8 @@ def _set_environment_connection_list(self, connections: []): # type: ignore
109115
def _open_connections(self, reconnect_handlers: bool = False) -> None:
110116

111117
logger.debug("inside connection._open_connections creating connection")
112-
if self._recovery_configuration.active_recovery is False:
113-
self._conn = BlockingConnection(
114-
url=self._addr,
115-
urls=self._addrs,
116-
ssl_domain=self._ssl_domain,
117-
)
118-
else:
119-
self._conn = BlockingConnection(
120-
url=self._addr,
121-
urls=self._addrs,
122-
ssl_domain=self._ssl_domain,
123-
on_disconnection_handler=self._on_disconnection,
124-
)
118+
119+
self._create_connection()
125120

126121
if reconnect_handlers is True:
127122
logger.debug("reconnecting managements, publishers and consumers handlers")
@@ -137,6 +132,35 @@ def _open_connections(self, reconnect_handlers: bool = False) -> None:
137132
# Update the broken connection and sender in the consumer
138133
self._consumers[i]._update_connection(self._conn)
139134

135+
def _create_connection(self) -> None:
136+
137+
user = None
138+
password = None
139+
140+
if self._oauth2_options is not None:
141+
user = ""
142+
password = self._oauth2_options.token
143+
144+
if self._recovery_configuration.active_recovery is False:
145+
self._conn = BlockingConnection(
146+
url=self._addr,
147+
urls=self._addrs,
148+
oauth2_options=self._oauth2_options,
149+
ssl_domain=self._ssl_domain,
150+
user=user,
151+
password=password,
152+
)
153+
else:
154+
self._conn = BlockingConnection(
155+
url=self._addr,
156+
urls=self._addrs,
157+
oauth2_options=self._oauth2_options,
158+
ssl_domain=self._ssl_domain,
159+
on_disconnection_handler=self._on_disconnection,
160+
user=user,
161+
password=password,
162+
)
163+
140164
def dial(self) -> None:
141165
"""
142166
Establish a connection to the AMQP server.

rabbitmq_amqp_python_client/entities.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,3 +257,8 @@ class RecoveryConfiguration:
257257
active_recovery: bool = True
258258
back_off_reconnect_interval: timedelta = timedelta(seconds=5)
259259
MaxReconnectAttempts: int = 5
260+
261+
262+
@dataclass
263+
class OAuth2Options:
264+
token: str

rabbitmq_amqp_python_client/environment.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
)
1010

1111
from .connection import Connection
12-
from .entities import RecoveryConfiguration
12+
from .entities import OAuth2Options, RecoveryConfiguration
1313
from .ssl_configuration import (
1414
PosixSslConfigurationContext,
1515
WinSslConfigurationContext,
@@ -41,6 +41,7 @@ def __init__(
4141
ssl_context: Union[
4242
PosixSslConfigurationContext, WinSslConfigurationContext, None
4343
] = None,
44+
oauth2_options: Optional[OAuth2Options] = None,
4445
recovery_configuration: RecoveryConfiguration = RecoveryConfiguration(),
4546
):
4647
"""
@@ -66,6 +67,7 @@ def __init__(
6667
self._ssl_context = ssl_context
6768
self._recovery_configuration = recovery_configuration
6869
self._connections: list[Connection] = []
70+
self._oauth2_options = oauth2_options
6971

7072
def connection(
7173
self,

0 commit comments

Comments
 (0)