1- import { type Document , Long , ObjectId , Timestamp } from 'bson' ;
21import { expect } from 'chai' ;
32import { once } from 'events' ;
43import * as sinon from 'sinon' ;
54import { setTimeout } from 'timers' ;
65
7- import { type ChangeStream } from '../../../src/change_stream' ;
86import {
7+ type ChangeStream ,
8+ type Collection ,
99 type CommandFailedEvent ,
1010 type CommandStartedEvent ,
11- type CommandSucceededEvent
12- } from '../../../src/cmap/command_monitoring_events' ;
13- import { type Collection } from '../../../src/collection' ;
14- import { LEGACY_HELLO_COMMAND } from '../../../src/constants' ;
15- import { MongoNetworkError } from '../../../src/error' ;
16- import { type MongoClient } from '../../../src/mongo_client' ;
11+ type CommandSucceededEvent ,
12+ type Document ,
13+ LEGACY_HELLO_COMMAND ,
14+ Long ,
15+ type MongoClient ,
16+ MongoNetworkError ,
17+ ObjectId ,
18+ Timestamp
19+ } from '../../mongodb' ;
1720import * as mock from '../../tools/mongodb-mock/index' ;
1821import { setupDatabase } from '../shared' ;
1922
@@ -636,9 +639,6 @@ describe('Change Stream prose tests', function () {
636639 let events = [ ] ;
637640 client . on ( 'commandStarted' , e => recordEvent ( events , e ) ) ;
638641 const changeStream = coll . watch ( [ ] , { startAfter } ) ;
639- changeStream . on ( 'error' , async ( ) => {
640- await changeStream . close ( ) ;
641- } ) ;
642642
643643 changeStream . on ( 'change' , change => {
644644 events . push ( { change : { insert : { x : change . fullDocument . x } } } ) ;
@@ -648,18 +648,20 @@ describe('Change Stream prose tests', function () {
648648 events = [ ] ;
649649 triggerResumableError ( changeStream , ( ) => events . push ( 'error' ) ) ;
650650 break ;
651- case 3 :
652- expect ( events ) . to . be . an ( 'array' ) . with . lengthOf ( 3 ) ;
653- expect ( events [ 0 ] ) . to . equal ( 'error' ) ;
654- expect ( events [ 1 ] ) . nested . property ( '$changeStream.resumeAfter' ) . to . exist ;
655- expect ( events [ 2 ] ) . to . eql ( { change : { insert : { x : 3 } } } ) ;
656- break ;
657651 }
658652 } ) ;
659653
660654 await once ( changeStream . cursor , 'init' ) ;
661655 await coll . insertOne ( { x : 2 } , { writeConcern : { w : 'majority' , j : true } } ) ;
662656 await coll . insertOne ( { x : 3 } , { writeConcern : { w : 'majority' , j : true } } ) ;
657+ await once ( changeStream , 'change' ) ;
658+
659+ expect ( events ) . to . be . an ( 'array' ) . with . lengthOf ( 3 ) ;
660+ expect ( events [ 0 ] ) . to . equal ( 'error' ) ;
661+ expect ( events [ 1 ] ) . nested . property ( '$changeStream.resumeAfter' ) . to . exist ;
662+ expect ( events [ 2 ] ) . to . eql ( { change : { insert : { x : 3 } } } ) ;
663+
664+ await changeStream . close ( ) ;
663665 }
664666 } ) ;
665667 } ) ;
0 commit comments