Skip to content

Commit 54b2a50

Browse files
committed
Merge pull request #654
2 parents c69a715 + b9865b8 commit 54b2a50

14 files changed

+4016
-27
lines changed

src/Operation/WithTransaction.php

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
<?php
2+
3+
namespace MongoDB\Operation;
4+
5+
use Exception;
6+
use MongoDB\Driver\Exception\RuntimeException;
7+
use MongoDB\Driver\Session;
8+
use function call_user_func;
9+
use function time;
10+
11+
/**
12+
* @internal
13+
*/
14+
class WithTransaction
15+
{
16+
/** @var callable */
17+
private $callback;
18+
19+
/** @var array */
20+
private $transactionOptions;
21+
22+
/**
23+
* @see Session::startTransaction for supported transaction options
24+
*
25+
* @param callable $callback A callback that will be invoked within the transaction
26+
* @param array $transactionOptions Additional options that are passed to Session::startTransaction
27+
*/
28+
public function __construct(callable $callback, array $transactionOptions = [])
29+
{
30+
$this->callback = $callback;
31+
$this->transactionOptions = $transactionOptions;
32+
}
33+
34+
/**
35+
* Execute the operation in the given session
36+
*
37+
* This helper takes care of retrying the commit operation or the entire
38+
* transaction if an error occurs.
39+
*
40+
* If the commit fails because of an UnknownTransactionCommitResult error, the
41+
* commit is retried without re-invoking the callback.
42+
* If the commit fails because of a TransientTransactionError, the entire
43+
* transaction will be retried. In this case, the callback will be invoked
44+
* again. It is important that the logic inside the callback is idempotent.
45+
*
46+
* In case of failures, the commit or transaction are retried until 120 seconds
47+
* from the initial call have elapsed. After that, no retries will happen and
48+
* the helper will throw the last exception received from the driver.
49+
*
50+
* @see Client::startSession
51+
*
52+
* @param Session $session A session object as retrieved by Client::startSession
53+
* @return void
54+
* @throws RuntimeException for driver errors while committing the transaction
55+
* @throws Exception for any other errors, including those thrown in the callback
56+
*/
57+
public function execute(Session $session)
58+
{
59+
$startTime = time();
60+
61+
while (true) {
62+
$session->startTransaction($this->transactionOptions);
63+
64+
try {
65+
call_user_func($this->callback, $session);
66+
} catch (Exception $e) {
67+
if ($session->isInTransaction()) {
68+
$session->abortTransaction();
69+
}
70+
71+
if ($e instanceof RuntimeException &&
72+
$e->hasErrorLabel('TransientTransactionError') &&
73+
! $this->isTransactionTimeLimitExceeded($startTime)
74+
) {
75+
continue;
76+
}
77+
78+
throw $e;
79+
}
80+
81+
if (! $session->isInTransaction()) {
82+
// Assume callback intentionally ended the transaction
83+
return;
84+
}
85+
86+
while (true) {
87+
try {
88+
$session->commitTransaction();
89+
} catch (RuntimeException $e) {
90+
if ($e->getCode() !== 50 /* MaxTimeMSExpired */ &&
91+
$e->hasErrorLabel('UnknownTransactionCommitResult') &&
92+
! $this->isTransactionTimeLimitExceeded($startTime)
93+
) {
94+
// Retry committing the transaction
95+
continue;
96+
}
97+
98+
if ($e->hasErrorLabel('TransientTransactionError') &&
99+
! $this->isTransactionTimeLimitExceeded($startTime)
100+
) {
101+
// Restart the transaction, invoking the callback again
102+
continue 2;
103+
}
104+
105+
throw $e;
106+
}
107+
108+
// Commit was successful
109+
break;
110+
}
111+
112+
// Transaction was successful
113+
break;
114+
}
115+
}
116+
117+
/**
118+
* Returns whether the time limit for retrying transactions in the convenient transaction API has passed
119+
*
120+
* @param int $startTime The time the transaction was started
121+
* @return bool
122+
*/
123+
private function isTransactionTimeLimitExceeded($startTime)
124+
{
125+
return time() - $startTime >= 120;
126+
}
127+
}

src/functions.php

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717

1818
namespace MongoDB;
1919

20+
use Exception;
2021
use MongoDB\BSON\Serializable;
2122
use MongoDB\Driver\Server;
2223
use MongoDB\Driver\Session;
2324
use MongoDB\Exception\InvalidArgumentException;
25+
use MongoDB\Exception\RuntimeException;
26+
use MongoDB\Operation\WithTransaction;
2427
use ReflectionClass;
2528
use ReflectionException;
2629
use function end;
@@ -338,3 +341,35 @@ function create_field_path_type_map(array $typeMap, $fieldPath)
338341

