Skip to content

Commit 5415c88

Browse files
committed
CDRIVER-2578 changestream getmore and agg lsid
collection_watch runs an aggregate command with collection_read_command_with_opts and then creates a cursor with cursor_new_from_command_reply. To ensure they use the same implicit session lsid, the change_stream_t must be responsible for creating the implicit session and pass this session to both the command and cursor as if it were an explicit session.
1 parent 4df2aa5 commit 5415c88

File tree

3 files changed

+110
-12
lines changed

3 files changed

+110
-12
lines changed

src/mongoc/mongoc-change-stream-private.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#define MONGOC_CHANGE_STREAM_PRIVATE_H
1919

2020
#include "mongoc-change-stream.h"
21+
#include "mongoc-client-session.h"
2122
#include "mongoc-collection.h"
2223
#include "mongoc-cursor.h"
2324

@@ -34,6 +35,8 @@ struct _mongoc_change_stream_t {
3435
mongoc_collection_t *coll;
3536
int64_t max_await_time_ms;
3637
int32_t batch_size;
38+
39+
mongoc_client_session_t *implicit_session;
3740
};
3841

3942
mongoc_change_stream_t *

src/mongoc/mongoc-change-stream.c

Lines changed: 51 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "mongoc-cursor-private.h"
2121
#include "mongoc-collection-private.h"
2222
#include "mongoc-client-session-private.h"
23+
#include "mongoc-rpc-private.h"
2324

2425
#define CHANGE_STREAM_ERR(_str) \
2526
bson_set_error (&stream->err, \
@@ -88,7 +89,7 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
8889
size_t keyLen =
8990
bson_uint32_to_string (key_int, &key_str, buf, sizeof (buf));
9091
bson_append_value (
91-
&pipeline, key_str, keyLen, bson_iter_value (&child_iter));
92+
&pipeline, key_str, (int) keyLen, bson_iter_value (&child_iter));
9293
++key_int;
9394
}
9495
}
@@ -120,6 +121,31 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
120121
stream->coll->client, &iter, &cs, &stream->err)) {
121122
goto cleanup;
122123
}
124+
} else if (stream->implicit_session) {
125+
/* If an implicit session was created before, and this cursor is now
126+
* being recreated after resuming, then use the same session as before. */
127+
cs = stream->implicit_session;
128+
if (!mongoc_client_session_append (cs, &command_opts, &stream->err)) {
129+
goto cleanup;
130+
}
131+
} else {
132+
/* Create an implicit session. This session lsid must be the same for the
133+
* agg command and the subsequent getMores. Thus, this implicit session is
134+
* passed as if it were an explicit session to
135+
* collection_read_command_with_opts and cursor_new_from_reply, but it is
136+
* still implicit and its lifetime is owned by this change_stream_t. */
137+
mongoc_session_opt_t *session_opts;
138+
session_opts = mongoc_session_opts_new ();
139+
mongoc_session_opts_set_causal_consistency (session_opts, false);
140+
/* returns NULL if sessions aren't supported. ignore errors. */
141+
cs =
142+
mongoc_client_start_session (stream->coll->client, session_opts, NULL);
143+
stream->implicit_session = cs;
144+
mongoc_session_opts_destroy (session_opts);
145+
if (cs &&
146+
!mongoc_client_session_append (cs, &command_opts, &stream->err)) {
147+
goto cleanup;
148+
}
123149
}
124150

125151
server_id = mongoc_server_description_id (sd);
@@ -161,7 +187,8 @@ _mongoc_change_stream_make_cursor (mongoc_change_stream_t *stream)
161187
}
162188

163189
if (stream->batch_size > 0) {
164-
mongoc_cursor_set_batch_size (stream->cursor, stream->batch_size);
190+
mongoc_cursor_set_batch_size (stream->cursor,
191+
(uint32_t) stream->batch_size);
165192
}
166193

