Skip to content

Commit 1d1379a

Browse files
mtandreievergreen
authored andcommitted
SERVER-42996 Move ApplierState to OplogApplier
1 parent 3315355 commit 1d1379a

22 files changed

+180
-132
lines changed

src/mongo/db/repl/SConscript

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,7 @@ env.Library(
722722
'storage_interface_mock.cpp',
723723
],
724724
LIBDEPS=[
725+
'oplog_application_interface',
725726
'oplog_buffer_blocking_queue',
726727
'repl_coordinator_interface',
727728
'repl_settings',

src/mongo/db/repl/oplog_applier.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,16 @@ void OplogApplier::_consume(OperationContext* opCtx, OplogBuffer* oplogBuffer) {
287287
invariant(oplogBuffer->tryPop(opCtx, &opToPopAndDiscard) || inShutdown());
288288
}
289289

290+
OplogApplier::ApplierState OplogApplier::getApplierState() const {
291+
stdx::lock_guard<Latch> lock(_mutex);
292+
return _applierState;
293+
}
294+
295+
void OplogApplier::setApplierState(ApplierState st) {
296+
stdx::lock_guard<Latch> lock(_mutex);
297+
_applierState = st;
298+
}
299+
290300
std::unique_ptr<ThreadPool> makeReplWriterPool() {
291301
return makeReplWriterPool(replWriterThreadCount);
292302
}

src/mongo/db/repl/oplog_applier.h

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,65 @@ class OplogApplier {
190190
StatusWith<OpTime> multiApply(OperationContext* opCtx, Operations ops);
191191

192192
const Options& getOptions() const;
193+
/**
194+
* Step-up
195+
* =======
196+
* On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from
197+
* the perspective of producer and applier, so there's nothing to do with them.
198+
* When a node enters drain mode, producer state = Stopped, applier state = Draining.
199+
*
200+
* If the applier state is Draining, it will signal repl coord when there's nothing to apply.
201+
* The applier goes into Stopped state at the same time.
202+
*
203+
* The states go like the following:
204+
* - secondary and during catchup mode
205+
* (producer: Running, applier: Running)
206+
* |
207+
* | finish catch-up, enter drain mode
208+
* V
209+
* - drain mode
210+
* (producer: Stopped, applier: Draining)
211+
* |
212+
* | applier signals drain is complete
213+
* V
214+
* - primary is in master mode
215+
* (producer: Stopped, applier: Stopped)
216+
*
217+
*
218+
* Step-down
219+
* =========
220+
* The state transitions become:
221+
* - primary is in master mode
222+
* (producer: Stopped, applier: Stopped)
223+
* |
224+
* | step down
225+
* V
226+
* - secondary mode, starting bgsync
227+
* (producer: Starting, applier: Running)
228+
* |
229+
* | bgsync runs start()
230+
* V
231+
* - secondary mode, normal
232+
* (producer: Running, applier: Running)
233+
*
234+
* When a node steps down during draining mode, it's OK to change from (producer: Stopped,
235+
* applier: Draining) to (producer: Starting, applier: Running).
236+
*
237+
* When a node steps down during catchup mode, the states remain the same (producer: Running,
238+
* applier: Running).
239+
*/
240+
enum class ApplierState { Running, Draining, Stopped };
241+
242+
/**
243+
* In normal cases: Running -> Draining -> Stopped -> Running.
244+
* Draining -> Running is also possible if a node steps down during drain mode.
245+
*
246+
* Only the applier can make the transition from Draining to Stopped by calling
247+
* signalDrainComplete().
248+
*/
249+
virtual ApplierState getApplierState() const;
250+
251+
virtual void setApplierState(ApplierState st);
193252

194253
private:
195254
/**
@@ -228,6 +287,8 @@ class OplogApplier {
228287

229288
// Configures this OplogApplier.
230289
const Options _options;
290+
291+
ApplierState _applierState = ApplierState::Running;
231292
};
232293

233294
/**

src/mongo/db/repl/oplog_applier_impl.cpp

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -557,10 +557,7 @@ StatusWith<OpTime> OplogApplierImpl::_multiApply(OperationContext* opCtx,
557557
// entries from the oplog until we finish writing.
558558
Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
559559

560-
// TODO (SERVER-42996): This is a temporary invariant to protect against segfaults. This will
561-
// be removed once ApplierState is moved from ReplicationCoordinator to OplogApplier.
562-
invariant(_replCoord);
563-
if (_replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Stopped) {
560+
if (getApplierState() == ApplierState::Stopped) {
564561
severe() << "attempting to replicate ops while primary";
565562
return {ErrorCodes::CannotApplyOplogWhilePrimary,
566563
"attempting to replicate ops while primary"};

src/mongo/db/repl/opqueue_batcher.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ void OpQueueBatcher::run() {
154154
// Draining state guarantees the producer has already been fully stopped and no more
155155
// operations will be pushed in to the oplog buffer until the applier state changes.
156156
auto isDraining =
157-
replCoord->getApplierState() == ReplicationCoordinator::ApplierState::Draining;
157+
_oplogApplier->getApplierState() == OplogApplier::ApplierState::Draining;
158158
// Check the oplog buffer after the applier state to ensure the producer is stopped.
159159
if (isDraining && _oplogBuffer->isEmpty()) {
160160
ops.setTermWhenExhausted(termWhenBufferIsEmpty);

src/mongo/db/repl/replication_coordinator.h

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -483,64 +483,6 @@ class ReplicationCoordinator : public SyncSourceSelector {
483483
*/
484484
virtual Status setFollowerModeStrict(OperationContext* opCtx, const MemberState& newState) = 0;
485485

486-
/**
487-
* Step-up
488-
* =======
489-
* On stepup, repl coord enters catch-up mode. It's the same as the secondary mode from
490-
* the perspective of producer and applier, so there's nothing to do with them.
491-
* When a node enters drain mode, producer state = Stopped, applier state = Draining.
492-
*
493-
* If the applier state is Draining, it will signal repl coord when there's nothing to apply.
494-
* The applier goes into Stopped state at the same time.
495-
*
496-
* The states go like the following:
497-
* - secondary and during catchup mode
498-
* (producer: Running, applier: Running)
499-
* |
500-
* | finish catch-up, enter drain mode
501-
* V
502-
* - drain mode
503-
* (producer: Stopped, applier: Draining)
504-
* |
505-
* | applier signals drain is complete
506-
* V
507-
* - primary is in master mode
508-
* (producer: Stopped, applier: Stopped)
509-
*
510-
*
511-
* Step-down
512-
* =========
513-
* The state transitions become:
514-
* - primary is in master mode
515-
* (producer: Stopped, applier: Stopped)
516-
* |
517-
* | step down
518-
* V
519-
* - secondary mode, starting bgsync
520-
* (producer: Starting, applier: Running)
521-
* |
522-
* | bgsync runs start()
523-
* V
524-
* - secondary mode, normal
525-
* (producer: Running, applier: Running)
526-
*
527-
* When a node steps down during draining mode, it's OK to change from (producer: Stopped,
528-
* applier: Draining) to (producer: Starting, applier: Running).
529-
*
530-
* When a node steps down during catchup mode, the states remain the same (producer: Running,
531-
* applier: Running).
532-
*/
533-
enum class ApplierState { Running, Draining, Stopped };
534-
535-
/**
536-
* In normal cases: Running -> Draining -> Stopped -> Running.
537-
* Draining -> Running is also possible if a node steps down during drain mode.
538-
*
539-
* Only the applier can make the transition from Draining to Stopped by calling
540-
* signalDrainComplete().
541-
*/
542-
virtual ApplierState getApplierState() = 0;
543-
544486
/**
545487
* Signals that a previously requested pause and drain of the applier buffer
546488
* has completed.

src/mongo/db/repl/replication_coordinator_external_state.h

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535

3636
#include "mongo/bson/timestamp.h"
3737
#include "mongo/db/repl/member_state.h"
38+
#include "mongo/db/repl/oplog_applier.h"
3839
#include "mongo/db/repl/optime.h"
3940
#include "mongo/executor/task_executor.h"
4041
#include "mongo/util/concurrency/thread_pool.h"
@@ -300,6 +301,16 @@ class ReplicationCoordinatorExternalState {
300301
*/
301302
virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const = 0;
302303

304+
/*
305+
* Returns the OplogApplier's current state.
306+
*/
307+
virtual OplogApplier::ApplierState getApplierState() const = 0;
308+
309+
/*
310+
* Updates the OplogApplier's current state.
311+
*/
312+
virtual void setApplierState(const OplogApplier::ApplierState st) = 0;
313+
303314
/*
304315
* Creates noop writer instance. Setting the _noopWriter member is not protected by a guard,
305316
* hence it must be called before multi-threaded operations start.

src/mongo/db/repl/replication_coordinator_external_state_impl.cpp

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -910,6 +910,15 @@ std::size_t ReplicationCoordinatorExternalStateImpl::getOplogFetcherInitialSyncM
910910
return oplogFetcherInitialSyncMaxFetcherRestarts.load();
911911
}
912912

913+
914+
OplogApplier::ApplierState ReplicationCoordinatorExternalStateImpl::getApplierState() const {
915+
return _oplogApplier.get()->getApplierState();
916+
}
917+
918+
void ReplicationCoordinatorExternalStateImpl::setApplierState(const OplogApplier::ApplierState st) {
919+
_oplogApplier.get()->setApplierState(st);
920+
}
921+
913922
JournalListener::Token ReplicationCoordinatorExternalStateImpl::getToken() {
914923
return repl::ReplicationCoordinator::get(_service)->getMyLastAppliedOpTimeAndWallTime();
915924
}

src/mongo/db/repl/replication_coordinator_external_state_impl.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ class ReplicationCoordinatorExternalStateImpl final : public ReplicationCoordina
108108
virtual bool isReadConcernSnapshotSupportedByStorageEngine(OperationContext* opCtx) const;
109109
virtual std::size_t getOplogFetcherSteadyStateMaxFetcherRestarts() const override;
110110
virtual std::size_t getOplogFetcherInitialSyncMaxFetcherRestarts() const override;
111+
virtual OplogApplier::ApplierState getApplierState() const override;
112+
virtual void setApplierState(const OplogApplier::ApplierState st) override;
111113

112114
// Methods from JournalListener.
113115
virtual JournalListener::Token getToken();

src/mongo/db/repl/replication_coordinator_external_state_mock.cpp

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,31 @@
4646

4747
namespace mongo {
4848
namespace repl {
49+
namespace {
50+
/**
51+
* Minimal implementation of OplogApplier for testing.
52+
*/
53+
class OplogApplierMock : public OplogApplier {
54+
OplogApplierMock(const OplogApplierMock&) = delete;
55+
OplogApplierMock& operator=(const OplogApplierMock&) = delete;
56+
57+
public:
58+
explicit OplogApplierMock(OplogApplier::Options options);
59+
60+
void _run(OplogBuffer* oplogBuffer) final;
61+
StatusWith<OpTime> _multiApply(OperationContext* opCtx, Operations ops) final;
62+
};
63+
64+
OplogApplierMock::OplogApplierMock(OplogApplier::Options options)
65+
: OplogApplier(nullptr, nullptr, nullptr, options) {}
66+
67+
void OplogApplierMock::_run(OplogBuffer* oplogBuffer) {}
4968

69+
StatusWith<OpTime> OplogApplierMock::_multiApply(OperationContext* opCtx, Operations ops) {
70+
return OpTime();
71+
}
72+
73+
} // namespace
5074
ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock()
5175
: _localRsConfigDocument(ErrorCodes::NoMatchingDocument, "No local config document"),
5276
_localRsLastVoteDocument(ErrorCodes::NoMatchingDocument, "No local lastVote document"),
@@ -57,7 +81,9 @@ ReplicationCoordinatorExternalStateMock::ReplicationCoordinatorExternalStateMock
5781
_storeLocalLastVoteDocumentStatus(Status::OK()),
5882
_storeLocalLastVoteDocumentShouldHang(false),
5983
_connectionsClosed(false),
60-
_threadsStarted(false) {}
84+
_threadsStarted(false),
85+
_oplogApplier(std::make_unique<OplogApplierMock>(
86+
repl::OplogApplier::Options(repl::OplogApplication::Mode::kSecondary))) {}
6187

6288
ReplicationCoordinatorExternalStateMock::~ReplicationCoordinatorExternalStateMock() {}
6389

@@ -293,6 +319,14 @@ void ReplicationCoordinatorExternalStateMock::setIsReadCommittedEnabled(bool val
293319
_isReadCommittedSupported = val;
294320
}
295321

322+
void ReplicationCoordinatorExternalStateMock::setApplierState(const OplogApplier::ApplierState st) {
323+
_oplogApplier->setApplierState(st);
324+
}
325+
326+
OplogApplier::ApplierState ReplicationCoordinatorExternalStateMock::getApplierState() const {
327+
return _oplogApplier->getApplierState();
328+
}
329+
296330
void ReplicationCoordinatorExternalStateMock::onDrainComplete(OperationContext* opCtx) {}
297331

298332
OpTime ReplicationCoordinatorExternalStateMock::onTransitionToPrimary(OperationContext* opCtx) {

0 commit comments

Comments
 (0)