Skip to content

Commit fc6ce2a

Browse files
committed
Merge branch 'master' of github.com:mongodb/mongo-python-driver
2 parents 865cecd + 7380097 commit fc6ce2a

File tree

8 files changed

+246
-43
lines changed

8 files changed

+246
-43
lines changed

bson/_cbsonmodule.c

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ static PyObject* _test_long_long_to_str(PyObject* self, PyObject* args) {
207207
*
208208
* Returns a new ref */
209209
static PyObject* _error(char* name) {
210-
PyObject* error;
210+
PyObject* error = NULL;
211211
PyObject* errors = PyImport_ImportModule("bson.errors");
212212
if (!errors) {
213213
return NULL;
@@ -279,7 +279,7 @@ static PyObject* datetime_from_millis(long long millis) {
279279
* micros = diff * 1000 111000
280280
* Resulting in datetime(1, 1, 1, 1, 1, 1, 111000) -- the expected result
281281
*/
282-
PyObject* datetime;
282+
PyObject* datetime = NULL;
283283
int diff = (int)(((millis % 1000) + 1000) % 1000);
284284
int microseconds = diff * 1000;
285285
Time64_T seconds = (millis - diff) / 1000;
@@ -294,7 +294,7 @@ static PyObject* datetime_from_millis(long long millis) {
294294
timeinfo.tm_sec,
295295
microseconds);
296296
if(!datetime) {
297-
PyObject *etype, *evalue, *etrace;
297+
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
298298

299299
/*
300300
* Calling _error clears the error state, so fetch it first.
@@ -350,8 +350,8 @@ static PyObject* datetime_ms_from_millis(PyObject* self, long long millis){
350350
return NULL;
351351
}
352352

353-
PyObject* dt;
354-
PyObject* ll_millis;
353+
PyObject* dt = NULL;
354+
PyObject* ll_millis = NULL;
355355

356356
if (!(ll_millis = PyLong_FromLongLong(millis))){
357357
return NULL;
@@ -1790,7 +1790,7 @@ static PyObject* _cbson_dict_to_bson(PyObject* self, PyObject* args) {
17901790
PyObject* result;
17911791
unsigned char check_keys;
17921792
unsigned char top_level = 1;
1793-
PyObject* options_obj;
1793+
PyObject* options_obj = NULL;
17941794
codec_options_t options;
17951795
buffer_t buffer;
17961796
PyObject* raw_bson_document_bytes_obj;
@@ -2512,8 +2512,8 @@ static PyObject* get_value(PyObject* self, PyObject* name, const char* buffer,
25122512
* Wrap any non-InvalidBSON errors in InvalidBSON.
25132513
*/
25142514
if (PyErr_Occurred()) {
2515-
PyObject *etype, *evalue, *etrace;
2516-
PyObject *InvalidBSON;
2515+
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
2516+
PyObject *InvalidBSON = NULL;
25172517

25182518
/*
25192519
* Calling _error clears the error state, so fetch it first.
@@ -2585,8 +2585,8 @@ static int _element_to_dict(PyObject* self, const char* string,
25852585
if (!*name) {
25862586
/* If NULL is returned then wrap the UnicodeDecodeError
25872587
in an InvalidBSON error */
2588-
PyObject *etype, *evalue, *etrace;
2589-
PyObject *InvalidBSON;
2588+
PyObject *etype = NULL, *evalue = NULL, *etrace = NULL;
2589+
PyObject *InvalidBSON = NULL;
25902590

25912591
PyErr_Fetch(&etype, &evalue, &etrace);
25922592
if (PyErr_GivenExceptionMatches(etype, PyExc_Exception)) {
@@ -2620,7 +2620,7 @@ static PyObject* _cbson_element_to_dict(PyObject* self, PyObject* args) {
26202620
/* TODO: Support buffer protocol */
26212621
char* string;
26222622
PyObject* bson;
2623-
PyObject* options_obj;
2623+
PyObject* options_obj = NULL;
26242624
codec_options_t options;
26252625
unsigned position;
26262626
unsigned max;
@@ -2732,7 +2732,7 @@ static PyObject* _cbson_bson_to_dict(PyObject* self, PyObject* args) {
27322732
int32_t size;
27332733
Py_ssize_t total_size;
27342734
const char* string;
2735-
PyObject* bson;
2735+
PyObject* bson = NULL;
27362736
codec_options_t options;
27372737
PyObject* result = NULL;
27382738
PyObject* options_obj;

pymongo/_cmessagemodule.c

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ struct module_state {
4545
*
4646
* Returns a new ref */
4747
static PyObject* _error(char* name) {
48-
PyObject* error;
48+
PyObject* error = NULL;
4949
PyObject* errors = PyImport_ImportModule("pymongo.errors");
5050
if (!errors) {
5151
return NULL;
@@ -75,9 +75,9 @@ static PyObject* _cbson_query_message(PyObject* self, PyObject* args) {
7575
int begin, cur_size, max_size = 0;
7676
int num_to_skip;
7777
int num_to_return;
78-
PyObject* query;
79-
PyObject* field_selector;
80-
PyObject* options_obj;
78+
PyObject* query = NULL;
79+
PyObject* field_selector = NULL;
80+
PyObject* options_obj = NULL;
8181
codec_options_t options;
8282
buffer_t buffer = NULL;
8383
int length_location, message_length;
@@ -221,12 +221,12 @@ static PyObject* _cbson_op_msg(PyObject* self, PyObject* args) {
221221
/* NOTE just using a random number as the request_id */
222222
int request_id = rand();
223223
unsigned int flags;
224-
PyObject* command;
224+
PyObject* command = NULL;
225225
char* identifier = NULL;
226226
Py_ssize_t identifier_length = 0;
227-
PyObject* docs;
228-
PyObject* doc;
229-
PyObject* options_obj;
227+
PyObject* docs = NULL;
228+
PyObject* doc = NULL;
229+
PyObject* options_obj = NULL;
230230
codec_options_t options;
231231
buffer_t buffer = NULL;
232232
int length_location, message_length;
@@ -535,12 +535,12 @@ static PyObject*
535535
_cbson_encode_batched_op_msg(PyObject* self, PyObject* args) {
536536
unsigned char op;
537537
unsigned char ack;
538-
PyObject* command;
539-
PyObject* docs;
538+
PyObject* command = NULL;
539+
PyObject* docs = NULL;
540540
PyObject* ctx = NULL;
541541
PyObject* to_publish = NULL;
542542
PyObject* result = NULL;
543-
PyObject* options_obj;
543+
PyObject* options_obj = NULL;
544544
codec_options_t options;
545545
buffer_t buffer;
546546
struct module_state *state = GETSTATE(self);
@@ -592,12 +592,12 @@ _cbson_batched_op_msg(PyObject* self, PyObject* args) {
592592
unsigned char ack;
593593
int request_id;
594594
int position;
595-
PyObject* command;
596-
PyObject* docs;
595+
PyObject* command = NULL;
596+
PyObject* docs = NULL;
597597
PyObject* ctx = NULL;
598598
PyObject* to_publish = NULL;
599599
PyObject* result = NULL;
600-
PyObject* options_obj;
600+
PyObject* options_obj = NULL;
601601
codec_options_t options;
602602
buffer_t buffer;
603603
struct module_state *state = GETSTATE(self);
@@ -868,12 +868,12 @@ _cbson_encode_batched_write_command(PyObject* self, PyObject* args) {
868868
char *ns = NULL;
869869
unsigned char op;
870870
Py_ssize_t ns_len;
871-
PyObject* command;
872-
PyObject* docs;
871+
PyObject* command = NULL;
872+
PyObject* docs = NULL;
873873
PyObject* ctx = NULL;
874874
PyObject* to_publish = NULL;
875875
PyObject* result = NULL;
876-
PyObject* options_obj;
876+
PyObject* options_obj = NULL;
877877
codec_options_t options;
878878
buffer_t buffer;
879879
struct module_state *state = GETSTATE(self);

test/asynchronous/helpers.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
from bson.son import SON
4444
from pymongo import common, message
45+
from pymongo.read_preferences import ReadPreference
4546
from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined]
4647
from pymongo.uri_parser import parse_uri
4748

@@ -150,6 +151,16 @@ def _create_user(authdb, user, pwd=None, roles=None, **kwargs):
150151
return authdb.command(cmd)
151152

152153

154+
async def async_repl_set_step_down(client, **kwargs):
155+
"""Run replSetStepDown, first unfreezing a secondary with replSetFreeze."""
156+
cmd = SON([("replSetStepDown", 1)])
157+
cmd.update(kwargs)
158+
159+
# Unfreeze a secondary to ensure a speedy election.
160+
await client.admin.command("replSetFreeze", 0, read_preference=ReadPreference.SECONDARY)
161+
await client.admin.command(cmd)
162+
163+
153164
class client_knobs:
154165
def __init__(
155166
self,
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright 2019-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test compliance with the connections survive primary step down spec."""
16+
from __future__ import annotations
17+
18+
import sys
19+
20+
sys.path[0:0] = [""]
21+
22+
from test.asynchronous import AsyncIntegrationTest, async_client_context, unittest
23+
from test.asynchronous.helpers import async_repl_set_step_down
24+
from test.utils import (
25+
CMAPListener,
26+
async_ensure_all_connected,
27+
)
28+
29+
from bson import SON
30+
from pymongo import monitoring
31+
from pymongo.asynchronous.collection import AsyncCollection
32+
from pymongo.errors import NotPrimaryError
33+
from pymongo.write_concern import WriteConcern
34+
35+
_IS_SYNC = False
36+
37+
38+
class TestAsyncConnectionsSurvivePrimaryStepDown(AsyncIntegrationTest):
39+
listener: CMAPListener
40+
coll: AsyncCollection
41+
42+
@classmethod
43+
@async_client_context.require_replica_set
44+
async def _setup_class(cls):
45+
await super()._setup_class()
46+
cls.listener = CMAPListener()
47+
cls.client = await cls.unmanaged_async_rs_or_single_client(
48+
event_listeners=[cls.listener], retryWrites=False, heartbeatFrequencyMS=500
49+
)
50+
51+
# Ensure connections to all servers in replica set. This is to test
52+
# that the is_writable flag is properly updated for connections that
53+
# survive a replica set election.
54+
await async_ensure_all_connected(cls.client)
55+
cls.listener.reset()
56+
57+
cls.db = cls.client.get_database("step-down", write_concern=WriteConcern("majority"))
58+
cls.coll = cls.db.get_collection("step-down", write_concern=WriteConcern("majority"))
59+
60+
@classmethod
61+
async def _tearDown_class(cls):
62+
await cls.client.close()
63+
64+
async def asyncSetUp(self):
65+
# Note that all ops use same write-concern as self.db (majority).
66+
await self.db.drop_collection("step-down")
67+
await self.db.create_collection("step-down")
68+
self.listener.reset()
69+
70+
async def set_fail_point(self, command_args):
71+
cmd = SON([("configureFailPoint", "failCommand")])
72+
cmd.update(command_args)
73+
await self.client.admin.command(cmd)
74+
75+
def verify_pool_cleared(self):
76+
self.assertEqual(self.listener.event_count(monitoring.PoolClearedEvent), 1)
77+
78+
def verify_pool_not_cleared(self):
79+
self.assertEqual(self.listener.event_count(monitoring.PoolClearedEvent), 0)
80+
81+
@async_client_context.require_version_min(4, 2, -1)
82+
async def test_get_more_iteration(self):
83+
# Insert 5 documents with WC majority.
84+
await self.coll.insert_many([{"data": k} for k in range(5)])
85+
# Start a find operation and retrieve first batch of results.
86+
batch_size = 2
87+
cursor = self.coll.find(batch_size=batch_size)
88+
for _ in range(batch_size):
89+
await cursor.next()
90+
# Force step-down the primary.
91+
await async_repl_set_step_down(self.client, replSetStepDown=5, force=True)
92+
# Get await anext batch of results.
93+
for _ in range(batch_size):
94+
await cursor.next()
95+
# Verify pool not cleared.
96+
self.verify_pool_not_cleared()
97+
# Attempt insertion to mark server description as stale and prevent a
98+
# NotPrimaryError on the subsequent operation.
99+
try:
100+
await self.coll.insert_one({})
101+
except NotPrimaryError:
102+
pass
103+
# Next insert should succeed on the new primary without clearing pool.
104+
await self.coll.insert_one({})
105+
self.verify_pool_not_cleared()
106+
107+
async def run_scenario(self, error_code, retry, pool_status_checker):
108+
# Set fail point.
109+
await self.set_fail_point(
110+
{"mode": {"times": 1}, "data": {"failCommands": ["insert"], "errorCode": error_code}}
111+
)
112+
self.addAsyncCleanup(self.set_fail_point, {"mode": "off"})
113+
# Insert record and verify failure.
114+
with self.assertRaises(NotPrimaryError) as exc:
115+
await self.coll.insert_one({"test": 1})
116+
self.assertEqual(exc.exception.details["code"], error_code) # type: ignore[call-overload]
117+
# Retry before CMAPListener assertion if retry_before=True.
118+
if retry:
119+
await self.coll.insert_one({"test": 1})
120+
# Verify pool cleared/not cleared.
121+
pool_status_checker()
122+
# Always retry here to ensure discovery of new primary.
123+
await self.coll.insert_one({"test": 1})
124+
125+
@async_client_context.require_version_min(4, 2, -1)
126+
@async_client_context.require_test_commands
127+
async def test_not_primary_keep_connection_pool(self):
128+
await self.run_scenario(10107, True, self.verify_pool_not_cleared)
129+
130+
@async_client_context.require_version_min(4, 0, 0)
131+
@async_client_context.require_version_max(4, 1, 0, -1)
132+
@async_client_context.require_test_commands
133+
async def test_not_primary_reset_connection_pool(self):
134+
await self.run_scenario(10107, False, self.verify_pool_cleared)
135+
136+
@async_client_context.require_version_min(4, 0, 0)
137+
@async_client_context.require_test_commands
138+
async def test_shutdown_in_progress(self):
139+
await self.run_scenario(91, False, self.verify_pool_cleared)
140+
141+
@async_client_context.require_version_min(4, 0, 0)
142+
@async_client_context.require_test_commands
143+
async def test_interrupted_at_shutdown(self):
144+
await self.run_scenario(11600, False, self.verify_pool_cleared)
145+
146+
147+
if __name__ == "__main__":
148+
unittest.main()

test/helpers.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242

4343
from bson.son import SON
4444
from pymongo import common, message
45+
from pymongo.read_preferences import ReadPreference
4546
from pymongo.ssl_support import HAVE_SSL, _ssl # type:ignore[attr-defined]
4647
from pymongo.uri_parser import parse_uri
4748

@@ -150,6 +151,16 @@ def _create_user(authdb, user, pwd=None, roles=None, **kwargs):
150151
return authdb.command(cmd)
151152

152153

154+
def repl_set_step_down(client, **kwargs):
155+
"""Run replSetStepDown, first unfreezing a secondary with replSetFreeze."""
156+
cmd = SON([("replSetStepDown", 1)])
157+
cmd.update(kwargs)
158+
159+
# Unfreeze a secondary to ensure a speedy election.
160+
client.admin.command("replSetFreeze", 0, read_preference=ReadPreference.SECONDARY)
161+
client.admin.command(cmd)
162+
163+
153164
class client_knobs:
154165
def __init__(
155166
self,

0 commit comments

Comments
 (0)