Skip to content

Commit 5a18c3c

Browse files
committed
CDRIVER-2619 fix change_stream_next bugs
1 parent 9428ea2 commit 5a18c3c

File tree

2 files changed

+171
-75
lines changed

2 files changed

+171
-75
lines changed

src/mongoc/mongoc-change-stream.c

Lines changed: 54 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
#include "mongoc-cursor-private.h"
2121
#include "mongoc-collection-private.h"
2222
#include "mongoc-client-session-private.h"
23-
#include "mongoc-rpc-private.h"
2423

2524
#define CHANGE_STREAM_ERR(_str) \
2625
bson_set_error (&stream->err, \
@@ -36,33 +35,24 @@
3635
} \
3736
} while (0);
3837

38+
/* Construct the aggregate command in cmd:
39+
* { aggregate: collname, pipeline: [], cursor: { batchSize: x } } */
3940
static void
40-
_mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
41+
_make_command (mongoc_change_stream_t *stream, bson_t *command)
4142
{
42-
mongoc_client_session_t *cs = NULL;
43+
bson_iter_t iter;
4344
bson_t change_stream_stage; /* { $changeStream: <change_stream_doc> } */
4445
bson_t change_stream_doc;
4546
bson_t pipeline;
4647
bson_t cursor_doc;
47-
bson_t command_opts;
48-
bson_t command; /* { aggregate: "coll", pipeline: [], ... } */
49-
bson_t reply;
50-
bson_iter_t iter;
51-
bson_error_t err = {0};
52-
mongoc_server_description_t *sd;
53-
uint32_t server_id;
54-
55-
BSON_ASSERT (stream);
5648

57-
/* Construct the aggregate command */
58-
/* { aggregate: collname, pipeline: [], cursor: { batchSize: x } } */
59-
bson_init (&command);
60-
bson_append_utf8 (&command,
49+
bson_init (command);
50+
bson_append_utf8 (command,
6151
"aggregate",
6252
9,
6353
stream->coll->collection,
6454
stream->coll->collectionlen);
65-
bson_append_array_begin (&command, "pipeline", 8, &pipeline);
55+
bson_append_array_begin (command, "pipeline", 8, &pipeline);
6656

6757
/* Append the $changeStream stage */
6858
bson_append_document_begin (&pipeline, "0", 1, &change_stream_stage);
@@ -85,36 +75,54 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
8575

8676
bson_iter_recurse (&iter, &child_iter);
8777
while (bson_iter_next (&child_iter)) {
88-
if (BSON_ITER_HOLDS_DOCUMENT (&child_iter)) {
89-
size_t keyLen =
90-
bson_uint32_to_string (key_int, &key_str, buf, sizeof (buf));
91-
bson_append_value (
92-
&pipeline, key_str, (int) keyLen, bson_iter_value (&child_iter));
93-
++key_int;
94-
}
78+
/* The user pipeline may consist of invalid stages or non-documents.
79+
* Append anyway, and rely on the server error. */
80+
size_t keyLen =
81+
bson_uint32_to_string (key_int, &key_str, buf, sizeof (buf));
82+
bson_append_value (
83+
&pipeline, key_str, (int) keyLen, bson_iter_value (&child_iter));
84+
++key_int;
9585
}
9686
}
9787

98-
bson_append_array_end (&command, &pipeline);
88+
bson_append_array_end (command, &pipeline);
9989

10090
/* Add batch size if needed */
101-
bson_append_document_begin (&command, "cursor", 6, &cursor_doc);
91+
bson_append_document_begin (command, "cursor", 6, &cursor_doc);
10292
if (stream->batch_size > 0) {
10393
bson_append_int32 (&cursor_doc, "batchSize", 9, stream->batch_size);
10494
}
105-
bson_append_document_end (&command, &cursor_doc);
95+
bson_append_document_end (command, &cursor_doc);
96+
}
10697