167194
cleanup:
@@ -177,7 +204,7 @@ _mongoc_change_stream_new (const mongoc_collection_t *coll,
177204
{
178205
bool full_doc_set = false;
179206
mongoc_change_stream_t *stream =
180-
(mongoc_change_stream_t *) bson_malloc (sizeof (mongoc_change_stream_t));
207+
(mongoc_change_stream_t *) bson_malloc0 (sizeof (mongoc_change_stream_t));
181208

182209
BSON_ASSERT (coll);
183210
BSON_ASSERT (pipeline);
@@ -190,8 +217,6 @@ _mongoc_change_stream_new (const mongoc_collection_t *coll,
190217
bson_init (&stream->opts);
191218
bson_init (&stream->resume_token);
192219
bson_init (&stream->err_doc);
193-
memset (&stream->err, 0, sizeof (bson_error_t));
194-
stream->cursor = NULL;
195220

196221
/*
197222
* The passed options may consist of:
@@ -260,12 +285,13 @@ bool
260285
mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
261286
{
262287
bson_iter_t iter;
288+
bool ret = false;
263289

264290
BSON_ASSERT (stream);
265291
BSON_ASSERT (bson);
266292

267293
if (stream->err.code != 0) {
268-
return false;
294+
goto end;
269295
}
270296

271297
if (!mongoc_cursor_next (stream->cursor, bson)) {
@@ -275,7 +301,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
275301

276302
if (!mongoc_cursor_error_document (stream->cursor, &err, &err_doc)) {
277303
/* No error occurred, just no documents left */
278-
return false;
304+
goto end;
279305
}
280306

281307
/* Change Streams Spec: An error is resumable if it is not a server error,
@@ -311,7 +337,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
311337
!mongoc_cursor_error_document (stream->cursor, &err, &err_doc);
312338
if (resumable) {
313339
/* Empty batch. */
314-
return false;
340+
goto end;
315341
}
316342
}
317343
}
@@ -320,7 +346,7 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
320346
stream->err = err;
321347
bson_destroy (&stream->err_doc);
322348
bson_copy_to (err_doc, &stream->err_doc);
323-
return false;
349+
goto end;
324350
}
325351
}
326352

@@ -332,14 +358,25 @@ mongoc_change_stream_next (mongoc_change_stream_t *stream, const bson_t **bson)
332358
MONGOC_ERROR_CHANGE_STREAM_NO_RESUME_TOKEN,
333359
"Cannot provide resume functionality when the resume "
334360
"token is missing");
335-
return false;
361+
goto end;
336362
}
337363

338364
/* Copy the resume token */
339365
bson_reinit (&stream->resume_token);
340366
BSON_APPEND_VALUE (
341367
&stream->resume_token, "resumeAfter", bson_iter_value (&iter));
342-
return true;
368+
ret = true;
369+
370+
end:
371+
/* Driver Sessions Spec: "When an implicit session is associated with a
372+
* cursor for use with getMore operations, the session MUST be returned to
373+
* the pool immediately following a getMore operation that indicates that the
374+
* cursor has been exhausted." */
375+
if (stream->implicit_session && stream->cursor->rpc.reply.cursor_id == 0) {
376+
mongoc_client_session_destroy (stream->implicit_session);
377+
stream->implicit_session = NULL;
378+
}
379+
return ret;
343380
}
344381

345382
bool
@@ -373,6 +410,9 @@ mongoc_change_stream_destroy (mongoc_change_stream_t *stream)
373410
if (stream->cursor) {
374411
mongoc_cursor_destroy (stream->cursor);
375412
}
413+
if (stream->implicit_session) {
414+
mongoc_client_session_destroy (stream->implicit_session);
415+
}
376416
mongoc_collection_destroy (stream->coll);
377417
bson_free (stream);
378418
}

