Skip to content

Commit 3927ae8

Browse files
committed
CDRIVER-3553 use whitelist/labels for resume
1 parent 67a37a0 commit 3927ae8

File tree

8 files changed

+3766
-119
lines changed

8 files changed

+3766
-119
lines changed

src/libmongoc/src/mongoc/mongoc-change-stream-private.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ struct _mongoc_change_stream_t {
6262
bool resumed;
6363

6464
mongoc_client_session_t *implicit_session;
65+
66+
/* The max_wire_version of the server the change stream is tied to. */
67+
uint32_t max_wire_version;
6568
};
6669

6770
mongoc_change_stream_t *

src/libmongoc/src/mongoc/mongoc-change-stream.c

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
/* the caller knows either a client or server error has occurred.
3333
* `reply` contains the server reply or an empty document. */
3434
static bool
35-
_is_resumable_error (const bson_t *reply)
35+
_is_resumable_error (mongoc_change_stream_t *stream, const bson_t *reply)
3636
{
3737
bson_error_t error = {0};
3838

@@ -46,28 +46,32 @@ _is_resumable_error (const bson_t *reply)
4646
return true;
4747
}
4848

49-
if (mongoc_error_has_label (reply, "NonResumableChangeStreamError")) {
50-
return false;
51-
}
52-
53-
/* Change Streams Spec resumable criteria: "a server error response with an
54-
* error message containing the substring 'not master' or 'node is
55-
* recovering' */
56-
if (strstr (error.message, "not master") ||
57-
strstr (error.message, "node is recovering")) {
58-
return true;
49+
if (stream->max_wire_version >= WIRE_VERSION_4_4) {
50+
return mongoc_error_has_label (reply, "ResumableChangeStreamError");
5951
}
6052

61-
/* Change Streams Spec resumable criteria: "any server error response from a
62-
* getMore command excluding those containing the following error codes" */
6353
switch (error.code) {
64-
case 11601: /* Interrupted */
65-
case 136: /* CappedPositionLost */
66-
case 237: /* CursorKilled */
67-
case MONGOC_ERROR_QUERY_FAILURE: /* error code omitted */
68-
return false;
69-
default:
54+
case 6: /* HostUnreachable */
55+
case 7: /* HostNotFound */
56+
case 89: /* NetworkTimeout */
57+
case 91: /* ShutdownInProgress */
58+
case 189: /* PrimarySteppedDown */
59+
case 262: /* ExceededTimeLimit */
60+
case 9001: /* SocketException */
61+
case 10107: /* NotMaster */
62+
case 11600: /* InterruptedAtShutdown */
63+
case 11602: /* InterruptedDueToReplStateChange */
64+
case 13435: /* NotMasterNoSlaveOk */
65+
case 13436: /* NotMasterOrSecondary */
66+
case 63: /* StaleShardVersion */
67+
case 150: /* StaleEpoch */
68+
case 13388: /* StaleConfig */
69+
case 234: /* RetryChangeStream */
70+
case 133: /* FailedToSatisfyReadPreference */
71+
case 216: /* ElectionInProgress */
7072
return true;
73+
default:
74+
return false;
7175
}
7276
}
7377

