128
128
from pymongo .asynchronous .bulk import _AsyncBulk
129
129
from pymongo .asynchronous .client_session import AsyncClientSession , _ServerSession
130
130
from pymongo .asynchronous .cursor import _ConnectionManager
131
+ from pymongo .asynchronous .encryption import _Encrypter
131
132
from pymongo .asynchronous .pool import AsyncConnection
132
133
from pymongo .asynchronous .server import Server
133
134
from pymongo .read_concern import ReadConcern
@@ -840,26 +841,26 @@ def __init__(
840
841
options .read_concern ,
841
842
)
842
843
843
- self ._topology_settings = TopologySettings (
844
- seeds = seeds ,
845
- replica_set_name = options .replica_set_name ,
846
- pool_class = pool_class ,
847
- pool_options = options .pool_options ,
848
- monitor_class = monitor_class ,
849
- condition_class = condition_class ,
850
- local_threshold_ms = options .local_threshold_ms ,
851
- server_selection_timeout = options .server_selection_timeout ,
852
- server_selector = options .server_selector ,
853
- heartbeat_frequency = options .heartbeat_frequency ,
854
- fqdn = fqdn ,
855
- direct_connection = options .direct_connection ,
856
- load_balanced = options .load_balanced ,
857
- srv_service_name = srv_service_name ,
858
- srv_max_hosts = srv_max_hosts ,
859
- server_monitoring_mode = options .server_monitoring_mode ,
860
- )
861
-
862
- self ._topology = Topology (self ._topology_settings )
844
+ # self._topology_settings = TopologySettings(
845
+ # seeds=seeds,
846
+ # replica_set_name=options.replica_set_name,
847
+ # pool_class=pool_class,
848
+ # pool_options=options.pool_options,
849
+ # monitor_class=monitor_class,
850
+ # condition_class=condition_class,
851
+ # local_threshold_ms=options.local_threshold_ms,
852
+ # server_selection_timeout=options.server_selection_timeout,
853
+ # server_selector=options.server_selector,
854
+ # heartbeat_frequency=options.heartbeat_frequency,
855
+ # fqdn=fqdn,
856
+ # direct_connection=options.direct_connection,
857
+ # load_balanced=options.load_balanced,
858
+ # srv_service_name=srv_service_name,
859
+ # srv_max_hosts=srv_max_hosts,
860
+ # server_monitoring_mode=options.server_monitoring_mode,
861
+ # )
862
+ #
863
+ # self._topology = Topology(self._topology_settings)
863
864
864
865
self ._opened = False
865
866
self ._closed = False
@@ -878,7 +879,7 @@ def __init__(
878
879
if _IS_SYNC and connect :
879
880
self ._get_topology () # type: ignore[unused-coroutine]
880
881
881
- self ._encrypter = None
882
+ self ._encrypter : Optional [ _Encrypter ] = None
882
883
self ._timeout = self ._options .timeout
883
884
884
885
def _resolve_uri (self ) -> None :
@@ -1018,7 +1019,6 @@ async def aconnect(self) -> None:
1018
1019
await self ._get_topology ()
1019
1020
1020
1021
def _init_background (self , old_pid : Optional [int ] = None ) -> None :
1021
- self ._topology = Topology (self ._topology_settings )
1022
1022
# Seed the topology with the old one's pid so we can detect clients
1023
1023
# that are opened before a fork and used after.
1024
1024
self ._topology ._pid = old_pid
@@ -1235,14 +1235,20 @@ def options(self) -> ClientOptions:
1235
1235
1236
1236
def __eq__ (self , other : Any ) -> bool :
1237
1237
if isinstance (other , self .__class__ ):
1238
- return self ._topology == other ._topology
1238
+ if hasattr (self , "_topology" ):
1239
+ return self ._topology == other ._topology
1240
+ else :
1241
+ raise InvalidOperation ("Cannot perform operation until client is connected" )
1239
1242
return NotImplemented
1240
1243
1241
1244
def __ne__ (self , other : Any ) -> bool :
1242
1245
return not self == other
1243
1246
1244
1247
def __hash__ (self ) -> int :
1245
- return hash (self ._topology )
1248
+ if hasattr (self , "_topology" ):
1249
+ return hash (self ._topology )
1250
+ else :
1251
+ raise InvalidOperation ("Cannot perform operation until client is connected" )
1246
1252
1247
1253
def _repr_helper (self ) -> str :
1248
1254
def option_repr (option : str , value : Any ) -> str :
@@ -1278,7 +1284,9 @@ def option_repr(option: str, value: Any) -> str:
1278
1284
return ", " .join (options )
1279
1285
1280
1286
def __repr__ (self ) -> str :
1281
- return f"{ type (self ).__name__ } ({ self ._repr_helper ()} )"
1287
+ if hasattr (self , "_topology" ):
1288
+ return f"{ type (self ).__name__ } ({ self ._repr_helper ()} )"
1289
+ raise InvalidOperation ("Cannot perform operation until client is connected" )
1282
1290
1283
1291
def __getattr__ (self , name : str ) -> database .AsyncDatabase [_DocumentType ]:
1284
1292
"""Get a database by name.
@@ -1670,12 +1678,12 @@ async def close(self) -> None:
1670
1678
.. versionchanged:: 3.6
1671
1679
End all server sessions created by this client.
1672
1680
"""
1673
- session_ids = self ._topology .pop_all_sessions ()
1674
- if session_ids :
1675
- await self ._end_sessions (session_ids )
1676
- # Stop the periodic task thread and then send pending killCursor
1677
- # requests before closing the topology.
1678
1681
if self ._opened :
1682
+ session_ids = self ._topology .pop_all_sessions ()
1683
+ if session_ids :
1684
+ await self ._end_sessions (session_ids )
1685
+ # Stop the periodic task thread and then send pending killCursor
1686
+ # requests before closing the topology.
1679
1687
self ._kill_cursors_executor .close ()
1680
1688
await self ._process_kill_cursors ()
1681
1689
await self ._topology .close ()
0 commit comments