Skip to content

Commit 971545c

Browse files
committed
Merge pull request #479
2 parents de2e331 + 42bf825 commit 971545c

File tree

2 files changed

+133
-7
lines changed

2 files changed

+133
-7
lines changed

src/ChangeStream.php

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,26 @@ public function next()
117117
*/
118118
public function rewind()
119119
{
120-
$this->csIt->rewind();
120+
$resumable = false;
121+
try {
122+
$this->csIt->rewind();
123+
if ($this->valid()) {
124+
$this->extractResumeToken($this->csIt->current());
125+
}
126+
} catch (RuntimeException $e) {
127+
if (strpos($e->getMessage(), "not master") !== false) {
128+
$resumable = true;
129+
}
130+
if ($e->getCode() === self::CURSOR_NOT_FOUND) {
131+
$resumable = true;
132+
}
133+
if ($e instanceof ConnectionTimeoutException) {
134+
$resumable = true;
135+
}
136+
}
137+
if ($resumable) {
138+
$this->resume();
139+
}
121140
}
122141

123142
/**

tests/Operation/WatchFunctionalTest.php

Lines changed: 113 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,6 @@ public function testNextResumesAfterCursorNotFound()
7373
$this->assertSameDocument($expectedResult, $changeStream->current());
7474
}
7575

76-
/**
77-
* @todo test that rewind() also resumes once PHPLIB-322 is implemented
78-
*/
7976
public function testNextResumesAfterConnectionException()
8077
{
8178
/* In order to trigger a dropped connection, we'll use a new client with
@@ -129,6 +126,56 @@ function(stdClass $command) use (&$commands) {
129126
$this->assertSame($expectedCommands, $commands);
130127
}
131128

129+
public function testRewindResumesAfterConnectionException()
130+
{
131+
/* In order to trigger a dropped connection, we'll use a new client with
132+
* a socket timeout that is less than the change stream's maxAwaitTimeMS
133+
* option. */
134+
$manager = new Manager($this->getUri(), ['socketTimeoutMS' => 50]);
135+
$primaryServer = $manager->selectServer(new ReadPreference(ReadPreference::RP_PRIMARY));
136+
137+
$operation = new Watch($manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
138+
$changeStream = $operation->execute($primaryServer);
139+
140+
$commands = [];
141+
142+
try {
143+
(new CommandObserver)->observe(
144+
function() use ($changeStream) {
145+
$changeStream->rewind();
146+
},
147+
function(stdClass $command) use (&$commands) {
148+
$commands[] = key((array) $command);
149+
}
150+
);
151+
$this->fail('ConnectionTimeoutException was not thrown');
152+
} catch (ConnectionTimeoutException $e) {}
153+
154+
$expectedCommands = [
155+
/* The initial aggregate command for change streams returns a cursor
156+
* envelope with an empty initial batch, since there are no changes
157+
* to report at the moment the change stream is created. Therefore,
158+
* we expect a getMore to be issued when we first advance the change
159+
* stream (with either rewind() or next()). */
160+
'getMore',
161+
/* Since socketTimeoutMS is less than maxAwaitTimeMS, the previous
162+
* getMore command encounters a client socket timeout and leaves the
163+
* cursor open on the server. ChangeStream should catch this error
164+
* and resume by issuing a new aggregate command. */
165+
'aggregate',
166+
/* When ChangeStream resumes, it overwrites its original cursor with
167+
* the new cursor resulting from the last aggregate command. This
168+
* removes the last reference to the old cursor, which causes the
169+
* driver to kill it (via mongoc_cursor_destroy()). */
170+
'killCursors',
171+
/* Finally, ChangeStream will rewind the new cursor as the last step
172+
* of the resume process. This results in one last getMore. */
173+
'getMore',
174+
];
175+
176+
$this->assertSame($expectedCommands, $commands);
177+
}
178+
132179
public function testNoChangeAfterResumeBeforeInsert()
133180
{
134181
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
@@ -260,7 +307,6 @@ public function testInitialCursorIsNotClosed()
260307

261308
/**
262309
* @expectedException MongoDB\Exception\ResumeTokenException
263-
* @todo test that rewind() also attempts to extract the resume token once PHPLIB-322 is implemented
264310
*/
265311
public function testNextCannotExtractResumeToken()
266312
{
@@ -269,13 +315,28 @@ public function testNextCannotExtractResumeToken()
269315
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
270316
$changeStream = $operation->execute($this->getPrimaryServer());
271317

272-
$changeStream->rewind();
273-
318+
/* Note: we intentionally do not start iteration with rewind() to ensure
319+
* that we test extraction functionality within next(). */
274320
$this->insertDocument(['x' => 1]);
275321

276322
$changeStream->next();
277323
}
278324

325+
/**
326+
* @expectedException MongoDB\Exception\ResumeTokenException
327+
*/
328+
public function testRewindCannotExtractResumeToken()
329+
{
330+
$pipeline = [['$project' => ['_id' => 0 ]]];
331+
332+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), $pipeline, ['maxAwaitTimeMS' => 100]);
333+
$changeStream = $operation->execute($this->getPrimaryServer());
334+
335+
$this->insertDocument(['x' => 1]);
336+
337+
$changeStream->rewind();
338+
}
339+
279340
public function testMaxAwaitTimeMS()
280341
{
281342
/* On average, an acknowledged write takes about 20 ms to appear in a
@@ -320,6 +381,52 @@ public function testMaxAwaitTimeMS()
320381
$this->assertTrue($changeStream->valid());
321382
}
322383

384+
public function testRewindResumesAfterCursorNotFound()
385+
{
386+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
387+
$changeStream = $operation->execute($this->getPrimaryServer());
388+
389+
$this->killChangeStreamCursor($changeStream);
390+
391+
$changeStream->rewind();
392+
$this->assertFalse($changeStream->valid());
393+
$this->assertNull($changeStream->current());
394+
}
395+
396+
public function testRewindExtractsResumeTokenAndNextResumes()
397+
{
398+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], ['maxAwaitTimeMS' => 100]);
399+
$changeStream = $operation->execute($this->getPrimaryServer());
400+
401+
$this->insertDocument(['_id' => 1, 'x' => 'foo']);
402+
$this->insertDocument(['_id' => 2, 'x' => 'bar']);
403+
404+
$changeStream->rewind();
405+
$this->assertTrue($changeStream->valid());
406+
$expectedResult = [
407+
'_id' => $changeStream->current()->_id,
408+
'operationType' => 'insert',
409+
'fullDocument' => ['_id' => 1, 'x' => 'foo'],
410+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
411+
'documentKey' => ['_id' => 1],
412+
];
413+
$this->assertSameDocument($expectedResult, $changeStream->current());
414+
415+
$this->killChangeStreamCursor($changeStream);
416+
417+
$changeStream->next();
418+
$this->assertTrue($changeStream->valid());
419+
420+
$expectedResult = [
421+
'_id' => $changeStream->current()->_id,
422+
'operationType' => 'insert',
423+
'fullDocument' => ['_id' => 2, 'x' => 'bar'],
424+
'ns' => ['db' => $this->getDatabaseName(), 'coll' => $this->getCollectionName()],
425+
'documentKey' => ['_id' => 2],
426+
];
427+
$this->assertSameDocument($expectedResult, $changeStream->current());
428+
}
429+
323430
private function insertDocument($document)
324431
{
325432
$insertOne = new InsertOne($this->getDatabaseName(), $this->getCollectionName(), $document);

0 commit comments

Comments
 (0)