tests/test-mongoc-client-session.c

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
#include "mongoc.h"
22
#include "mongoc-cursor-private.h"
33
#include "mongoc-util-private.h"
4+
#include "mongoc-change-stream-private.h"
45
#include "mongoc-collection-private.h"
56
#include "utlist.h"
67
#include "TestSuite.h"
@@ -1923,7 +1924,7 @@ test_find_indexes (session_test_t *test)
19231924
const mongoc_server_session_t *_tmp; \
19241925
int _n_sessions; \
19251926
CDL_COUNT ((_topology)->session_pool, _tmp, _n_sessions); \
1926-
ASSERT_CMPINT (_n_sessions, ==, (int) (_expected_size)); \
1927+
ASSERT_CMPINT (_n_sessions, ==, (int) (_expected_size)); \
19271928
} while (0)
19281929

19291930

@@ -1977,6 +1978,53 @@ test_cursor_implicit_session (void *ctx)
19771978
}
19781979

19791980

1981+
static void
1982+
test_change_stream_implicit_session (void *ctx)
1983+
{
1984+
session_test_t *test;
1985+
mongoc_topology_t *topology;
1986+
mongoc_client_session_t *cs;
1987+
bson_error_t error;
1988+
mongoc_change_stream_t *change_stream;
1989+
bson_t pipeline = BSON_INITIALIZER;
1990+
const bson_t *doc;
1991+
bson_t aggregate_lsid;
1992+
1993+
test = session_test_new (CORRECT_CLIENT, NOT_CAUSAL);
1994+
test->expect_explicit_lsid = false;
1995+
topology = test->client->topology;
1996+
cs = mongoc_client_start_session (test->client, NULL, &error);
1997+
ASSERT_OR_PRINT (cs, error);
1998+
change_stream =
1999+
mongoc_collection_watch (test->session_collection, &pipeline, NULL);
2000+
bson_destroy (&pipeline);
2001+
bson_copy_to (&test->sent_lsid, &aggregate_lsid);
2002+
ASSERT_POOL_SIZE (topology, 0);
2003+
BSON_ASSERT (change_stream->implicit_session);
2004+
2005+
/* push a new server session into the pool */
2006+
mongoc_client_session_destroy (cs);
2007+
ASSERT_POOL_SIZE (topology, 1);
2008+
ASSERT_SESSIONS_DIFFER (&aggregate_lsid, &topology->session_pool->lsid);
2009+
2010+
/* "getMore" uses the same lsid as "aggregate" did */
2011+
bson_reinit (&test->sent_lsid);
2012+
mongoc_change_stream_next (change_stream, &doc);
2013+
ASSERT_SESSIONS_MATCH (
2014+
&test->sent_lsid, &change_stream->implicit_session->server_session->lsid);
2015+
ASSERT_SESSIONS_MATCH (
2016+
&test->sent_lsid,
2017+
&change_stream->cursor->client_session->server_session->lsid);
2018+
ASSERT_SESSIONS_MATCH (&test->sent_lsid, &aggregate_lsid);
2019+
ASSERT_OR_PRINT (
2020+
!mongoc_change_stream_error_document (change_stream, &error, NULL),
2021+
error);
2022+
bson_destroy (&aggregate_lsid);
2023+
mongoc_change_stream_destroy (change_stream);
2024+
session_test_destroy (test);
2025+
}
2026+
2027+
19802028
static void
19812029
test_cmd_error (void *ctx)
19822030
{
@@ -2314,6 +2362,13 @@ test_session_install (TestSuite *suite)
23142362
NULL,
23152363
test_framework_skip_if_no_cluster_time,
23162364
test_framework_skip_if_no_crypto);
2365+
TestSuite_AddFull (suite,
2366+
"/Session/change_stream_implicit_session",
2367+
test_change_stream_implicit_session,
2368+
NULL,
2369+
NULL,
2370+
test_framework_skip_if_no_cluster_time,
2371+
test_framework_skip_if_no_crypto);
23172372
TestSuite_AddFull (suite,
23182373
"/Session/cmd_error",
23192374
test_cmd_error,

0 commit comments

Comments
 (0)