339342
return $typeMap;
340343
}
344+
345+
/**
346+
* Execute a callback within a transaction in the given session
347+
*
348+
* This helper takes care of retrying the commit operation or the entire
349+
* transaction if an error occurs.
350+
*
351+
* If the commit fails because of an UnknownTransactionCommitResult error, the
352+
* commit is retried without re-invoking the callback.
353+
* If the commit fails because of a TransientTransactionError, the entire
354+
* transaction will be retried. In this case, the callback will be invoked
355+
* again. It is important that the logic inside the callback is idempotent.
356+
*
357+
* In case of failures, the commit or transaction are retried until 120 seconds
358+
* from the initial call have elapsed. After that, no retries will happen and
359+
* the helper will throw the last exception received from the driver.
360+
*
361+
* @see Client::startSession
362+
* @see Session::startTransaction for supported transaction options
363+
*
364+
* @param Session $session A session object as retrieved by Client::startSession
365+
* @param callable $callback A callback that will be invoked within the transaction
366+
* @param array $transactionOptions Additional options that are passed to Session::startTransaction
367+
* @return void
368+
* @throws RuntimeException for driver errors while committing the transaction
369+
* @throws Exception for any other errors, including those thrown in the callback
370+
*/
371+
function with_transaction(Session $session, callable $callback, array $transactionOptions = [])
372+
{
373+
$operation = new WithTransaction($callback, $transactionOptions);
374+
$operation->execute($session);
375+
}

tests/SpecTests/ErrorExpectation.php

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Exception;
66
use MongoDB\Driver\Exception\BulkWriteException;
77
use MongoDB\Driver\Exception\CommandException;
8+
use MongoDB\Driver\Exception\ExecutionTimeoutException;
89
use MongoDB\Driver\Exception\RuntimeException;
910
use MongoDB\Exception\InvalidArgumentException;
1011
use MongoDB\Tests\TestCase;
@@ -26,9 +27,10 @@ final class ErrorExpectation
2627
*/
2728
private static $codeNameMap = [
2829
'Interrupted' => 11601,
29-
'WriteConflict' => 112,
30+
'MaxTimeMSExpired' => 50,
3031
'NoSuchTransaction' => 251,
3132
'OperationNotSupportedInTransaction' => 263,
33+
'WriteConflict' => 112,
3234
];
3335

3436
/** @var integer */
@@ -198,7 +200,7 @@ private function assertCodeName(TestCase $test, Exception $actual = null)
198200
* around this be comparing the error code against a map.
199201
*
200202
* TODO: Remove this once PHPC-1386 is resolved. */
201-
if ($actual instanceof BulkWriteException) {
203+
if ($actual instanceof BulkWriteException || $actual instanceof ExecutionTimeoutException) {
202204
$test->assertArrayHasKey($this->codeName, self::$codeNameMap);
203205
$test->assertSame(self::$codeNameMap[$this->codeName], $actual->getCode());
204206

@@ -207,6 +209,14 @@ private function assertCodeName(TestCase $test, Exception $actual = null)
207209

208210
$test->assertInstanceOf(CommandException::class, $actual);
209211
$result = $actual->getResultDocument();
212+
213+
if (isset($result->writeConcernError)) {
214+
$test->assertObjectHasAttribute('codeName', $result->writeConcernError);
215+
$test->assertSame($this->codeName, $result->writeConcernError->codeName);
216+
217+
return;
218+
}
219+
210220
$test->assertObjectHasAttribute('codeName', $result);
211221
$test->assertSame($this->codeName, $result->codeName);
212222
}

tests/SpecTests/Operation.php

Lines changed: 55 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
use function fclose;
2121
use function fopen;
2222
use function MongoDB\is_last_pipeline_operator_write;
23+
use function MongoDB\with_transaction;
2324
use function stream_get_contents;
2425
use function strtolower;
2526

@@ -127,6 +128,29 @@ public static function fromCommandMonitoring(stdClass $operation)
127128
return $o;
128129
}
129130

