Skip to content

Commit b3da293

Browse files
committed
PHPLIB-423: Add additional change stream prose tests
1 parent 975ff03 commit b3da293

File tree

1 file changed

+115
-10
lines changed

1 file changed

+115
-10
lines changed

tests/Operation/WatchFunctionalTest.php

Lines changed: 115 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use MongoDB\ChangeStream;
66
use MongoDB\BSON\TimestampInterface;
7+
use MongoDB\Driver\Cursor;
78
use MongoDB\Driver\Manager;
89
use MongoDB\Driver\ReadPreference;
910
use MongoDB\Driver\Server;
@@ -33,6 +34,98 @@ public function setUp()
3334
$this->createCollection();
3435
}
3536

37+
/**
38+
* Prose test: "ChangeStream must continuously track the last seen
39+
* resumeToken"
40+
*/
41+
public function testGetResumeToken()
42+
{
43+
if ($this->isPostBatchResumeTokenSupported()) {
44+
$this->markTestSkipped('postBatchResumeToken is supported');
45+
}
46+
47+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
48+
$changeStream = $operation->execute($this->getPrimaryServer());
49+
50+
$changeStream->rewind();
51+
$this->assertFalse($changeStream->valid());
52+
$this->assertNull($changeStream->getResumeToken());
53+
54+
$this->insertDocument(['x' => 1]);
55+
$this->insertDocument(['x' => 2]);
56+
57+
$changeStream->next();
58+
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
59+
60+
$changeStream->next();
61+
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
62+
63+
$this->insertDocument(['x' => 3]);
64+
65+
$changeStream->next();
66+
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
67+
}
68+
69+
/**
70+
* Prose test: "ChangeStream must continuously track the last seen
71+
* resumeToken"
72+
*/
73+
public function testGetResumeTokenWithPostBatchResumeToken()
74+
{
75+
if ( ! $this->isPostBatchResumeTokenSupported()) {
76+
$this->markTestSkipped('postBatchResumeToken is not supported');
77+
}
78+
79+
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
80+
81+
$events = [];
82+
83+
(new CommandObserver)->observe(
84+
function() use ($operation, &$changeStream) {
85+
$changeStream = $operation->execute($this->getPrimaryServer());
86+
},
87+
function (array $event) use (&$events) {
88+
$events[] = $event;
89+
}
90+
);
91+
92+
$this->assertCount(1, $events);
93+
$this->assertSame('aggregate', $events[0]['started']->getCommandName());
94+
$postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());
95+
96+
$changeStream->rewind();
97+
$this->assertFalse($changeStream->valid());
98+
$this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());
99+
100+
$this->insertDocument(['x' => 1]);
101+
$this->insertDocument(['x' => 2]);
102+
103+
$events = [];
104+
105+
(new CommandObserver)->observe(
106+
function() use ($changeStream) {
107+
$changeStream->next();
108+
},
109+
function (array $event) use (&$events) {
110+
$events[] = $event;
111+
}
112+
);
113+
114+
$this->assertCount(1, $events);
115+
$this->assertSame('getMore', $events[0]['started']->getCommandName());
116+
$postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());
117+
118+
$changeStream->next();
119+
$this->assertSameDocument($changeStream->current()->_id, $changeStream->getResumeToken());
120+
121+
$changeStream->next();
122+
$this->assertSameDocument($postBatchResumeToken, $changeStream->getResumeToken());
123+
}
124+
125+
/**
126+
* Prose test: "ChangeStream will resume after a killCursors command is
127+
* issued for its child cursor."
128+
*/
36129
public function testNextResumesAfterCursorNotFound()
37130
{
38131
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
@@ -127,7 +220,6 @@ public function testResumeBeforeReceivingAnyResultsIncludesPostBatchResumeToken(
127220

128221
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
129222

130-
$operationTime = null;
131223
$events = [];
132224

133225
(new CommandObserver)->observe(
@@ -141,12 +233,7 @@ function (array $event) use (&$events) {
141233

142234
$this->assertCount(1, $events);
143235
$this->assertSame('aggregate', $events[0]['started']->getCommandName());
144-
$reply = $events[0]['succeeded']->getReply();
145-
$this->assertObjectHasAttribute('cursor', $reply);
146-
$this->assertInternalType('object', $reply->cursor);
147-
$this->assertObjectHasAttribute('postBatchResumeToken', $reply->cursor);
148-
$postBatchResumeToken = $reply->cursor->postBatchResumeToken;
149-
$this->assertInternalType('object', $postBatchResumeToken);
236+
$postBatchResumeToken = $this->getPostBatchResumeTokenFromReply($events[0]['succeeded']->getReply());
150237

151238
$this->assertFalse($changeStream->valid());
152239
$this->killChangeStreamCursor($changeStream);
@@ -190,6 +277,11 @@ private function assertResumeAfter($expectedResumeToken, stdClass $command)
190277
$this->assertEquals($expectedResumeToken, $command->pipeline[0]->{'$changeStream'}->resumeAfter);
191278
}
192279

280+
/**
281+
* Prose test: "$changeStream stage for ChangeStream against a server >=4.0
282+
* and <4.0.7 that has not received any results yet MUST include a
283+
* startAtOperationTime option when resuming a changestream."
284+
*/
193285
public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime()
194286
{
195287
if ( ! $this->isStartAtOperationTimeSupported()) {
@@ -202,7 +294,6 @@ public function testResumeBeforeReceivingAnyResultsIncludesStartAtOperationTime(
202294

203295
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), [], $this->defaultOptions);
204296

205-
$operationTime = null;
206297
$events = [];
207298

208299
(new CommandObserver)->observe(
@@ -554,6 +645,10 @@ public function testNonEmptyPipeline()
554645
$this->assertSameDocument($expectedResult, $changeStream->current());
555646
}
556647

648+
/**
649+
* Prose test: "Ensure that a cursor returned from an aggregate command with
650+
* a cursor id and an initial empty batch is not closed on the driver side."
651+
*/
557652
public function testInitialCursorIsNotClosed()
558653
{
559654
$operation = new Watch($this->manager, $this->getDatabaseName(), $this->getCollectionName(), []);
@@ -567,7 +662,7 @@ public function testInitialCursorIsNotClosed()
567662
* internal Cursor and call isDead(). */
568663
$this->assertNotEquals('0', (string) $changeStream->getCursorId());
569664

570-
$rc = new ReflectionClass('MongoDB\ChangeStream');
665+
$rc = new ReflectionClass(ChangeStream::class);
571666
$rp = $rc->getProperty('iterator');
572667
$rp->setAccessible(true);
573668

@@ -577,7 +672,7 @@ public function testInitialCursorIsNotClosed()
577672

578673
$cursor = $iterator->getInnerIterator();
579674

580-
$this->assertInstanceOf('MongoDB\Driver\Cursor', $cursor);
675+
$this->assertInstanceOf(Cursor::class, $cursor);
581676
$this->assertFalse($cursor->isDead());
582677
}
583678

@@ -1109,6 +1204,16 @@ function(array $event) use (&$commands) {
11091204
$this->assertEmpty($commands);
11101205
}
11111206

1207+
private function getPostBatchResumeTokenFromReply(stdClass $reply)
1208+
{
1209+
$this->assertObjectHasAttribute('cursor', $reply);
1210+
$this->assertInternalType('object', $reply->cursor);
1211+
$this->assertObjectHasAttribute('postBatchResumeToken', $reply->cursor);
1212+
$this->assertInternalType('object', $reply->cursor->postBatchResumeToken);
1213+
1214+
return $reply->cursor->postBatchResumeToken;
1215+
}
1216+
11121217
private function insertDocument($document)
11131218
{
11141219
$insertOne = new InsertOne(

0 commit comments

Comments
 (0)