107-
bson_copy_to (&stream->opts, &command_opts);
98+
/* Construct and send the aggregate command and create the resulting cursor.
99+
* Returns false on error, and sets stream->err. On error, stream->cursor
100+
* remains NULL, otherwise it is created and must be destroyed. */
101+
static bool
102+
_make_cursor (mongoc_change_stream_t *stream)
103+
{
104+
mongoc_client_session_t *cs = NULL;
105+
bson_t command_opts;
106+
bson_t command; /* { aggregate: "coll", pipeline: [], ... } */
107+
bson_t reply;
108+
bson_iter_t iter;
109+
mongoc_server_description_t *sd;
110+
uint32_t server_id;
108111

112+
BSON_ASSERT (stream);
113+
BSON_ASSERT (!stream->cursor);
114+
_make_command (stream, &command);
115+
bson_copy_to (&stream->opts, &command_opts);
109116
sd = mongoc_client_select_server (stream->coll->client,
110117
false /* for_writes */,
111118
stream->coll->read_prefs,
112-
&err);
113-
119+
&stream->err);
114120
if (!sd) {
115-
stream->err = err;
116121
goto cleanup;
117122
}
123+
server_id = mongoc_server_description_id (sd);
124+
bson_append_int32 (&command_opts, "serverId", 8, server_id);
125+
mongoc_server_description_destroy (sd);
118126

119127
if (bson_iter_init_find (&iter, &command_opts, "sessionId")) {
120128
if (!_mongoc_client_session_from_iter (
@@ -148,16 +156,12 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
148156
}
149157
}
150158

151-
server_id = mongoc_server_description_id (sd);
152-
bson_append_int32 (&command_opts, "serverId", 8, server_id);
153-
154159
/* use inherited read preference and read concern of the collection */
155160
if (!mongoc_collection_read_command_with_opts (
156-
stream->coll, &command, NULL, &command_opts, &reply, &err)) {
161+
stream->coll, &command, NULL, &command_opts, &reply, &stream->err)) {
157162
bson_destroy (&stream->err_doc);
158163
bson_copy_to (&reply, &stream->err_doc);
159164
bson_destroy (&reply);
160-
stream->err = err;
161165
goto cleanup;
162166
}
163167

@@ -194,7 +198,7 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
194198
cleanup:
195199
bson_destroy (&command);
196200
bson_destroy (&command_opts);
197-
mongoc_server_description_destroy (sd);
201+
return stream->err.code == 0;
198202
}
199203

200204
mongoc_change_stream_t *
@@ -275,7 +279,7 @@ _mongoc_change_stream_new (const mongoc_collection_t *coll,
275279
}
276280

277281
if (stream->err.code == 0) {
278-
_mongoc_change_stream_make_cursor (stream);
282+
(void) _make_cursor (stream);
279283
}
280284

281285
return stream;
@@ -294,6 +298,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
294298
goto end;
295299
}
296300

301+
BSON_ASSERT (stream->cursor);
297302
if (!mongoc_cursor_next (stream->cursor, bson)) {
298303
const bson_t *err_doc;
299304
bson_error_t err;
@@ -330,8 +335,12 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
330335
}
331336

332337
if (resumable) {
338+
/* recreate the cursor */
333339
mongoc_cursor_destroy (stream->cursor);
334-
_mongoc_change_stream_make_cursor (stream);
340+
stream->cursor = NULL;
341+
if (!_make_cursor (stream)) {
342+
goto end;
343+
}
335344
if (!mongoc_cursor_next (stream->cursor, bson)) {
336345
resumable =
337346
!mongoc_cursor_error_document (stream->cursor, &err, &err_doc);
@@ -372,9 +381,12 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
372381
* cursor for use with getMore operations, the session MUST be returned to
373382
* the pool immediately following a getMore operation that indicates that the
374383
* cursor has been exhausted." */
375-
if (stream->implicit_session && stream->cursor->rpc.reply.cursor_id == 0) {
376-
mongoc_client_session_destroy (stream->implicit_session);
377-
stream->implicit_session = NULL;
384+
if (stream->implicit_session) {
385+
/* If creating the change stream cursor errored, it may be null. */
386+
if (!stream->cursor || stream->cursor->rpc.reply.cursor_id == 0) {
387+
mongoc_client_session_destroy (stream->implicit_session);
388+
stream->implicit_session = NULL;
389+
}
378390
}
379391
return ret;
380392
}

0 commit comments

Comments
 (0)