@@ -50,15 +50,15 @@ ChangeStreamProxyStage::ChangeStreamProxyStage(OperationContext* opCtx,
5050 _latestOplogTimestamp = ResumeToken::parse (_postBatchResumeToken).getData ().clusterTime ;
5151}
5252
53- boost::optional<BSONObj > ChangeStreamProxyStage::getNextBson () {
53+ boost::optional<Document > ChangeStreamProxyStage::getNext () {
5454 if (auto next = _pipeline->getNext ()) {
5555 // While we have more results to return, we track both the timestamp and the resume token of
5656 // the latest event observed in the oplog, the latter via its sort key metadata field.
57- auto nextBSON = _validateAndConvertToBSON (*next);
57+ _validateResumeToken (*next);
5858 _latestOplogTimestamp = PipelineD::getLatestOplogTimestamp (_pipeline.get ());
5959 _postBatchResumeToken = next->metadata ().getSortKey ();
6060 _setSpeculativeReadTimestamp ();
61- return nextBSON ;
61+ return next ;
6262 }
6363
6464 // We ran out of results to return. Check whether the oplog cursor has moved forward since the
@@ -76,10 +76,10 @@ boost::optional<BSONObj> ChangeStreamProxyStage::getNextBson() {
7676 return boost::none;
7777}
7878
79- BSONObj ChangeStreamProxyStage::_validateAndConvertToBSON (const Document& event) const {
79+ void ChangeStreamProxyStage::_validateResumeToken (const Document& event) const {
8080 // If we are producing output to be merged on mongoS, then no stages can have modified the _id.
8181 if (_includeMetaData) {
82- return event. toBsonWithMetaData () ;
82+ return ;
8383 }
8484 // Confirm that the document _id field matches the original resume token in the sort key field.
8585 auto eventBSON = event.toBson ();
@@ -95,7 +95,6 @@ BSONObj ChangeStreamProxyStage::_validateAndConvertToBSON(const Document& event)
9595 << BSON (" _id" << resumeToken) << " but found: "
9696 << (eventBSON[" _id" ] ? BSON (" _id" << eventBSON[" _id" ]) : BSONObj ()),
9797 idField.binaryEqual (resumeToken));
98- return eventBSON;
9998}
10099
101100void ChangeStreamProxyStage::_setSpeculativeReadTimestamp () {
0 commit comments