Skip to content

Commit 4a511a3

Browse files
committed
CDRIVER-2670 add db and client watch
1 parent 72687f6 commit 4a511a3

15 files changed

+482
-71
lines changed

src/libmongoc/doc/mongoc_change_stream_t.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ Example
3232
:titlesonly:
3333
:maxdepth: 1
3434

35+
mongoc_client_watch
36+
mongoc_database_watch
3537
mongoc_collection_watch
3638
mongoc_change_stream_next
3739
mongoc_change_stream_error_document

src/libmongoc/doc/mongoc_client_t.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,5 +83,6 @@ Example
8383
mongoc_client_set_stream_initiator
8484
mongoc_client_set_write_concern
8585
mongoc_client_start_session
86+
mongoc_client_watch
8687
mongoc_client_write_command_with_opts
8788

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
:man_page: mongoc_client_watch
2+
3+
mongoc_client_watch()
4+
=====================
5+
6+
Synopsis
7+
--------
8+
9+
.. code-block:: c
10+
11+
mongoc_change_stream_t*
12+
mongoc_client_watch (mongoc_client_t *client,
13+
const bson_t *pipeline,
14+
const bson_t *opts);
15+
16+
A helper function to create a change stream. It is preferred to call this
17+
function over using a raw aggregation to create a change stream.
18+
19+
This function uses the read preference and read concern of the client. If
20+
the change stream needs to re-establish connection, the same read preference
21+
will be used. This may happen if the change stream encounters a resumable error.
22+
23+
.. warning::
24+
25+
A change stream is only supported with majority read concern.
26+
27+
Parameters
28+
----------
29+
30+
* ``db``: A :symbol:`mongoc_client_t` specifying the client which the change stream listens to.
31+
* ``pipeline``: A :symbol:`bson:bson_t` representing an aggregation pipeline appended to the change stream. This may be an empty document.
32+
* ``opts``: A :symbol:`bson:bson_t` containing change stream options or ``NULL``.
33+
34+
``opts`` may be ``NULL`` or a document consisting of any subset of the following
35+
parameters:
36+
37+
* ``batchSize`` An ``int32`` representing number of documents requested to be returned on each call to :symbol:`mongoc_change_stream_next`
38+
* ``resumeAfter`` A ``Document`` representing the logical starting point of the change stream. The ``_id`` field of any change received from a change stream can be used here.
39+
* ``startAtOperationTime`` A ``Timestamp``. The change stream only provides changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here.
40+
* ``maxAwaitTimeMS`` An ``int64`` representing the maximum amount of time a call to :symbol:`mongoc_change_stream_next` will block waiting for data
41+
* ``collation`` A `Collation Document <https://docs.mongodb.com/manual/reference/collation/>`_
42+
43+
Returns
44+
-------
45+
A newly allocated :symbol:`mongoc_change_stream_t` which must be freed with
46+
:symbol:`mongoc_change_stream_destroy` when no longer in use. The returned
47+
:symbol:`mongoc_change_stream_t` is never ``NULL``. If there is an error, it can
48+
be retrieved with :symbol:`mongoc_change_stream_error_document`, and subsequent
49+
calls to :symbol:`mongoc_change_stream_next` will return ``false``.
50+
51+
See Also
52+
--------
53+
:doc:`mongoc_database_watch`
54+
55+
:doc:`mongoc_collection_watch`

src/libmongoc/doc/mongoc_collection_watch.rst

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ Parameters
3535
parameters:
3636

3737
* ``batchSize`` An ``int32`` representing number of documents requested to be returned on each call to :symbol:`mongoc_change_stream_next`
38-
* ``resumeAfter`` A ``Document`` representing the starting point of the change stream
38+
* ``resumeAfter`` A ``Document`` representing the logical starting point of the change stream. The ``_id`` field of any change received from a change stream can be used here.
39+
* ``startAtOperationTime`` A ``Timestamp``. The change stream only provides changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here.
3940
* ``maxAwaitTimeMS`` An ``int64`` representing the maximum amount of time a call to :symbol:`mongoc_change_stream_next` will block waiting for data
4041
* ``collation`` A `Collation Document <https://docs.mongodb.com/manual/reference/collation/>`_
4142

@@ -45,4 +46,10 @@ A newly allocated :symbol:`mongoc_change_stream_t` which must be freed with
4546
:symbol:`mongoc_change_stream_destroy` when no longer in use. The returned
4647
:symbol:`mongoc_change_stream_t` is never ``NULL``. If there is an error, it can
4748
be retrieved with :symbol:`mongoc_change_stream_error_document`, and subsequent
48-
calls to :symbol:`mongoc_change_stream_next` will return ``false``.
49+
calls to :symbol:`mongoc_change_stream_next` will return ``false``.
50+
51+
See Also
52+
--------
53+
:doc:`mongoc_client_watch`
54+
55+
:doc:`mongoc_database_watch`

src/libmongoc/doc/mongoc_database_t.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ Read preferences and write concerns are inherited from the parent client. They c
5151
mongoc_database_set_read_concern
5252
mongoc_database_set_read_prefs
5353
mongoc_database_set_write_concern
54+
mongoc_database_watch
5455
mongoc_database_write_command_with_opts
5556

