@@ -757,7 +757,7 @@ def __init__(
757757 raise TypeError (f"port must be an instance of int, not { type (port )} " )
758758 self ._host = host
759759 self ._port = port
760- self ._topology : Optional [ Topology ] = None
760+ self ._topology : Topology = None # type: ignore[assignment]
761761
762762 # _pool_class, _monitor_class, and _condition_class are for deep
763763 # customization of PyMongo, e.g. Motor.
@@ -1036,7 +1036,6 @@ def _should_pin_cursor(self, session: Optional[AsyncClientSession]) -> Optional[
10361036
10371037 def _after_fork (self ) -> None :
10381038 """Resets topology in a child after successfully forking."""
1039- assert self ._topology is not None
10401039 self ._init_background (self ._topology ._pid )
10411040 # Reset the session pool to avoid duplicate sessions in the child process.
10421041 self ._topology ._session_pool .reset ()
@@ -1196,7 +1195,6 @@ def topology_description(self) -> TopologyDescription:
11961195
11971196 .. versionadded:: 4.0
11981197 """
1199- assert self ._topology is not None
12001198 return self ._topology .description
12011199
12021200 @property
@@ -1210,7 +1208,6 @@ def nodes(self) -> FrozenSet[_Address]:
12101208 to any servers, or a network partition causes it to lose connection
12111209 to all servers.
12121210 """
1213- assert self ._topology is not None
12141211 description = self ._topology .description
12151212 return frozenset (s .address for s in description .known_servers )
12161213
@@ -1384,7 +1381,6 @@ def _ensure_session(
13841381 def _send_cluster_time (
13851382 self , command : MutableMapping [str , Any ], session : Optional [AsyncClientSession ]
13861383 ) -> None :
1387- assert self ._topology is not None
13881384 topology_time = self ._topology .max_cluster_time ()
13891385 session_time = session .cluster_time if session else None
13901386 if topology_time and session_time :
@@ -1570,7 +1566,6 @@ async def address(self) -> Optional[tuple[str, int]]:
15701566
15711567 .. versionadded:: 3.0
15721568 """
1573- assert self ._topology is not None
15741569 topology_type = self ._topology ._description .topology_type
15751570 if (
15761571 topology_type == TOPOLOGY_TYPE .Sharded
@@ -1593,7 +1588,6 @@ async def primary(self) -> Optional[tuple[str, int]]:
15931588 .. versionadded:: 3.0
15941589 AsyncMongoClient gained this property in version 3.0.
15951590 """
1596- assert self ._topology is not None
15971591 return await self ._topology .get_primary () # type: ignore[return-value]
15981592
15991593 @property
@@ -1607,7 +1601,6 @@ async def secondaries(self) -> set[_Address]:
16071601 .. versionadded:: 3.0
16081602 AsyncMongoClient gained this property in version 3.0.
16091603 """
1610- assert self ._topology is not None
16111604 return await self ._topology .get_secondaries ()
16121605
16131606 @property
@@ -1618,7 +1611,6 @@ async def arbiters(self) -> set[_Address]:
16181611 connected to a replica set, there are no arbiters, or this client was
16191612 created without the `replicaSet` option.
16201613 """
1621- assert self ._topology is not None
16221614 return await self ._topology .get_arbiters ()
16231615
16241616 @property
@@ -1677,25 +1669,26 @@ async def close(self) -> None:
16771669 .. versionchanged:: 3.6
16781670 End all server sessions created by this client.
16791671 """
1680- if self ._topology is not None :
1681- session_ids = self ._topology .pop_all_sessions ()
1682- if session_ids :
1683- await self ._end_sessions (session_ids )
1684- # Stop the periodic task thread and then send pending killCursor
1685- # requests before closing the topology.
1686- self ._kill_cursors_executor .close ()
1687- await self ._process_kill_cursors ()
1688- await self ._topology .close ()
1689- if self ._encrypter :
1690- # TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened.
1691- await self ._encrypter .close ()
1692- self ._closed = True
1693- if not _IS_SYNC :
1694- await asyncio .gather (
1695- self ._topology .cleanup_monitors (), # type: ignore[func-returns-value]
1696- self ._kill_cursors_executor .join (), # type: ignore[func-returns-value]
1697- return_exceptions = True ,
1698- )
1672+ if self ._topology is None :
1673+ return
1674+ session_ids = self ._topology .pop_all_sessions ()
1675+ if session_ids :
1676+ await self ._end_sessions (session_ids )
1677+ # Stop the periodic task thread and then send pending killCursor
1678+ # requests before closing the topology.
1679+ self ._kill_cursors_executor .close ()
1680+ await self ._process_kill_cursors ()
1681+ await self ._topology .close ()
1682+ if self ._encrypter :
1683+ # TODO: PYTHON-1921 Encrypted MongoClients cannot be re-opened.
1684+ await self ._encrypter .close ()
1685+ self ._closed = True
1686+ if not _IS_SYNC :
1687+ await asyncio .gather (
1688+ self ._topology .cleanup_monitors (), # type: ignore[func-returns-value]
1689+ self ._kill_cursors_executor .join (), # type: ignore[func-returns-value]
1690+ return_exceptions = True ,
1691+ )
16991692
17001693 if not _IS_SYNC :
17011694 # Add support for contextlib.aclosing.
@@ -1711,12 +1704,10 @@ async def _get_topology(self) -> Topology:
17111704 if self ._resolve_srv_info ["is_srv" ]:
17121705 await self ._resolve_srv ()
17131706 self ._init_background ()
1714- assert self ._topology is not None
17151707 await self ._topology .open ()
17161708 async with self ._lock :
17171709 self ._kill_cursors_executor .open ()
17181710 self ._opened = True
1719- assert self ._topology is not None
17201711 return self ._topology
17211712
17221713 @contextlib .asynccontextmanager
@@ -1819,7 +1810,6 @@ async def _conn_from_server(
18191810 # Thread safe: if the type is single it cannot change.
18201811 # NOTE: We already opened the Topology when selecting a server so there's no need
18211812 # to call _get_topology() again.
1822- assert self ._topology is not None
18231813 single = self ._topology .description .topology_type == TOPOLOGY_TYPE .Single
18241814 async with self ._checkout (server , session ) as conn :
18251815 if single :
@@ -2159,7 +2149,6 @@ async def _process_kill_cursors(self) -> None:
21592149 """Process any pending kill cursors requests."""
21602150 address_to_cursor_ids = defaultdict (list )
21612151 pinned_cursors = []
2162- assert self ._topology is not None
21632152
21642153 # Other threads or the GC may append to the queue concurrently.
21652154 while True :
@@ -2201,7 +2190,6 @@ async def _process_periodic_tasks(self) -> None:
22012190 """Process any pending kill cursors requests and
22022191 maintain connection pool parameters.
22032192 """
2204- assert self ._topology is not None
22052193 try :
22062194 await self ._process_kill_cursors ()
22072195 await self ._topology .update_pool ()
@@ -2217,7 +2205,6 @@ def _return_server_session(
22172205 """Internal: return a _ServerSession to the pool."""
22182206 if isinstance (server_session , _EmptyServerSession ):
22192207 return None
2220- assert self ._topology is not None
22212208 return self ._topology .return_server_session (server_session )
22222209
22232210 @contextlib .asynccontextmanager
@@ -2255,7 +2242,6 @@ async def _tmp_session(
22552242 async def _process_response (
22562243 self , reply : Mapping [str , Any ], session : Optional [AsyncClientSession ]
22572244 ) -> None :
2258- assert self ._topology is not None
22592245 await self ._topology .receive_cluster_time (reply .get ("$clusterTime" ))
22602246 if session is not None :
22612247 session ._process_response (reply )
0 commit comments