Skip to content

Commit 6dabe58

Browse files
committed
SERVER-41437 minor transaction_oplog_application.js clean up
1 parent 83a0d0d commit 6dabe58

File tree

1 file changed

+117
-104
lines changed

1 file changed

+117
-104
lines changed

src/mongo/db/repl/transaction_oplog_application.cpp

Lines changed: 117 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -83,16 +83,15 @@ Status _applyOperationsForTransaction(OperationContext* opCtx,
8383
* Helper that will read the entire sequence of oplog entries for the transaction and apply each of
8484
* them.
8585
*
86-
* Currently used for oplog application of a commitTransaction oplog entry during recovery, rollback
87-
* and initial sync.
86+
* Currently used for oplog application of a commitTransaction oplog entry during recovery and
87+
* rollback.
8888
*/
8989
Status _applyTransactionFromOplogChain(OperationContext* opCtx,
9090
const OplogEntry& entry,
9191
repl::OplogApplication::Mode mode,
9292
Timestamp commitTimestamp,
9393
Timestamp durableTimestamp) {
94-
invariant(mode == repl::OplogApplication::Mode::kRecovering ||
95-
mode == repl::OplogApplication::Mode::kInitialSync);
94+
invariant(mode == repl::OplogApplication::Mode::kRecovering);
9695

9796
repl::MultiApplier::Operations ops;
9897
{
@@ -151,76 +150,88 @@ const repl::OplogEntry getPreviousOplogEntry(OperationContext* opCtx,
151150
Status applyCommitTransaction(OperationContext* opCtx,
152151
const OplogEntry& entry,
153152
repl::OplogApplication::Mode mode) {
154-
// Return error if run via applyOps command.
155-
uassert(50987,
156-
"commitTransaction is only used internally by secondaries.",
157-
mode != repl::OplogApplication::Mode::kApplyOpsCmd);
158-
159153
IDLParserErrorContext ctx("commitTransaction");
160154
auto commitOplogEntryOpTime = entry.getOpTime();
161155
auto commitCommand = CommitTransactionOplogObject::parse(ctx, entry.getObject());
162156
invariant(commitCommand.getCommitTimestamp());
163157

164-
if (mode == repl::OplogApplication::Mode::kRecovering) {
165-
return _applyTransactionFromOplogChain(opCtx,
166-
entry,
167-
mode,
168-
*commitCommand.getCommitTimestamp(),
169-
commitOplogEntryOpTime.getTimestamp());
158+
switch (mode) {
159+
case repl::OplogApplication::Mode::kRecovering: {
160+
return _applyTransactionFromOplogChain(opCtx,
161+
entry,
162+
mode,
163+
*commitCommand.getCommitTimestamp(),
164+
commitOplogEntryOpTime.getTimestamp());
165+
}
166+
case repl::OplogApplication::Mode::kInitialSync: {
167+
// Initial sync should never apply 'commitTransaction' since it unpacks committed
168+
// transactions onto various applier threads.
169+
MONGO_UNREACHABLE;
170+
}
171+
case repl::OplogApplication::Mode::kApplyOpsCmd: {
172+
// Return error if run via applyOps command.
173+
uasserted(50987, "commitTransaction is only used internally by secondaries.");
174+
}
175+
case repl::OplogApplication::Mode::kSecondary: {
176+
// Transaction operations are in its own batch, so we can modify their opCtx.
177+
invariant(entry.getSessionId());
178+
invariant(entry.getTxnNumber());
179+
opCtx->setLogicalSessionId(*entry.getSessionId());
180+
opCtx->setTxnNumber(*entry.getTxnNumber());
181+
182+
// The write on transaction table may be applied concurrently, so refreshing state
183+
// from disk may read that write, causing starting a new transaction on an existing
184+
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
185+
MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx);
186+
187+
auto transaction = TransactionParticipant::get(opCtx);
188+
invariant(transaction);
189+
transaction.unstashTransactionResources(opCtx, "commitTransaction");
190+
transaction.commitPreparedTransaction(
191+
opCtx, *commitCommand.getCommitTimestamp(), commitOplogEntryOpTime);
192+
return Status::OK();
193+
}
170194
}
171-
172-
invariant(mode == repl::OplogApplication::Mode::kSecondary);
173-
174-
// Transaction operations are in its own batch, so we can modify their opCtx.
175-
invariant(entry.getSessionId());
176-
invariant(entry.getTxnNumber());
177-
opCtx->setLogicalSessionId(*entry.getSessionId());
178-
opCtx->setTxnNumber(*entry.getTxnNumber());
179-
180-
// The write on transaction table may be applied concurrently, so refreshing state
181-
// from disk may read that write, causing starting a new transaction on an existing
182-
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
183-
MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx);
184-
185-
auto transaction = TransactionParticipant::get(opCtx);
186-
invariant(transaction);
187-
transaction.unstashTransactionResources(opCtx, "commitTransaction");
188-
transaction.commitPreparedTransaction(
189-
opCtx, *commitCommand.getCommitTimestamp(), commitOplogEntryOpTime);
190-
return Status::OK();
195+
MONGO_UNREACHABLE;
191196
}
192197

193198
Status applyAbortTransaction(OperationContext* opCtx,
194199
const OplogEntry& entry,
195200
repl::OplogApplication::Mode mode) {
196-
// Return error if run via applyOps command.
197-
uassert(50972,
198-
"abortTransaction is only used internally by secondaries.",
199-
mode != repl::OplogApplication::Mode::kApplyOpsCmd);
200-
201-
// We don't put transactions into the prepare state until the end of recovery and initial sync,
202-
// so there is no transaction to abort.
203-
if (mode == repl::OplogApplication::Mode::kRecovering ||
204-
mode == repl::OplogApplication::Mode::kInitialSync) {
205-
return Status::OK();
201+
switch (mode) {
202+
case repl::OplogApplication::Mode::kRecovering: {
203+
// We don't put transactions into the prepare state until the end of recovery,
204+
// so there is no transaction to abort.
205+
return Status::OK();
206+
}
207+
case repl::OplogApplication::Mode::kInitialSync: {
208+
// We don't put transactions into the prepare state until the end of initial sync,
209+
// so there is no transaction to abort.
210+
return Status::OK();
211+
}
212+
case repl::OplogApplication::Mode::kApplyOpsCmd: {
213+
// Return error if run via applyOps command.
214+
uasserted(50972, "abortTransaction is only used internally by secondaries.");
215+
}
216+
case repl::OplogApplication::Mode::kSecondary: {
217+
// Transaction operations are in its own batch, so we can modify their opCtx.
218+
invariant(entry.getSessionId());
219+
invariant(entry.getTxnNumber());
220+
opCtx->setLogicalSessionId(*entry.getSessionId());
221+
opCtx->setTxnNumber(*entry.getTxnNumber());
222+
223+
// The write on transaction table may be applied concurrently, so refreshing state
224+
// from disk may read that write, causing starting a new transaction on an existing
225+
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
226+
MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx);
227+
228+
auto transaction = TransactionParticipant::get(opCtx);
229+
transaction.unstashTransactionResources(opCtx, "abortTransaction");
230+
transaction.abortActiveTransaction(opCtx);
231+
return Status::OK();
232+
}
206233
}
207-
208-
invariant(mode == repl::OplogApplication::Mode::kSecondary);
209-
210-
// Transaction operations are in its own batch, so we can modify their opCtx.
211-
invariant(entry.getSessionId());
212-
invariant(entry.getTxnNumber());
213-
opCtx->setLogicalSessionId(*entry.getSessionId());
214-
opCtx->setTxnNumber(*entry.getTxnNumber());
215-
// The write on transaction table may be applied concurrently, so refreshing state
216-
// from disk may read that write, causing starting a new transaction on an existing
217-
// txnNumber. Thus, we start a new transaction without refreshing state from disk.
218-
MongoDOperationContextSessionWithoutRefresh sessionCheckout(opCtx);
219-
220-
auto transaction = TransactionParticipant::get(opCtx);
221-
transaction.unstashTransactionResources(opCtx, "abortTransaction");
222-
transaction.abortActiveTransaction(opCtx);
223-
return Status::OK();
234+
MONGO_UNREACHABLE;
224235
}
225236

226237
repl::MultiApplier::Operations readTransactionOperationsFromOplogChain(
@@ -291,7 +302,7 @@ namespace {
291302
*/
292303
Status _applyPrepareTransaction(OperationContext* opCtx,
293304
const OplogEntry& entry,
294-
repl::OplogApplication::Mode oplogApplicationMode) {
305+
repl::OplogApplication::Mode mode) {
295306

296307
// The operations here are reconstructed at their prepare time. However, that time will
297308
// be ignored because there is an outer write unit of work during their application.
@@ -301,8 +312,8 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
301312
return readTransactionOperationsFromOplogChain(opCtx, entry, {}, boost::none);
302313
}();
303314

304-
if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering ||
305-
oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) {
315+
if (mode == repl::OplogApplication::Mode::kRecovering ||
316+
mode == repl::OplogApplication::Mode::kInitialSync) {
306317
// We might replay a prepared transaction behind oldest timestamp. Note that since this is
307318
// scoped to the storage transaction, and readTransactionOperationsFromOplogChain implicitly
308319
// abandons the storage transaction when it releases the global lock, this must be done
@@ -339,7 +350,7 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
339350
auto transaction = TransactionParticipant::get(opCtx);
340351
transaction.unstashTransactionResources(opCtx, "prepareTransaction");
341352

342-
auto status = _applyOperationsForTransaction(opCtx, ops, oplogApplicationMode);
353+
auto status = _applyOperationsForTransaction(opCtx, ops, mode);
343354
fassert(31137, status);
344355

345356
if (MONGO_FAIL_POINT(applyOpsHangBeforePreparingTransaction)) {
@@ -355,16 +366,21 @@ Status _applyPrepareTransaction(OperationContext* opCtx,
355366
}
356367

357368
/**
358-
* Apply a prepared transaction during recovery.
369+
* Apply a prepared transaction when we are reconstructing prepared transactions.
359370
*/
360-
Status applyRecoveredPrepareTransaction(OperationContext* opCtx,
361-
const OplogEntry& entry,
362-
repl::OplogApplication::Mode mode) {
363-
// Snapshot transactions never conflict with the PBWM lock.
364-
invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
371+
void _reconstructPreparedTransaction(OperationContext* opCtx,
372+
const OplogEntry& prepareEntry,
373+
repl::OplogApplication::Mode mode) {
374+
repl::UnreplicatedWritesBlock uwb(opCtx);
375+
376+
// Snapshot transaction can never conflict with the PBWM lock.
377+
opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
378+
365379
// We might replay a prepared transaction behind oldest timestamp.
366380
opCtx->recoveryUnit()->setRoundUpPreparedTimestamps(true);
367-
return _applyPrepareTransaction(opCtx, entry, mode);
381+
382+
// Checks out the session, applies the operations and prepares the transaction.
383+
uassertStatusOK(_applyPrepareTransaction(opCtx, prepareEntry, mode));
368384
}
369385
} // namespace
370386

@@ -376,33 +392,37 @@ Status applyRecoveredPrepareTransaction(OperationContext* opCtx,
376392
*/
377393
Status applyPrepareTransaction(OperationContext* opCtx,
378394
const OplogEntry& entry,
379-
repl::OplogApplication::Mode oplogApplicationMode) {
380-
// Don't apply the operations from the prepared transaction until either we see a commit
381-
// transaction oplog entry during recovery or are at the end of recovery.
382-
if (oplogApplicationMode == repl::OplogApplication::Mode::kRecovering) {
383-
if (!serverGlobalParams.enableMajorityReadConcern) {
384-
error() << "Cannot replay a prepared transaction when 'enableMajorityReadConcern' is "
395+
repl::OplogApplication::Mode mode) {
396+
switch (mode) {
397+
case repl::OplogApplication::Mode::kRecovering: {
398+
if (!serverGlobalParams.enableMajorityReadConcern) {
399+
error()
400+
<< "Cannot replay a prepared transaction when 'enableMajorityReadConcern' is "
385401
"set to false. Restart the server with --enableMajorityReadConcern=true "
386402
"to complete recovery.";
387-
}
388-
fassert(51146, serverGlobalParams.enableMajorityReadConcern);
389-
return Status::OK();
390-
}
403+
fassertFailed(51146);
404+
}
391405

392-
// Don't apply the operations from the prepared transaction until either we see a commit
393-
// transaction oplog entry during the oplog application phase of initial sync or are at the end
394-
// of initial sync.
395-
if (oplogApplicationMode == repl::OplogApplication::Mode::kInitialSync) {
396-
return Status::OK();
406+
// Don't apply the operations from the prepared transaction until either we see a commit
407+
// transaction oplog entry during recovery or are at the end of recovery.
408+
return Status::OK();
409+
}
410+
case repl::OplogApplication::Mode::kInitialSync: {
411+
// Don't apply the operations from the prepared transaction until either we see a commit
412+
// transaction oplog entry during the oplog application phase of initial sync or are at
413+
// the end of initial sync.
414+
return Status::OK();
415+
}
416+
case repl::OplogApplication::Mode::kApplyOpsCmd: {
417+
// Return error if run via applyOps command.
418+
uasserted(51145,
419+
"prepare applyOps oplog entry is only used internally by secondaries.");
420+
}
421+
case repl::OplogApplication::Mode::kSecondary: {
422+
return _applyPrepareTransaction(opCtx, entry, repl::OplogApplication::Mode::kSecondary);
423+
}
397424
}
398-
399-
// Return error if run via applyOps command.
400-
uassert(51145,
401-
"prepare applyOps oplog entry is only used internally by secondaries.",
402-
oplogApplicationMode != repl::OplogApplication::Mode::kApplyOpsCmd);
403-
404-
invariant(oplogApplicationMode == repl::OplogApplication::Mode::kSecondary);
405-
return _applyPrepareTransaction(opCtx, entry, oplogApplicationMode);
425+
MONGO_UNREACHABLE;
406426
}
407427

408428
void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplication::Mode mode) {
@@ -443,14 +463,7 @@ void reconstructPreparedTransactions(OperationContext* opCtx, repl::OplogApplica
443463
opCtx->getServiceContext()->makeClient("reconstruct-prepared-transactions");
444464
AlternativeClientRegion acr(newClient);
445465
const auto newOpCtx = cc().makeOperationContext();
446-
repl::UnreplicatedWritesBlock uwb(newOpCtx.get());
447-
448-
// Snapshot transaction can never conflict with the PBWM lock.
449-
newOpCtx->lockState()->setShouldConflictWithSecondaryBatchApplication(false);
450-
451-
// Checks out the session, applies the operations and prepares the transaction.
452-
uassertStatusOK(
453-
applyRecoveredPrepareTransaction(newOpCtx.get(), prepareOplogEntry, mode));
466+
_reconstructPreparedTransaction(newOpCtx.get(), prepareOplogEntry, mode);
454467
}
455468
}
456469
}

0 commit comments

Comments
 (0)