5657
Examples
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
:man_page: mongoc_database_watch
2+
3+
mongoc_database_watch()
4+
=======================
5+
6+
Synopsis
7+
--------
8+
9+
.. code-block:: c
10+
11+
mongoc_change_stream_t*
12+
mongoc_database_watch (const mongoc_database_t *db,
13+
const bson_t *pipeline,
14+
const bson_t *opts);
15+
16+
A helper function to create a change stream. It is preferred to call this
17+
function over using a raw aggregation to create a change stream.
18+
19+
This function uses the read preference and read concern of the database. If
20+
the change stream needs to re-establish connection, the same read preference
21+
will be used. This may happen if the change stream encounters a resumable error.
22+
23+
.. warning::
24+
25+
A change stream is only supported with majority read concern.
26+
27+
Parameters
28+
----------
29+
30+
* ``db``: A :symbol:`mongoc_database_t` specifying the database which the change stream listens to.
31+
* ``pipeline``: A :symbol:`bson:bson_t` representing an aggregation pipeline appended to the change stream. This may be an empty document.
32+
* ``opts``: A :symbol:`bson:bson_t` containing change stream options or ``NULL``.
33+
34+
``opts`` may be ``NULL`` or a document consisting of any subset of the following
35+
parameters:
36+
37+
* ``batchSize`` An ``int32`` representing number of documents requested to be returned on each call to :symbol:`mongoc_change_stream_next`
38+
* ``resumeAfter`` A ``Document`` representing the logical starting point of the change stream. The ``_id`` field of any change received from a change stream can be used here.
39+
* ``startAtOperationTime`` A ``Timestamp``. The change stream only provides changes that occurred at or after the specified timestamp. Any command run against the server will return an operation time that can be used here.
40+
* ``maxAwaitTimeMS`` An ``int64`` representing the maximum amount of time a call to :symbol:`mongoc_change_stream_next` will block waiting for data
41+
* ``collation`` A `Collation Document <https://docs.mongodb.com/manual/reference/collation/>`_
42+
43+
Returns
44+
-------
45+
A newly allocated :symbol:`mongoc_change_stream_t` which must be freed with
46+
:symbol:`mongoc_change_stream_destroy` when no longer in use. The returned
47+
:symbol:`mongoc_change_stream_t` is never ``NULL``. If there is an error, it can
48+
be retrieved with :symbol:`mongoc_change_stream_error_document`, and subsequent
49+
calls to :symbol:`mongoc_change_stream_next` will return ``false``.
50+
51+
See Also
52+
--------
53+
:doc:`mongoc_client_watch`
54+
55+
:doc:`mongoc_collection_watch`

src/libmongoc/src/mongoc/mongoc-change-stream-private.h

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,26 +22,51 @@
2222
#include "mongoc-collection.h"
2323
#include "mongoc-cursor.h"
2424

25+
typedef enum {
26+
MONGOC_CHANGE_STREAM_COLLECTION,
27+
MONGOC_CHANGE_STREAM_DATABASE,
28+
MONGOC_CHANGE_STREAM_CLIENT
29+
} mongoc_change_stream_type_t;
30+
2531
struct _mongoc_change_stream_t {
2632
bson_t pipeline_to_append;
2733
bson_t full_document;
2834
bson_t opts;
29-
bson_t resume_token; /* empty, or has resumeAfter: doc */
35+
bson_t resume_token; /* empty, or has resumeAfter: doc */
36+
bson_t operation_time; /* empty, or has startAtOperationTime: ts */
3037

3138
bson_error_t err;
3239
bson_t err_doc;
3340

3441
mongoc_cursor_t *cursor;
35-
mongoc_collection_t *coll;
42+
43+
mongoc_client_t *client;
44+
mongoc_read_prefs_t *read_prefs;
45+
mongoc_read_concern_t *read_concern;
46+
47+
mongoc_change_stream_type_t change_stream_type;
48+
char db[140];
49+
char coll[140];
50+
3651
int64_t max_await_time_ms;
3752
int32_t batch_size;
3853

3954
mongoc_client_session_t *implicit_session;
4055
};
4156

4257
mongoc_change_stream_t *
43-
_mongoc_change_stream_new (const mongoc_collection_t *coll,
44-
const bson_t *pipeline,
45-
const bson_t *opts);
58+
_mongoc_change_stream_new_from_collection (const mongoc_collection_t *coll,
59+
const bson_t *pipeline,
60+
const bson_t *opts);
61+
62+
mongoc_change_stream_t *
63+
_mongoc_change_stream_new_from_database (const mongoc_database_t *db,
64+
const bson_t *pipeline,
65+
const bson_t *opts);
66+
67+
mongoc_change_stream_t *
68+
_mongoc_change_stream_new_from_client (mongoc_client_t *client,
69+
const bson_t *pipeline,
70+
const bson_t *opts);
4671

4772
#endif /* MONGOC_CHANGE_STREAM_PRIVATE_H */

0 commit comments

Comments
 (0)