Skip to content

Commit bc3a894

Browse files
committed
CDRIVER-5707 ignore unknown servers in aggregate-with-write check (#1723)
* add regression test * only check available servers * update comments * also test sharded cluster Use more block scopes to organize test. Insert a document to force creation of database "db". * apply fix for sharded * fix return Addresses Coverity issue CID 121398
1 parent a51a4d8 commit bc3a894

File tree

2 files changed

+90
-5
lines changed

2 files changed

+90
-5
lines changed

src/libmongoc/src/mongoc/mongoc-topology-description.c

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -714,7 +714,7 @@ _check_any_server_less_than_wire_version_13 (const void *sd_, void *any_too_old_
714714
{
715715
const mongoc_server_description_t *sd = sd_;
716716
bool *any_too_old = any_too_old_;
717-
if (sd->max_wire_version < WIRE_VERSION_5_0) {
717+
if (sd->type != MONGOC_SERVER_UNKNOWN && sd->max_wire_version < WIRE_VERSION_5_0) {
718718
*any_too_old = true;
719719
return false /* Stop searching */;
720720
}
@@ -726,7 +726,7 @@ _check_any_server_less_than_wire_version_13 (const void *sd_, void *any_too_old_
726726
* @brief Calculate the read mode that we should be using, based on what was
727727
* requested and what is available in the topology.
728728
*
729-
* Per the CRUD spec, if the requested read mode is *not* primary, and *any*
729+
* Per the CRUD spec, if the requested read mode is *not* primary, and *any* available
730730
* server in the topology has a wire version < server v5.0, we must override the
731731
* read mode preference with "primary." Server v5.0 indicates support on a
732732
* secondary server for using aggregate pipelines that contain writing stages
@@ -741,7 +741,7 @@ _must_use_primary (const mongoc_topology_description_t *td,
741741
/* We never alter from a primary read mode. This early-return is just an
742742
* optimization to skip scanning for old servers, as we would end up
743743
* returning MONGOC_READ_PRIMARY regardless. */
744-
return requested_read_mode;
744+
return true;
745745
}
746746
switch (optype) {
747747
case MONGOC_SS_WRITE:
@@ -751,7 +751,7 @@ _must_use_primary (const mongoc_topology_description_t *td,
751751
/* Maintain the requested read mode if it is a regular read operation */
752752
return false;
753753
case MONGOC_SS_AGGREGATE_WITH_WRITE: {
754-
/* Check if any of the servers are too old to support the
754+
/* Check if any of the available servers are too old to support the
755755
* aggregate-with-write on a secondary server */
756756
bool any_too_old = false;
757757
mongoc_set_for_each_const (mc_tpld_servers_const (td), _check_any_server_less_than_wire_version_13, &any_too_old);
@@ -1015,7 +1015,8 @@ mongoc_topology_description_select (const mongoc_topology_description_t *topolog
10151015
if (topology->type == MONGOC_TOPOLOGY_SINGLE) {
10161016
mongoc_server_description_t const *const sd = mongoc_set_get_item_const (mc_tpld_servers_const (topology), 0);
10171017

1018-
if (optype == MONGOC_SS_AGGREGATE_WITH_WRITE && sd->max_wire_version < WIRE_VERSION_5_0) {
1018+
if (optype == MONGOC_SS_AGGREGATE_WITH_WRITE && sd->type != MONGOC_SERVER_UNKNOWN &&
1019+
sd->max_wire_version < WIRE_VERSION_5_0) {
10191020
/* The single server may be part of an unseen replica set that may not
10201021
* support aggr-with-write operations on secondaries. Force the read
10211022
* preference to use a primary. */

src/libmongoc/tests/test-mongoc-aggregate.c

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,92 @@ test_query_flags (void)
9292
}
9393
}
9494

95+
typedef struct {
96+
bson_t *cmd;
97+
bson_mutex_t lock;
98+
} last_captured_t;
99+
100+
static void
101+
command_started (const mongoc_apm_command_started_t *event)
102+
{
103+
const bson_t *cmd = mongoc_apm_command_started_get_command (event);
104+
last_captured_t *lc = mongoc_apm_command_started_get_context (event);
105+
bson_mutex_lock (&lc->lock);
106+
bson_destroy (lc->cmd);
107+
lc->cmd = bson_copy (cmd);
108+
bson_mutex_unlock (&lc->lock);
109+
}
110+
111+
// `test_write_respects_read_prefs` tests that an aggregate with a write stage respects the original read preferences
112+
// when talking to >= 5.0 servers. This is a regression test for CDRIVER-5707.
113+
static void
114+
test_write_respects_read_prefs (void *unused)
115+
{
116+
BSON_UNUSED (unused);
117+
118+
bson_error_t error;
119+
last_captured_t lc = {0};
120+
bson_mutex_init (&lc.lock);
121+
122+
mongoc_client_pool_t *pool = test_framework_new_default_client_pool ();
123+
// Capture the most recent command-started event.
124+
{
125+
mongoc_apm_callbacks_t *cbs = mongoc_apm_callbacks_new ();
126+
mongoc_apm_set_command_started_cb (cbs, command_started);
127+
mongoc_client_pool_set_apm_callbacks (pool, cbs, &lc);
128+
mongoc_apm_callbacks_destroy (cbs);
129+
}
130+
131+
// Create database 'db' on separate client to avoid "database 'db' not found" error.
132+
{
133+
mongoc_client_t *client = test_framework_new_default_client ();
134+
mongoc_collection_t *coll = mongoc_client_get_collection (client, "db", "coll");
135+
ASSERT_OR_PRINT (mongoc_collection_insert_one (coll, tmp_bson (BSON_STR ({"x" : 1})), NULL, NULL, &error), error);
136+
mongoc_collection_destroy (coll);
137+
mongoc_client_destroy (client);
138+
}
139+
140+
// Do an 'aggregate' with '$out'.
141+
{
142+
bson_t *pipeline = tmp_bson (BSON_STR ({"pipeline" : [ {"$out" : "foo"} ]}));
143+
mongoc_client_t *client = mongoc_client_pool_pop (pool);
144+
mongoc_collection_t *coll = mongoc_client_get_collection (client, "db", "coll");
145+
mongoc_read_prefs_t *rp = mongoc_read_prefs_new (MONGOC_READ_SECONDARY_PREFERRED);
146+
mongoc_cursor_t *cursor = mongoc_collection_aggregate (coll, MONGOC_QUERY_NONE, pipeline, NULL /* opts */, rp);
147+
// Iterate cursor to send `aggregate` command.
148+
const bson_t *ignored;
149+
ASSERT (!mongoc_cursor_next (cursor, &ignored));
150+
ASSERT_OR_PRINT (!mongoc_cursor_error (cursor, &error), error);
151+
mongoc_read_prefs_destroy (rp);
152+
mongoc_cursor_destroy (cursor);
153+
mongoc_collection_destroy (coll);
154+
mongoc_client_pool_push (pool, client);
155+
}
156+
157+
// Check that `aggregate` command contains $readPreference.
158+
{
159+
bson_t *got;
160+
bson_mutex_lock (&lc.lock);
161+
got = bson_copy (lc.cmd);
162+
bson_mutex_unlock (&lc.lock);
163+
ASSERT_MATCH (got, BSON_STR ({"$readPreference" : {"mode" : "secondaryPreferred"}}));
164+
bson_destroy (got);
165+
}
166+
167+
mongoc_client_pool_destroy (pool);
168+
bson_destroy (lc.cmd);
169+
bson_mutex_destroy (&lc.lock);
170+
}
171+
95172
void
96173
test_aggregate_install (TestSuite *suite)
97174
{
98175
TestSuite_AddMockServerTest (suite, "/Aggregate/query_flags", test_query_flags);
176+
TestSuite_AddFull (suite,
177+
"/Aggregate/write_respects_read_prefs",
178+
test_write_respects_read_prefs,
179+
NULL,
180+
NULL,
181+
test_framework_skip_if_single /* $readPreference is not sent for single servers */,
182+
test_framework_skip_if_max_wire_version_less_than_13 /* require server 5.0+ */);
99183
}

0 commit comments

Comments
 (0)