@@ -93,9 +97,7 @@ _set_resume_token (mongoc_change_stream_t *stream, const bson_t *resume_token)
9397
* cursor: { batchSize: x } }
9498
*/
9599
static void
96-
_make_command (mongoc_change_stream_t *stream,
97-
bson_t *command,
98-
int32_t max_wire_version)
100+
_make_command (mongoc_change_stream_t *stream, bson_t *command)
99101
{
100102
bson_iter_t iter;
101103
bson_t change_stream_stage; /* { $changeStream: <change_stream_doc> } */
@@ -135,7 +137,7 @@ _make_command (mongoc_change_stream_t *stream,
135137
&change_stream_doc, "resumeAfter", &stream->resume_token);
136138
}
137139
} else if (!_mongoc_timestamp_empty (&stream->operation_time) &&
138-
max_wire_version >= 7) {
140+
stream->max_wire_version >= WIRE_VERSION_4_0) {
139141
/* Else if there is no cached resumeToken and the ChangeStream
140142
has a saved operation time and the max wire version is >= 7,
141143
the driver MUST set startAtOperationTime */
@@ -231,7 +233,6 @@ _make_cursor (mongoc_change_stream_t *stream)
231233
bson_iter_t iter;
232234
mongoc_server_description_t *sd;
233235
uint32_t server_id;
234-
int32_t max_wire_version = -1;
235236

236237
BSON_ASSERT (stream);
237238
BSON_ASSERT (!stream->cursor);
@@ -245,7 +246,7 @@ _make_cursor (mongoc_change_stream_t *stream)
245246
server_id = mongoc_server_description_id (sd);
246247
bson_append_int32 (&command_opts, "serverId", 8, server_id);
247248
bson_append_int32 (&getmore_opts, "serverId", 8, server_id);
248-
max_wire_version = sd->max_wire_version;
249+
stream->max_wire_version = sd->max_wire_version;
249250
mongoc_server_description_destroy (sd);
250251

251252
if (bson_iter_init_find (&iter, &command_opts, "sessionId")) {
@@ -288,7 +289,7 @@ _make_cursor (mongoc_change_stream_t *stream)
288289
mongoc_read_concern_append (stream->read_concern, &command_opts);
289290
}
290291

291-
_make_command (stream, &command, max_wire_version);
292+
_make_command (stream, &command);
292293

293294
/* even though serverId has already been set, still pass the read prefs.
294295
* they are necessary for OP_MSG if sending to a secondary. */
@@ -350,7 +351,8 @@ _make_cursor (mongoc_change_stream_t *stream)
350351
if (bson_empty (&stream->opts.resumeAfter) &&
351352
bson_empty (&stream->opts.startAfter) &&
352353
_mongoc_timestamp_empty (&stream->operation_time) &&
353-
max_wire_version >= 7 && bson_empty (&stream->resume_token) &&
354+
stream->max_wire_version >= WIRE_VERSION_4_0 &&
355+
bson_empty (&stream->resume_token) &&
354356
bson_iter_init_find (
355357
&iter,
356358
_mongoc_cursor_change_stream_get_reply (stream->cursor),
@@ -529,7 +531,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
529531
goto end;
530532
}
531533

532-
resumable = _is_resumable_error (err_doc);
534+
resumable = _is_resumable_error (stream, err_doc);
533535
while (resumable) {
534536
/* recreate the cursor. */
535537
mongoc_cursor_destroy (stream->cursor);
@@ -545,7 +547,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
545547
goto end;
546548
}
547549
if (err_doc) {
548-
resumable = _is_resumable_error (err_doc);
550+
resumable = _is_resumable_error (stream, err_doc);
549551
} else {
550552
resumable = false;
551553
}

src/libmongoc/tests/json-test.c

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -869,6 +869,28 @@ check_topology_type (const bson_t *test)
869869
return can_proceed;
870870
}
871871

872+
static void
873+
_recreate (mongoc_client_t *client,
874+
const char *db_name,
875+
const char *collection_name)
876+
{
877+
mongoc_collection_t *collection;
878+
mongoc_database_t *db;
879+
880+
if (!db_name || !collection_name) {
881+
return;
882+
}
883+
collection = mongoc_client_get_collection (client, db_name, collection_name);
884+
mongoc_collection_drop (collection, NULL);
885+
mongoc_collection_destroy (collection);
886+
887+
db = mongoc_client_get_database (client, db_name);
888+
collection = mongoc_database_create_collection (
889+
db, collection_name, NULL /* options */, NULL);
890+
mongoc_collection_destroy (collection);
891+
mongoc_database_destroy (db);
892+
}
893+
872894

873895
static void
874896
_insert_data (mongoc_collection_t *collection, bson_t *documents)
@@ -1174,6 +1196,15 @@ deactivate_fail_points (mongoc_client_t *client, uint32_t server_id)
11741196
}
11751197
}
11761198

1199+
if (sd->max_wire_version >= WIRE_VERSION_4_4) {
1200+
/* failGetMoreAfterCursorCheckout added in 4.4 */
1201+
command = tmp_bson ("{'configureFailPoint': "
1202+
"'failGetMoreAfterCursorCheckout', 'mode': 'off'}");
1203+
r = mongoc_client_command_simple_with_server_id (
1204+
client, "admin", command, NULL, server_id, NULL, &error);
1205+
ASSERT_OR_PRINT (r, error);
1206+
}
1207+
11771208
mongoc_server_description_destroy (sd);
11781209
}
11791210

