Skip to content

Commit 43c2062

Browse files
authored
PYTHON-3093 Change streams support for user-facing PIT pre- and post-images (#972)
1 parent 98d3933 commit 43c2062

File tree

9 files changed

+1076
-54
lines changed

9 files changed

+1076
-54
lines changed

pymongo/change_stream.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def __init__(
9696
session: Optional["ClientSession"],
9797
start_after: Optional[Mapping[str, Any]],
9898
comment: Optional[Any] = None,
99+
full_document_before_change: Optional[str] = None,
99100
) -> None:
100101
if pipeline is None:
101102
pipeline = []
@@ -118,6 +119,7 @@ def __init__(
118119

119120
self._pipeline = copy.deepcopy(pipeline)
120121
self._full_document = full_document
122+
self._full_document_before_change = full_document_before_change
121123
self._uses_start_after = start_after is not None
122124
self._uses_resume_after = resume_after is not None
123125
self._resume_token = copy.deepcopy(start_after or resume_after)
@@ -147,6 +149,9 @@ def _change_stream_options(self):
147149
if self._full_document is not None:
148150
options["fullDocument"] = self._full_document
149151

152+
if self._full_document_before_change is not None:
153+
options["fullDocumentBeforeChange"] = self._full_document_before_change
154+
150155
resume_token = self.resume_token
151156
if resume_token is not None:
152157
if self._uses_start_after:

pymongo/collection.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2501,6 +2501,7 @@ def watch(
25012501
session: Optional["ClientSession"] = None,
25022502
start_after: Optional[Mapping[str, Any]] = None,
25032503
comment: Optional[Any] = None,
2504+
full_document_before_change: Optional[str] = None,
25042505
) -> CollectionChangeStream[_DocumentType]:
25052506
"""Watch changes on this collection.
25062507
@@ -2559,6 +2560,8 @@ def watch(
25592560
updates will include both a delta describing the changes to the
25602561
document, as well as a copy of the entire document that was
25612562
changed from some time after the change occurred.
2563+
- `full_document_before_change`: Allowed values: `whenAvailable` and `required`. Change events
2564+
may now result in a `fullDocumentBeforeChange` response field.
25622565
- `resume_after` (optional): A resume token. If provided, the
25632566
change stream will start returning changes that occur directly
25642567
after the operation specified in the resume token. A resume token
@@ -2585,6 +2588,8 @@ def watch(
25852588
:Returns:
25862589
A :class:`~pymongo.change_stream.CollectionChangeStream` cursor.
25872590
2591+
.. versionchanged:: 4.2
2592+
Added ``full_document_before_change`` parameter.
25882593
25892594
.. versionchanged:: 4.1
25902595
Added ``comment`` parameter.
@@ -2613,7 +2618,8 @@ def watch(
26132618
start_at_operation_time,
26142619
session,
26152620
start_after,
2616-
comment=comment,
2621+
comment,
2622+
full_document_before_change,
26172623
)
26182624

26192625
def rename(

pymongo/database.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,6 +389,8 @@ def create_collection(
389389
- ``pipeline`` (list): a list of aggregation pipeline stages
390390
- ``comment`` (str): a user-provided comment to attach to this command.
391391
This option is only supported on MongoDB >= 4.4.
392+
- ``changeStreamPreAndPostImages`` (dict): a document with a boolean field ``enabled`` for
393+
enabling pre- and post-images.
392394
393395
.. versionchanged:: 4.2
394396
Added ``encrypted_fields`` parameter.
@@ -530,6 +532,7 @@ def watch(
530532
session: Optional["ClientSession"] = None,
531533
start_after: Optional[Mapping[str, Any]] = None,
532534
comment: Optional[Any] = None,
535+
full_document_before_change: Optional[str] = None,
533536
) -> DatabaseChangeStream[_DocumentType]:
534537
"""Watch changes on this database.
535538
@@ -576,11 +579,13 @@ def watch(
576579
pipeline stages are valid after a ``$changeStream`` stage, see the
577580
MongoDB documentation on change streams for the supported stages.
578581
- `full_document` (optional): The fullDocument to pass as an option
579-
to the ``$changeStream`` stage. Allowed values: 'updateLookup'.
582+
to the ``$changeStream`` stage. Allowed values: 'updateLookup', 'whenAvailable', 'required'.
580583
When set to 'updateLookup', the change notification for partial
581584
updates will include both a delta describing the changes to the
582585
document, as well as a copy of the entire document that was
583586
changed from some time after the change occurred.
587+
- `full_document_before_change`: Allowed values: `whenAvailable` and `required`. Change events
588+
may now result in a `fullDocumentBeforeChange` response field.
584589
- `resume_after` (optional): A resume token. If provided, the
585590
change stream will start returning changes that occur directly
586591
after the operation specified in the resume token. A resume token
@@ -607,6 +612,9 @@ def watch(
607612
:Returns:
608613
A :class:`~pymongo.change_stream.DatabaseChangeStream` cursor.
609614
615+
.. versionchanged:: 4.2
616+
Added ``full_document_before_change`` parameter.
617+
610618
.. versionchanged:: 4.1
611619
Added ``comment`` parameter.
612620
@@ -631,7 +639,8 @@ def watch(
631639
start_at_operation_time,
632640
session,
633641
start_after,
634-
comment=comment,
642+
comment,
643+
full_document_before_change,
635644
)
636645

637646
def _command(

pymongo/mongo_client.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,7 @@ def watch(
871871
session: Optional[client_session.ClientSession] = None,
872872
start_after: Optional[Mapping[str, Any]] = None,
873873
comment: Optional[Any] = None,
874+
full_document_before_change: Optional[str] = None,
874875
) -> ChangeStream[_DocumentType]:
875876
"""Watch changes on this cluster.
876877
@@ -922,6 +923,8 @@ def watch(
922923
updates will include both a delta describing the changes to the
923924
document, as well as a copy of the entire document that was
924925
changed from some time after the change occurred.
926+
- `full_document_before_change`: Allowed values: `whenAvailable` and `required`. Change events
927+
may now result in a `fullDocumentBeforeChange` response field.
925928
- `resume_after` (optional): A resume token. If provided, the
926929
change stream will start returning changes that occur directly
927930
after the operation specified in the resume token. A resume token
@@ -948,6 +951,9 @@ def watch(
948951
:Returns:
949952
A :class:`~pymongo.change_stream.ClusterChangeStream` cursor.
950953
954+
.. versionchanged:: 4.2
955+
Added ``full_document_before_change`` parameter.
956+
951957
.. versionchanged:: 4.1
952958
Added ``comment`` parameter.
953959
@@ -972,7 +978,8 @@ def watch(
972978
start_at_operation_time,
973979
session,
974980
start_after,
975-
comment=comment,
981+
comment,
982+
full_document_before_change,
976983
)
977984

978985
@property

0 commit comments

Comments
 (0)