131+
/**
132+
* This method is exclusively used to prepare nested operations for the
133+
* withTransaction session operation
134+
*
135+
* @return Operation
136+
*/
137+
private static function fromConvenientTransactions(stdClass $operation)
138+
{
139+
$o = new self($operation);
140+
141+
if (isset($operation->error)) {
142+
$o->errorExpectation = ErrorExpectation::fromTransactions($operation);
143+
}
144+
145+
$o->resultExpectation = ResultExpectation::fromTransactions($operation, $o->getResultAssertionType());
146+
147+
if (isset($operation->collectionOptions)) {
148+
$o->collectionOptions = (array) $operation->collectionOptions;
149+
}
150+
151+
return $o;
152+
}
153+
130154
public static function fromCrud(stdClass $operation)
131155
{
132156
$o = new self($operation);
@@ -177,16 +201,17 @@ public static function fromTransactions(stdClass $operation)
177201
/**
178202
* Execute the operation and assert its outcome.
179203
*
180-
* @param FunctionalTestCase $test Test instance
181-
* @param Context $context Execution context
204+
* @param FunctionalTestCase $test Test instance
205+
* @param Context $context Execution context
206+
* @param bool $bubbleExceptions If true, any exception that was caught is rethrown
182207
*/
183-
public function assert(FunctionalTestCase $test, Context $context)
208+
public function assert(FunctionalTestCase $test, Context $context, $bubbleExceptions = false)
184209
{
185210
$result = null;
186211
$exception = null;
187212

188213
try {
189-
$result = $this->execute($context);
214+
$result = $this->execute($test, $context);
190215

191216
/* Eagerly iterate the results of a cursor. This both allows an
192217
* exception to be thrown sooner and ensures that any expected
@@ -211,6 +236,10 @@ public function assert(FunctionalTestCase $test, Context $context)
211236
if (isset($this->resultExpectation)) {
212237
$this->resultExpectation->assert($test, $result);
213238
}
239+
240+
if ($exception && $bubbleExceptions) {
241+
throw $exception;
242+
}
214243
}
215244

216245
/**
@@ -220,7 +249,7 @@ public function assert(FunctionalTestCase $test, Context $context)
220249
* @return mixed
221250
* @throws LogicException if the operation is unsupported
222251
*/
223-
private function execute(Context $context)
252+
private function execute(FunctionalTestCase $test, Context $context)
224253
{
225254
switch ($this->object) {
226255
case self::OBJECT_CLIENT:
@@ -248,9 +277,9 @@ private function execute(Context $context)
248277

249278
return $this->executeForDatabase($database, $context);
250279
case self::OBJECT_SESSION0:
251-
return $this->executeForSession($context->session0, $context);
280+
return $this->executeForSession($context->session0, $test, $context);
252281
case self::OBJECT_SESSION1:
253-
return $this->executeForSession($context->session1, $context);
282+
return $this->executeForSession($context->session1, $test, $context);
254283
default:
255284
throw new LogicException('Unsupported object: ' . $this->object);
256285
}
@@ -479,12 +508,13 @@ private function executeForGridFSBucket(Bucket $bucket, Context $context)
479508
/**
480509
* Executes the session operation and return its result.
481510
*
482-
* @param Session $session
483-
* @param Context $context Execution context
511+
* @param Session $session
512+
* @param FunctionalTestCase $test
513+
* @param Context $context Execution context
484514
* @return mixed
485515
* @throws LogicException if the session operation is unsupported
486516
*/
487-
private function executeForSession(Session $session, Context $context)
517+
private function executeForSession(Session $session, FunctionalTestCase $test, Context $context)
488518
{
489519
switch ($this->name) {
490520
case 'abortTransaction':
@@ -495,6 +525,21 @@ private function executeForSession(Session $session, Context $context)
495525
$options = isset($this->arguments['options']) ? (array) $this->arguments['options'] : [];
496526

497527
return $session->startTransaction($context->prepareOptions($options));
528+
case 'withTransaction':
529+
/** @var self[] $callbackOperations */
530+
$callbackOperations = array_map(function ($operation) {
531+
return self::fromConvenientTransactions($operation);
532+
}, $this->arguments['callback']->operations);
533+
534+
$callback = function () use ($callbackOperations, $test, $context) {
535+
foreach ($callbackOperations as $operation) {
536+
$operation->assert($test, $context, true);
537+
}
538+
};
539+
540+
$options = isset($this->arguments['options']) ? (array) $this->arguments['options'] : [];
541+
542+
return with_transaction($session, $callback, $context->prepareOptions($options));
498543
default:
499544
throw new LogicException('Unsupported session operation: ' . $this->name);
500545
}

0 commit comments

Comments
 (0)