@@ -1435,8 +1466,8 @@ run_json_general_test (const json_test_config_t *config)
14351466
const bson_t *scenario = config->scenario;
14361467
bson_iter_t scenario_iter;
14371468
bson_iter_t tests_iter;
1438-
const char *db_name;
1439-
const char *collection_name;
1469+
const char *db_name, *db2_name;
1470+
const char *collection_name, *collection2_name;
14401471

14411472
ASSERT (scenario);
14421473

@@ -1451,6 +1482,15 @@ run_json_general_test (const json_test_config_t *config)
14511482
? bson_lookup_utf8 (scenario, "collection_name")
14521483
: "test";
14531484

1485+
/* database2_name/collection2_name are optional. */
1486+
db2_name = bson_has_field (scenario, "database2_name")
1487+
? bson_lookup_utf8 (scenario, "database2_name")
1488+
: NULL;
1489+
collection2_name = bson_has_field (scenario, "collection2_name")
1490+
? bson_lookup_utf8 (scenario, "collection2_name")
1491+
: NULL;
1492+
1493+
14541494
ASSERT (bson_iter_init_find (&scenario_iter, scenario, "tests"));
14551495
ASSERT (BSON_ITER_HOLDS_ARRAY (&scenario_iter));
14561496
ASSERT (bson_iter_recurse (&scenario_iter, &tests_iter));
@@ -1557,6 +1597,9 @@ run_json_general_test (const json_test_config_t *config)
15571597
}
15581598

15591599
set_auto_encryption_opts (client, &test);
1600+
/* Drop and recreate test database/collection if necessary. */
1601+
_recreate (client, db_name, collection_name);
1602+
_recreate (client, db2_name, collection2_name);
15601603
insert_data (db_name, collection_name, scenario);
15611604

15621605
db = mongoc_client_get_database (client, db_name);

src/libmongoc/tests/json/change_streams/change-streams-errors.json

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,90 @@
7171
"code": 40324
7272
}
7373
}
74+
},
75+
{
76+
"description": "Change Stream should error when _id is projected out",
77+
"minServerVersion": "4.1.11",
78+
"target": "collection",
79+
"topology": [
80+
"replicaset",
81+
"sharded"
82+
],
83+
"changeStreamPipeline": [
84+
{
85+
"$project": {
86+
"_id": 0
87+
}
88+
}
89+
],
90+
"changeStreamOptions": {},
91+
"operations": [
92+
{
93+
"database": "change-stream-tests",
94+
"collection": "test",
95+
"name": "insertOne",
96+
"arguments": {
97+
"document": {
98+
"z": 3
99+
}
100+
}
101+
}
102+
],
103+
"result": {
104+
"error": {
105+
"code": 280,
106+
"errorLabels": [
107+
"NonResumableChangeStreamError"
108+
]
109+
}
110+
}
111+
},
112+
{
113+
"description": "change stream errors on MaxTimeMSExpired",
114+
"minServerVersion": "4.2",
115+
"failPoint": {
116+
"configureFailPoint": "failCommand",
117+
"mode": {
118+
"times": 1
119+
},
120+
"data": {
121+
"failCommands": [
122+
"getMore"
123+
],
124+
"errorCode": 50,
125+
"closeConnection": false
126+
}
127+
},
128+
"target": "collection",
129+
"topology": [
130+
"replicaset",
131+
"sharded"
132+
],
133+
"changeStreamPipeline": [
134+
{
135+
"$project": {
136+
"_id": 0
137+
}
138+
}
139+
],
140+
"changeStreamOptions": {},
141+
"operations": [
142+
{
143+
"database": "change-stream-tests",
144+
"collection": "test",
145+
"name": "insertOne",
146+
"arguments": {
147+
"document": {
148+
"z": 3
149+
}
150+
}
151+
}
152+
],
153+
"result": {
154+
"error": {
155+
"code": 50
156+
}
157+
}
74158
}
75159
]
76160
}

0 commit comments

Comments
 (0)