Skip to content

Commit 5042a2f

Browse files
committed
Merge pull request #662
2 parents e421365 + f86f220 commit 5042a2f

File tree

3 files changed

+296
-5
lines changed

3 files changed

+296
-5
lines changed

src/ChangeStream.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ private function onIteration($incrementKey)
220220
*/
221221
private function resume()
222222
{
223-
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken());
223+
$this->iterator = call_user_func($this->resumeCallable, $this->getResumeToken(), $this->hasAdvanced);
224224
$this->iterator->rewind();
225225

226226
$this->onIteration($this->hasAdvanced);

src/Operation/Watch.php

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public function execute(Server $server)
250250
{
251251
return new ChangeStream(
252252
$this->createChangeStreamIterator($server),
253-
function($resumeToken) { return $this->resume($resumeToken); }
253+
function($resumeToken, $hasAdvanced) { return $this->resume($resumeToken, $hasAdvanced); }
254254
);
255255
}
256256

@@ -333,10 +333,11 @@ private function getInitialResumeToken()
333333
*
334334
* @see https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst#resume-process
335335
* @param array|object|null $resumeToken
336+
* @param bool $hasAdvanced
336337
* @return ChangeStreamIterator
337338
* @throws InvalidArgumentException
338339
*/
339-
private function resume($resumeToken = null)
340+
private function resume($resumeToken = null, $hasAdvanced = false)
340341
{
341342
if (isset($resumeToken) && ! is_array($resumeToken) && ! is_object($resumeToken)) {
342343
throw InvalidArgumentException::invalidType('$resumeToken', $resumeToken, 'array or object');
@@ -347,12 +348,14 @@ private function resume($resumeToken = null)
347348
// Select a new server using the original read preference
348349
$server = $this->manager->selectServer($this->aggregateOptions['readPreference']);
349350

351+
$resumeOption = isset($this->changeStreamOptions['startAfter']) && !$hasAdvanced ? 'startAfter' : 'resumeAfter';
352+
350353
unset($this->changeStreamOptions['resumeAfter']);
351354
unset($this->changeStreamOptions['startAfter']);
352355
unset($this->changeStreamOptions['startAtOperationTime']);
353356

354357
if ($resumeToken !== null) {
355-
$this->changeStreamOptions['resumeAfter'] = $resumeToken;
358+
$this->changeStreamOptions[$resumeOption] = $resumeToken;
356359
}
357360

358361
if ($resumeToken === null && $this->operationTime !== null) {

tests/Operation/WatchFunctionalTest.php

Lines changed: 289 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@
22

33
namespace MongoDB\Tests\Operation;
44

5+
use Closure;
56
use MongoDB\ChangeStream;
67
use MongoDB\BSON\TimestampInterface;
78
use MongoDB\Driver\Cursor;
9+
use MongoDB\Driver\Exception\CommandException;
10+
use MongoDB\Driver\Exception\ConnectionTimeoutException;
811
use MongoDB\Driver\Manager;
912
use MongoDB\Driver\ReadPreference;
1013
use MongoDB\Driver\Server;
@@ -25,6 +28,8 @@ class WatchFunctionalTest extends FunctionalTestCase
2528
{
2629
use SetUpTearDownTrait;
2730

31+
const NOT_MASTER = 10107;
32+
2833
private static $wireVersionForStartAtOperationTime = 7;
2934

3035
private $defaultOptions = ['maxAwaitTimeMS' => 500];
@@ -890,9 +895,11 @@ public function testRewindExtractsResumeTokenAndNextResumes()
890895
$changeStream->next();
891896
$this->assertTrue($changeStream->valid());
892897

893-
$options = ['resumeAfter' => $changeStream->current()->_id] + $this->defaultOptions;
898+
$resumeToken = $changeStream->current()->_id;
899+
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
894900
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
895901
$changeStream = $operation->execute($this->getPrimaryServer());
902+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
896903

897904
$changeStream->rewind();
898905
$this->assertTrue($changeStream->valid());
@@ -979,6 +986,7 @@ public function testStartAfterOption()
979986
$options = $this->defaultOptions + ['startAfter' => $resumeToken];
980987
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
981988
$changeStream = $operation->execute($this->getPrimaryServer());
989+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
982990

983991
$changeStream->rewind();
984992
$this->assertTrue($changeStream->valid());
@@ -1193,6 +1201,286 @@ public function testSessionFreed()
11931201
$this->assertNull($rp->getValue($changeStream));
11941202
}
11951203

1204+
/**
1205+
* Prose test: "ChangeStream will automatically resume one time on a
1206+
* resumable error (including not master) with the initial pipeline and
1207+
* options, except for the addition/update of a resumeToken."
1208+
*/
1209+
public function testResumeRepeatsOriginalPipelineAndOptions()
1210+
{
1211+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1212+
1213+
$aggregateCommands = [];
1214+
1215+
$this->configureFailPoint([
1216+
'configureFailPoint' => 'failCommand',
1217+
'mode' => ['times' => 1],
1218+
'data' => ['failCommands' => ['getMore'], 'errorCode' => self::NOT_MASTER],
1219+
]);
1220+
1221+
(new CommandObserver)->observe(
1222+
function() use ($operation) {
1223+
$changeStream = $operation->execute($this->getPrimaryServer());
1224+
1225+
// The first next will hit the fail point, causing a resume
1226+
$changeStream->next();
1227+
$changeStream->next();
1228+
},
1229+
function(array $event) use (&$aggregateCommands) {
1230+
$command = $event['started']->getCommand();
1231+
if ($event['started']->getCommandName() !== 'aggregate') {
1232+
return;
1233+
}
1234+
1235+
$aggregateCommands[] = (array) $command;
1236+
}
1237+
);
1238+
1239+
$this->assertCount(2, $aggregateCommands);
1240+
1241+
$this->assertThat(
1242+
$aggregateCommands[0]['pipeline'][0]->{'$changeStream'},
1243+
$this->logicalNot(
1244+
$this->logicalOr(
1245+
$this->objectHasAttribute('resumeAfter'),
1246+
$this->objectHasAttribute('startAfter'),
1247+
$this->objectHasAttribute('startAtOperationTime')
1248+
)
1249+
)
1250+
);
1251+
1252+
$this->assertThat(
1253+
$aggregateCommands[1]['pipeline'][0]->{'$changeStream'},
1254+
$this->logicalOr(
1255+
$this->objectHasAttribute('resumeAfter'),
1256+
$this->objectHasAttribute('startAfter'),
1257+
$this->objectHasAttribute('startAtOperationTime')
1258+
)
1259+
);
1260+
1261+
$aggregateCommands = array_map(
1262+
function (array $aggregateCommand) {
1263+
// Remove resume options from the changestream document
1264+
if (isset($aggregateCommand['pipeline'][0]->{'$changeStream'})) {
1265+
$aggregateCommand['pipeline'][0]->{'$changeStream'} = array_diff_key(
1266+
(array) $aggregateCommand['pipeline'][0]->{'$changeStream'},
1267+
['resumeAfter' => false, 'startAfter' => false, 'startAtOperationTime' => false]
1268+
);
1269+
}
1270+
1271+
// Remove options we don't want to compare between commands
1272+
return array_diff_key($aggregateCommand, ['lsid' => false, '$clusterTime' => false]);
1273+
},
1274+
$aggregateCommands
1275+
);
1276+
1277+
// Ensure options in original and resuming aggregate command match
1278+
$this->assertEquals($aggregateCommands[0], $aggregateCommands[1]);
1279+
}
1280+
1281+
/**
1282+
* Prose test: "ChangeStream will not attempt to resume on any error
1283+
* encountered while executing an aggregate command."
1284+
*/
1285+
public function testErrorDuringAggregateCommandDoesNotCauseResume()
1286+
{
1287+
if (version_compare($this->getServerVersion(), '4.0.0', '<')) {
1288+
$this->markTestSkipped('failCommand is not supported');
1289+
}
1290+
1291+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1292+
1293+
$commandCount = 0;
1294+
1295+
$this->configureFailPoint([
1296+
'configureFailPoint' => 'failCommand',
1297+
'mode' => ['times' => 1],
1298+
'data' => ['failCommands' => ['aggregate'], 'errorCode' => self::NOT_MASTER],
1299+
]);
1300+
1301+
$this->expectException(CommandException::class);
1302+
1303+
(new CommandObserver)->observe(
1304+
function() use ($operation) {
1305+
$operation->execute($this->getPrimaryServer());
1306+
},
1307+
function(array $event) use (&$commandCount) {
1308+
$commandCount++;
1309+
}
1310+
);
1311+
1312+
$this->assertSame(1, $commandCount);
1313+
}
1314+
1315+
/**
1316+
* Prose test: "ChangeStream will perform server selection before attempting
1317+
* to resume, using initial readPreference"
1318+
*/
1319+
public function testOriginalReadPreferenceIsPreservedOnResume()
1320+
{
1321+
$readPreference = new ReadPreference('secondary');
1322+
$options = ['readPreference' => $readPreference] + $this->defaultOptions;
1323+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1324+
1325+
try {
1326+
$secondary = $this->manager->selectServer($readPreference);
1327+
} catch (ConnectionTimeoutException $e) {
1328+
$this->markTestSkipped('Secondary is not available');
1329+
}
1330+
1331+
$changeStream = $operation->execute($secondary);
1332+
$previousCursorId = $changeStream->getCursorId();
1333+
$this->killChangeStreamCursor($changeStream);
1334+
1335+
$changeStream->next();
1336+
$this->assertNotSame($previousCursorId, $changeStream->getCursorId());
1337+
1338+
$getCursor = Closure::bind(
1339+
function () {
1340+
return $this->iterator->getInnerIterator();
1341+
},
1342+
$changeStream,
1343+
ChangeStream::class
1344+
);
1345+
/** @var Cursor $cursor */
1346+
$cursor = $getCursor();
1347+
self::assertTrue($cursor->getServer()->isSecondary());
1348+
}
1349+
1350+
/**
1351+
* Prose test
1352+
* For a ChangeStream under these conditions:
1353+
* - Running against a server <4.0.7.
1354+
* - The batch is empty or has been iterated to the last document.
1355+
* Expected result:
1356+
* - getResumeToken must return the _id of the last document returned if one exists.
1357+
* - getResumeToken must return resumeAfter from the initial aggregate if the option was specified.
1358+
* - If resumeAfter was not specified, the getResumeToken result must be empty.
1359+
*/
1360+
public function testGetResumeTokenReturnsOriginalResumeTokenOnEmptyBatch()
1361+
{
1362+
if ($this->isPostBatchResumeTokenSupported()) {
1363+
$this->markTestSkipped('postBatchResumeToken is supported');
1364+
}
1365+
1366+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1367+
$changeStream = $operation->execute($this->getPrimaryServer());
1368+
1369+
$this->assertNull($changeStream->getResumeToken());
1370+
1371+
$this->insertDocument(['x' => 1]);
1372+
1373+
$changeStream->next();
1374+
$this->assertTrue($changeStream->valid());
1375+
$resumeToken = $changeStream->getResumeToken();
1376+
$this->assertSame($resumeToken, $changeStream->current()->_id);
1377+
1378+
$options = ['resumeAfter' => $resumeToken] + $this->defaultOptions;
1379+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1380+
$changeStream = $operation->execute($this->getPrimaryServer());
1381+
1382+
$this->assertSame($resumeToken, $changeStream->getResumeToken());
1383+
}
1384+
1385+
/**
1386+
* Prose test: "$changeStream stage for ChangeStream started with startAfter
1387+
* against a server >=4.1.1 that has not received any results yet MUST
1388+
* include a startAfter option and MUST NOT include a resumeAfter option
1389+
* when resuming a change stream."
1390+
*/
1391+
public function testResumingChangeStreamWithoutPreviousResultsIncludesStartAfterOption()
1392+
{
1393+
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
1394+
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
1395+
}
1396+
1397+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1398+
$changeStream = $operation->execute($this->getPrimaryServer());
1399+
1400+
$this->insertDocument(['x' => 1]);
1401+
1402+
$changeStream->next();
1403+
$this->assertTrue($changeStream->valid());
1404+
$resumeToken = $changeStream->getResumeToken();
1405+
1406+
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
1407+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1408+
$changeStream = $operation->execute($this->getPrimaryServer());
1409+
$changeStream->rewind();
1410+
$this->killChangeStreamCursor($changeStream);
1411+
1412+
$aggregateCommand = null;
1413+
1414+
(new CommandObserver)->observe(
1415+
function() use ($changeStream) {
1416+
$changeStream->next();
1417+
},
1418+
function(array $event) use (&$aggregateCommand) {
1419+
if ($event['started']->getCommandName() !== 'aggregate') {
1420+
return;
1421+
}
1422+
1423+
$aggregateCommand = $event['started']->getCommand();
1424+
}
1425+
);
1426+
1427+
$this->assertNotNull($aggregateCommand);
1428+
$this->assertObjectNotHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1429+
$this->assertObjectHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1430+
}
1431+
1432+
/**
1433+
* Prose test: "$changeStream stage for ChangeStream started with startAfter
1434+
* against a server >=4.1.1 that has received at least one result MUST
1435+
* include a resumeAfter option and MUST NOT include a startAfter option
1436+
* when resuming a change stream."
1437+
*/
1438+
public function testResumingChangeStreamWithPreviousResultsIncludesResumeAfterOption()
1439+
{
1440+
if (version_compare($this->getServerVersion(), '4.1.1', '<')) {
1441+
$this->markTestSkipped('Testing resumeAfter and startAfter can only be tested on servers >= 4.1.1');
1442+
}
1443+
1444+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
1445+
$changeStream = $operation->execute($this->getPrimaryServer());
1446+
1447+
$this->insertDocument(['x' => 1]);
1448+
1449+
$changeStream->next();
1450+
$this->assertTrue($changeStream->valid());
1451+
$resumeToken = $changeStream->getResumeToken();
1452+
1453+
$options = ['startAfter' => $resumeToken] + $this->defaultOptions;
1454+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $options);
1455+
$changeStream = $operation->execute($this->getPrimaryServer());
1456+
$changeStream->rewind();
1457+
1458+
$this->insertDocument(['x' => 2]);
1459+
$changeStream->next();
1460+
$this->assertTrue($changeStream->valid());
1461+
1462+
$this->killChangeStreamCursor($changeStream);
1463+
1464+
$aggregateCommand = null;
1465+
1466+
(new CommandObserver)->observe(
1467+
function() use ($changeStream) {
1468+
$changeStream->next();
1469+
},
1470+
function(array $event) use (&$aggregateCommand) {
1471+
if ($event['started']->getCommandName() !== 'aggregate') {
1472+
return;
1473+
}
1474+
1475+
$aggregateCommand = $event['started']->getCommand();
1476+
}
1477+
);
1478+
1479+
$this->assertNotNull($aggregateCommand);
1480+
$this->assertObjectNotHasAttribute('startAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1481+
$this->assertObjectHasAttribute('resumeAfter', $aggregateCommand->pipeline[0]->{'$changeStream'});
1482+
}
1483+
11961484
private function assertNoCommandExecuted(callable $callable)
11971485
{
11981486
$commands = [];

0 commit comments

Comments
 (0)