Skip to content

Commit 3a4e920

Browse files
committed
Test that ChangeStream::next() resumes after dropped connection
The original intention of this test was to verify resumability in the event of a dropped connection; however, since socketTimeoutMS and maxAwaitTimeMS are fixed values, we cannot test a dropped connection followed by a successful resume. Instead, we will test that the driver attempts to resume once and only once before we expect a socket timeout after the second attempt. Note: ChangeStream::rewind() does not currently resume (see: PHPLIB-322)
1 parent 8acab3b commit 3a4e920

File tree

2 files changed

+69
-22
lines changed

2 files changed

+69
-22
lines changed

tests/CommandObserver.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use MongoDB\Driver\Monitoring\CommandStartedEvent;
77
use MongoDB\Driver\Monitoring\CommandSucceededEvent;
88
use MongoDB\Driver\Monitoring\CommandSubscriber;
9+
use Exception;
910

1011
/**
1112
* Observes command documents using the driver's monitoring API.
@@ -20,13 +21,19 @@ public function observe(callable $execution, callable $commandCallback)
2021

2122
\MongoDB\Driver\Monitoring\addSubscriber($this);
2223

23-
call_user_func($execution);
24+
try {
25+
call_user_func($execution);
26+
} catch (Exception $executionException) {}
2427

2528
\MongoDB\Driver\Monitoring\removeSubscriber($this);
2629

2730
foreach ($this->commands as $command) {
2831
call_user_func($commandCallback, $command);
2932
}
33+
34+
if (isset($executionException)) {
35+
throw $executionException;
36+
}
3037
}
3138

3239
public function commandStarted(CommandStartedEvent $event)

tests/Operation/WatchFunctionalTest.php

Lines changed: 61 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,15 @@
33
namespace MongoDB\Tests\Operation;
44

55
use MongoDB\Client;
6+
use MongoDB\Driver\Manager;
7+
use MongoDB\Driver\ReadPreference;
68
use MongoDB\Driver\Server;
9+
use MongoDB\Driver\Exception\ConnectionTimeoutException;
710
use MongoDB\Operation\DatabaseCommand;
811
use MongoDB\Operation\InsertOne;
912
use MongoDB\Operation\Watch;
13+
use MongoDB\Tests\CommandObserver;
14+
use stdClass;
1015

1116
class WatchFunctionalTest extends FunctionalTestCase
1217
{
@@ -61,6 +66,62 @@ public function testResume()
6166
$this->assertEquals($expectedResult, $changeStream->current());
6267
}
6368

69+
/**
70+
* @todo test that rewind() also resumes once PHPLIB-322 is implemented
71+
*/
72+
public function testNextResumesAfterConnectionException()
73+
{
74+
/* In order to trigger a dropped connection, we'll use a new client with
75+
* a socket timeout that is less than the change stream's maxAwaitTimeMS
76+
* option. */
77+
$manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]);
78+
$primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
79+
80+
$operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
81+
$changeStream = $operation->execute($primaryServer);
82+
83+
/* Note: we intentionally do not start iteration with rewind() to ensure
84+
* that we test resume functionality within next(). */
85+
86+
$commands = [];
87+
88+
try {
89+
(new CommandObserver)->observe(
90+
function() use ($changeStream) {
91+
$changeStream->next();
92+
},
93+
function(stdClass $command) use (&$commands) {
94+
$commands[] = key((array) $command);
95+
}
96+
);
97+
$this->fail('ConnectionTimeoutException was not thrown');
98+
} catch (ConnectionTimeoutException $e) {}
99+
100+
$expectedCommands = [
101+
/* The initial aggregate command for change streams returns a cursor
102+
* envelope with an empty initial batch, since there are no changes
103+
* to report at the moment the change stream is created. Therefore,
104+
* we expect a getMore to be issued when we first advance the change
105+
* stream (with either rewind() or next()). */
106+
'getMore',
107+
/* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
108+
* getMore command encounters a client socket timeout and leaves the
109+
* cursor open on the server. ChangeStream should catch this error
110+
* and resume by issuing a new aggregate command. */
111+
'aggregate',
112+
/* When ChangeStream resumes, it overwrites its original cursor with
113+
* the new cursor resulting from the last aggregate command. This
114+
* removes the last reference to the old cursor, which causes the
115+
* driver to kill it (via mongoc_cursor_destroy()). */
116+
'killCursors',
117+
/* Finally, ChangeStream will rewind the new cursor as the last step
118+
* of the resume process. This results in one last getMore. */
119+
'getMore',
120+
];
121+
122+
$this->assertSame($expectedCommands, $commands);
123+
}
124+
64125
public function testNoChangeAfterResumeBeforeInsert()
65126
{
66127
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
@@ -197,27 +258,6 @@ public function testFailureAfterResumeTokenRemoved()
197258
$changeStream->next();
198259
}
199260

200-
public function testConnectionException()
201-
{
202-
$client = new Client($this->getUri(), ['socketTimeoutMS' => 1005], []);
203-
$collection = $client->selectCollection($this->getDatabaseName(), $this->getCollectionName());
204-
205-
$changeStream = $collection->watch();
206-
$changeStream->next();
207-
208-
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
209-
210-
$changeStream->next();
211-
$expectedResult = (object) ([
212-
'_id' => $changeStream->current()->_id,
213-
'operationType' => 'insert',
214-
'fullDocument' => (object) ['_id' => 1, 'x' => 'foo'],
215-
'ns' => (object) ['db' => 'phplib_test', 'coll' => 'WatchFunctionalTest.226d95f1'],
216-
'documentKey' => (object) ['_id' => 1]
217-
]);
218-
$this->assertEquals($changeStream->current(), $expectedResult);
219-
}
220-
221261
public function testMaxAwaitTimeMS()
222262
{
223263
/* On average, an acknowledged write takes about 20 ms to appear in a

0 commit comments

Comments
 (0)