3535 WinSslConfigurationContext ,
3636)
3737
38+ from .entities import OAuth2Options
39+
3840logger = logging .getLogger (__name__ )
3941
4042MT = TypeVar ("MT" )
@@ -60,6 +62,7 @@ def __init__(
6062 ssl_context : Union [
6163 PosixSslConfigurationContext , WinSslConfigurationContext , None
6264 ] = None ,
65+ oauth2_options : Optional [OAuth2Options ] = None ,
6366 recovery_configuration : RecoveryConfiguration = RecoveryConfiguration (),
6467 ):
6568 """
@@ -93,6 +96,7 @@ def __init__(
9396 self ._index : int = - 1
9497 self ._publishers : list [Publisher ] = []
9598 self ._consumers : list [Consumer ] = []
99+ self ._oauth2_options = oauth2_options
96100
97101 # Some recovery_configuration validation
98102 if recovery_configuration .back_off_reconnect_interval < timedelta (seconds = 1 ):
@@ -109,19 +113,8 @@ def _set_environment_connection_list(self, connections: []): # type: ignore
109113 def _open_connections (self , reconnect_handlers : bool = False ) -> None :
110114
111115 logger .debug ("inside connection._open_connections creating connection" )
112- if self ._recovery_configuration .active_recovery is False :
113- self ._conn = BlockingConnection (
114- url = self ._addr ,
115- urls = self ._addrs ,
116- ssl_domain = self ._ssl_domain ,
117- )
118- else :
119- self ._conn = BlockingConnection (
120- url = self ._addr ,
121- urls = self ._addrs ,
122- ssl_domain = self ._ssl_domain ,
123- on_disconnection_handler = self ._on_disconnection ,
124- )
116+
117+ self ._create_connection ()
125118
126119 if reconnect_handlers is True :
127120 logger .debug ("reconnecting managements, publishers and consumers handlers" )
@@ -137,6 +130,35 @@ def _open_connections(self, reconnect_handlers: bool = False) -> None:
137130 # Update the broken connection and sender in the consumer
138131 self ._consumers [i ]._update_connection (self ._conn )
139132
133+ def _create_connection (self ):
134+
135+ user = None
136+ password = None
137+
138+ if self ._oauth2_options is not None :
139+ user = ""
140+ password = self ._oauth2_options .token
141+
142+ if self ._recovery_configuration .active_recovery is False :
143+ self ._conn = BlockingConnection (
144+ url = self ._addr ,
145+ urls = self ._addrs ,
146+ oauth2_options = self ._oauth2_options ,
147+ ssl_domain = self ._ssl_domain ,
148+ user = user ,
149+ password = password ,
150+ )
151+ else :
152+ self ._conn = BlockingConnection (
153+ url = self ._addr ,
154+ urls = self ._addrs ,
155+ oauth2_options = self ._oauth2_options ,
156+ ssl_domain = self ._ssl_domain ,
157+ on_disconnection_handler = self ._on_disconnection ,
158+ user = user ,
159+ password = password ,
160+ )
161+
140162 def dial (self ) -> None :
141163 """
142164 Establish a connection to the AMQP server.
0 commit comments