#include <flatbuffers/idl.h>
#include <flatbuffers/minireflect.h>
#include <folly/executors/InlineExecutor.h>
#include <iomgr/iomgr_flip.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/uuid/nil_generator.hpp>
#include <sisl/fds/buffer.hpp>
#include <sisl/grpc/generic_service.hpp>
#include <sisl/grpc/rpc_client.hpp>
#include <sisl/fds/vector_pool.hpp>
#include <homestore/blkdata_service.hpp>
#include <homestore/logstore_service.hpp>
#include <homestore/superblk_handler.hpp>
#include "common/homestore_assert.hpp"
#include "common/homestore_config.hpp"
// #include "common/homestore_flip.hpp"
#include "replication/service/raft_repl_service.h"
#include "replication/repl_dev/raft_repl_dev.h"
#include "device/device.h"
#include "push_data_rpc_generated.h"
#include "fetch_data_rpc_generated.h"
namespace homestore {
std::atomic< uint64_t > RaftReplDev::s_next_group_ordinal{1};
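// The constructor takes two paths. With load_existing=true it restores state from the persisted
// superblock: reopens the data journal, restores next_dsn/commit/compact lsns and bumps the global
// group ordinal. With load_existing=false it creates fresh log stores, assigns a new group ordinal,
// persists the superblock and binds the data service (existing devices bind later via join_group()).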
RaftReplDev::RaftReplDev(RaftReplService& svc, superblk< raft_repl_dev_superblk >&& rd_sb, bool load_existing) :
m_repl_svc{svc},
m_msg_mgr{svc.msg_manager()},
m_group_id{rd_sb->group_id},
m_my_repl_id{svc.get_my_repl_uuid()},
m_raft_server_id{nuraft_mesg::to_server_id(m_my_repl_id)},
m_rd_sb{std::move(rd_sb)},
m_metrics{fmt::format("{}_{}", group_id_str(), m_raft_server_id).c_str()} {
m_state_machine = std::make_shared< RaftStateMachine >(*this);
if (load_existing) {
m_data_journal = std::make_shared< ReplLogStore >(
*this, *m_state_machine, m_rd_sb->logdev_id, m_rd_sb->logstore_id,
[this](logstore_seq_num_t lsn, log_buffer buf, void* key) { on_log_found(lsn, buf, key); },
[this](std::shared_ptr< HomeLogStore > hs, logstore_seq_num_t lsn) {
m_log_store_replay_done = true;
set_log_store_last_durable_lsn(hs->tail_lsn());
});
m_next_dsn = m_rd_sb->last_applied_dsn + 1;
m_commit_upto_lsn = m_rd_sb->durable_commit_lsn;
m_last_flushed_commit_lsn = m_commit_upto_lsn;
m_compact_lsn = m_rd_sb->compact_lsn;
m_rdev_name = fmt::format("rdev{}", m_rd_sb->group_ordinal);
// It's OK not to use compare-exchange here, because loading is always single threaded as of now.
if (m_rd_sb->group_ordinal >= s_next_group_ordinal.load()) {
s_next_group_ordinal.store(m_rd_sb->group_ordinal + 1);
}
if (m_rd_sb->is_timeline_consistent) {
logstore_service()
.open_log_store(m_rd_sb->logdev_id, m_rd_sb->free_blks_journal_id, false)
.thenValue([this](auto log_store) {
m_free_blks_journal = std::move(log_store);
m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id();
});
}
} else {
m_data_journal = std::make_shared< ReplLogStore >(*this, *m_state_machine);
m_rd_sb->logdev_id = m_data_journal->logdev_id();
m_rd_sb->logstore_id = m_data_journal->logstore_id();
m_rd_sb->last_applied_dsn = 0;
m_rd_sb->destroy_pending = 0x0;
m_rd_sb->group_ordinal = s_next_group_ordinal.fetch_add(1);
m_rdev_name = fmt::format("rdev{}", m_rd_sb->group_ordinal);
if (m_rd_sb->is_timeline_consistent) {
m_free_blks_journal = logstore_service().create_new_log_store(m_rd_sb->logdev_id, false /* append_mode */);
m_rd_sb->free_blks_journal_id = m_free_blks_journal->get_store_id();
}
m_rd_sb.write();
bind_data_service();
}
RD_LOG(INFO,
"Started {} RaftReplDev group_id={}, replica_id={}, raft_server_id={} commited_lsn={}, "
"compact_lsn={}, checkpoint_lsn:{}, next_dsn={} "
"log_dev={} log_store={}",
(load_existing ? "Existing" : "New"), group_id_str(), my_replica_id_str(), m_raft_server_id,
m_commit_upto_lsn.load(), m_compact_lsn.load(), m_rd_sb->checkpoint_lsn, m_next_dsn.load(),
m_rd_sb->logdev_id, m_rd_sb->logstore_id);
}
bool RaftReplDev::bind_data_service() {
RD_LOG(INFO, "Starting data channel, group_id={}, replica_id={}", group_id_str(), my_replica_id_str());
bool success = false;
#ifdef _PRERELEASE
success =
m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, [this](intrusive< sisl::GenericRpcData >& rpc_data) {
if (iomgr_flip::instance()->delay_flip("slow_down_data_channel", [this, rpc_data]() mutable {
RD_LOGI("Resuming after slow down data channel flip");
on_push_data_received(rpc_data);
})) {
RD_LOGI("Slow down data channel flip is enabled, scheduling to call later");
} else {
on_push_data_received(rpc_data);
}
});
#else
success =
m_msg_mgr.bind_data_service_request(PUSH_DATA, m_group_id, bind_this(RaftReplDev::on_push_data_received, 1));
#endif
if (!success) {
RD_LOGE("Failed to bind data service request for PUSH_DATA");
return false;
}
success =
m_msg_mgr.bind_data_service_request(FETCH_DATA, m_group_id, bind_this(RaftReplDev::on_fetch_data_received, 1));
if (!success) {
RD_LOGE("Failed to bind data service request for FETCH_DATA");
return false;
}
return true;
}
bool RaftReplDev::join_group() {
bind_data_service();
auto raft_result =
m_msg_mgr.join_group(m_group_id, "homestore_replication",
std::dynamic_pointer_cast< nuraft_mesg::mesg_state_mgr >(shared_from_this()));
if (!raft_result) {
HS_DBG_ASSERT(false, "Unable to join the group_id={} with error={}", group_id_str(), raft_result.error());
return false;
}
return true;
}
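// Member replacement is a four-step flow (steps annotated inline below): yield leadership if the
// leader itself is the member going out, add the new member, propose an HS_CTRL_REPLACE log entry
// recording the swap, and finally remove the old member. The optional commit_quorum override lets
// the operation proceed when enough members are down that a majority cannot be formed.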
AsyncReplResult<> RaftReplDev::replace_member(const replica_member_info& member_out,
const replica_member_info& member_in, uint32_t commit_quorum) {
if (is_stopping()) {
LOGINFO("repl dev is being shutdown!");
return make_async_error<>(ReplServiceError::STOPPING);
}
incr_pending_request_num();
LOGINFO("Replace member group_id={} member_out={} member_in={}", group_id_str(),
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
if (commit_quorum >= 1) {
// Two members are down and the leader can't form a quorum. Reduce the quorum size.
reset_quorum_size(commit_quorum);
}
// Step 1: Check if leader itself is requested to move out.
if (m_my_repl_id == member_out.id && m_my_repl_id == get_leader_id()) {
// If leader is the member requested to move out, then give up leadership and return error.
// Client will retry replace_member request to the new leader.
raft_server()->yield_leadership(true /* immediate */, -1 /* successor */);
RD_LOGI("Replace member leader is the member_out so yield leadership");
reset_quorum_size(0);
decr_pending_request_num();
return make_async_error<>(ReplServiceError::NOT_LEADER);
}
// Step 2. Add the new member.
return m_msg_mgr.add_member(m_group_id, member_in.id)
.via(&folly::InlineExecutor::instance())
.thenValue([this, member_in, member_out, commit_quorum](auto&& e) -> AsyncReplResult<> {
// TODO: Currently we ignore CANCELLED; fix nuraft_mesg to not time out
// when adding a member. The member is added to the cluster config before it
// has fully synced, at least as a stopgap. A full sync will take a long time
// for block or object storage.
if (e.hasError()) {
// Ignore SERVER_ALREADY_EXISTS, as the server was already added to the cluster.
// The PG member-change requests from the control path are idempotent; a request
// can be resent, and either the add or the remove can fail and be retried.
if (e.error() == nuraft::cmd_result_code::CANCELLED ||
e.error() == nuraft::cmd_result_code::SERVER_ALREADY_EXISTS) {
RD_LOGW("Ignoring error returned from nuraft add_member {}", e.error());
} else {
RD_LOGE("Replace member error in add member : {}", e.error());
reset_quorum_size(0);
decr_pending_request_num();
return make_async_error<>(RaftReplService::to_repl_error(e.error()));
}
}
RD_LOGI("Replace member added member={} to group_id={}", boost::uuids::to_string(member_in.id),
group_id_str());
// Step 3. Append log entry to mark the old member is out and new member is added.
auto rreq = repl_req_ptr_t(new repl_req_ctx{});
replace_members_ctx members;
members.replica_out = member_out;
members.replica_in = member_in;
sisl::blob header(r_cast< uint8_t* >(&members), sizeof(replace_members_ctx));
rreq->init(
repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
journal_type_t::HS_CTRL_REPLACE, true, header, sisl::blob{}, 0, m_listener);
auto err = m_state_machine->propose_to_raft(std::move(rreq));
if (err != ReplServiceError::OK) {
LOGERROR("Replace member propose to raft failed {}", err);
reset_quorum_size(0);
decr_pending_request_num();
return make_async_error<>(std::move(err));
}
RD_LOGI("Replace member proposed to raft group_id={}", group_id_str());
// Step 4. Remove the old member. Even if the old member is temporarily
// down and recovers, nuraft_mesg will see the member-removal cluster log
// entry and call exit_group() and leave().
return m_msg_mgr.rem_member(m_group_id, member_out.id)
.via(&folly::InlineExecutor::instance())
.thenValue([this, member_out, commit_quorum](auto&& e) -> AsyncReplResult<> {
if (e.hasError()) {
// Ignore SERVER_NOT_FOUND, as the server was already removed from the
// cluster; requests are idempotent and can be resent.
if (e.error() == nuraft::cmd_result_code::SERVER_NOT_FOUND) {
RD_LOGW("Remove member not found in group error, ignoring");
} else {
// It's OK to retry this request, as replace_member
// is idempotent.
RD_LOGE("Replace member failed to remove member : {}", e.error());
reset_quorum_size(0);
decr_pending_request_num();
return make_async_error<>(ReplServiceError::RETRY_REQUEST);
}
} else {
RD_LOGI("Replace member removed member={} from group_id={}",
boost::uuids::to_string(member_out.id), group_id_str());
}
// Revert the quorum size back to 0.
reset_quorum_size(0);
decr_pending_request_num();
return make_async_success<>();
});
});
}
void RaftReplDev::reset_quorum_size(uint32_t commit_quorum) {
RD_LOGI("Reset raft quorum size={}", commit_quorum);
nuraft::raft_params params = raft_server()->get_current_params();
params.with_custom_commit_quorum_size(commit_quorum);
params.with_custom_election_quorum_size(commit_quorum);
raft_server()->update_params(params);
}
folly::SemiFuture< ReplServiceError > RaftReplDev::destroy_group() {
// Set the intent to destroy the group
m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::DESTROYING; });
// Propose to the group to destroy
auto rreq = repl_req_ptr_t(new repl_req_ctx{});
// If a follower has an rreq {originator=1, term=1, dsn=0, lsn=7} and a baseline resync is triggered before the
// rreq is committed on the follower, on_commit for that rreq will never be called, leaving it as a garbage rreq
// on this follower. If we then trigger a destroy_group, a new rreq {originator=1, term=1, dsn=0} would be created
// on the follower, since the default dsn of a repl_key is 0. After the log of this new rreq is appended to the
// log store and gets a new lsn, linking that lsn to the old rreq (an rreq is identified by {originator, term,
// dsn}), which already has an lsn, would fire an assert; see repl_req_ctx::set_lsn.
// Here we set the dsn to a fresh value, which is guaranteed unique on this follower, so the new rreq cannot
// conflict with the old one.
rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
journal_type_t::HS_CTRL_DESTROY, true, sisl::blob{}, sisl::blob{}, 0, m_listener);
auto err = m_state_machine->propose_to_raft(std::move(rreq));
if (err != ReplServiceError::OK) {
LOGERROR("RaftReplDev::destroy_group failed {}", err);
m_stage.update([](auto* stage) { *stage = repl_dev_stage_t::ACTIVE; });
return folly::makeSemiFuture< ReplServiceError >(std::move(err));
}
LOGINFO("Raft repl dev destroy_group={}", group_id_str());
return m_destroy_promise.getSemiFuture();
}
void RaftReplDev::use_config(json_superblk raft_config_sb) { m_raft_config_sb = std::move(raft_config_sb); }
void RaftReplDev::on_create_snapshot(nuraft::snapshot& s, nuraft::async_result< bool >::handler_type& when_done) {
RD_LOG(DEBUG, "create_snapshot last_idx={}/term={}", s.get_last_log_idx(), s.get_last_log_term());
auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s);
auto result = m_listener->create_snapshot(snp_ctx).get();
auto null_except = std::shared_ptr< std::exception >();
HS_REL_ASSERT(result.hasError() == false, "Not expecting snapshot creation to return an error.");
auto ret_val{true};
if (when_done) { when_done(ret_val, null_except); }
}
// 1. Before repl_dev.stop() is called, the upper layer should make sure there are no pending requests, so a
// graceful shutdown can assume that no request is pending when stopping the repl_dev.
// 2. Before a log is appended to the log store, repl_dev guarantees the corresponding data is persisted on disk,
// so even if we ignore in-flight writes at stop time it is safe, since the log will be replayed after restart.
// Given the two points above, async_alloc_write does not need any shutdown handling.
void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& data,
repl_req_ptr_t rreq) {
if (!rreq) { rreq = repl_req_ptr_t(new repl_req_ctx{}); }
{
auto const guard = m_stage.access();
if (auto const stage = *guard.get(); stage != repl_dev_stage_t::ACTIVE) {
RD_LOGW("Raft channel: Not ready to accept writes, stage={}", enum_name(stage));
handle_error(rreq,
(stage == repl_dev_stage_t::INIT) ? ReplServiceError::SERVER_IS_JOINING
: ReplServiceError::SERVER_IS_LEAVING);
return;
}
}
auto status = rreq->init(
repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
data.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true /* is_proposer */, header,
key, data.size, m_listener);
// Add the request to the repl_dev_rreq map, it will be accessed throughout the life cycle of this request
auto const [it, happened] = m_repl_key_req_map.emplace(rreq->rkey(), rreq);
RD_DBG_ASSERT(happened, "Duplicate repl_key={} found in the map", rreq->rkey().to_string());
if (status != ReplServiceError::OK) {
RD_LOGD("Initializing rreq failed error={}, failing this req", status);
handle_error(rreq, status);
return;
}
// Entries with linked data go over the data channel and are written locally before being proposed; a
// header-only entry is proposed to raft directly (see the else branch below).
if (rreq->has_linked_data()) {
if (rreq->is_proposer() && rreq->has_state(repl_req_state_t::DATA_COMMITTED)) {
RD_LOGD("data blks has already been allocated and committed, failing this req");
handle_error(rreq, ReplServiceError::DATA_DUPLICATED);
return;
}
push_data_to_all_followers(rreq, data);
COUNTER_INCREMENT(m_metrics, total_write_cnt, 1);
COUNTER_INCREMENT(m_metrics, outstanding_data_write_cnt, 1);
auto const data_write_start_time = Clock::now();
// Write the data
data_service()
.async_write(data, rreq->local_blkid())
.thenValue([this, rreq, data_write_start_time](auto&& err) {
// Update the outstanding counter regardless of error.
COUNTER_DECREMENT(m_metrics, outstanding_data_write_cnt, 1);
if (err) {
HS_DBG_ASSERT(false, "Error in writing data, err_code={}", err.value());
handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR);
} else {
// update metrics for originated rreq;
const auto write_num_pieces = rreq->local_blkid().num_pieces();
HISTOGRAM_OBSERVE(m_metrics, rreq_pieces_per_write, write_num_pieces);
HISTOGRAM_OBSERVE(m_metrics, rreq_data_write_latency_us,
get_elapsed_time_us(data_write_start_time));
HISTOGRAM_OBSERVE(m_metrics, rreq_total_data_write_latency_us,
get_elapsed_time_us(rreq->created_time()));
auto raft_status = m_state_machine->propose_to_raft(rreq);
if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); }
}
});
} else {
RD_LOGD("Skipping data channel send since value size is 0");
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
auto raft_status = m_state_machine->propose_to_raft(rreq);
if (raft_status != ReplServiceError::OK) { handle_error(rreq, raft_status); }
}
}
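// Wire format of a PushData message, as assembled below: a size-prefixed PushDataRequest flatbuffer
// (issuer server_id, term, dsn, user header, user key, data size, send timestamp) followed by the raw
// data payload; on_push_data_received() parses it back in the same order.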
void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list const& data) {
auto& builder = rreq->create_fb_builder();
// Prepare the rpc request packet with all repl_reqs details
builder.FinishSizePrefixed(CreatePushDataRequest(
builder, server_id(), rreq->term(), rreq->dsn(),
builder.CreateVector(rreq->header().cbytes(), rreq->header().size()),
builder.CreateVector(rreq->key().cbytes(), rreq->key().size()), data.size, get_time_since_epoch_ms()));
rreq->m_pkts = sisl::io_blob::sg_list_to_ioblob_list(data);
rreq->m_pkts.insert(rreq->m_pkts.begin(), sisl::io_blob{builder.GetBufferPointer(), builder.GetSize(), false});
/*RD_LOGI("Data Channel: Pushing data to all followers: rreq=[{}] data=[{}]", rreq->to_string(),
flatbuffers::FlatBufferToString(builder.GetBufferPointer() + sizeof(flatbuffers::uoffset_t),
PushDataRequestTypeTable()));*/
auto peers = get_active_peers();
auto calls = std::vector< nuraft_mesg::NullAsyncResult >();
for (auto peer : peers) {
RD_LOGD("Data Channel: Pushing data to follower {}, rreq=[{}]", peer, rreq->to_string());
calls.push_back(group_msg_service()
->data_service_request_unidirectional(peer, PUSH_DATA, rreq->m_pkts)
.via(&folly::InlineExecutor::instance()));
}
folly::collectAllUnsafe(calls).thenValue([this, rreq](auto&& v_res) {
for (auto const& res : v_res) {
if (sisl_likely(res.value())) {
auto r = res.value();
if (r.hasError()) {
// Just log the PushData error; no action is needed, as the follower can fall back to FetchData.
RD_LOGW("Data Channel: Error in pushing data to all followers: rreq=[{}] error={}",
rreq->to_string(), r.error());
}
}
}
RD_LOGD("Data Channel: Data push completed for rreq=[{}]", rreq->to_string());
// Release the buffer which holds the packets
rreq->release_fb_builder();
rreq->m_pkts.clear();
});
}
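// Follower-side handler for PushData: validates that the received buffer size equals the flatbuffer
// header plus the advertised data size, creates (or finds) the matching rreq via applier_create_req(),
// saves the pushed payload and asynchronously writes it to the locally allocated blks.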
void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
auto const push_data_rcv_time = Clock::now();
auto const& incoming_buf = rpc_data->request_blob();
if (!incoming_buf.cbytes()) {
RD_LOGW("Data Channel: PushData received with empty buffer, ignoring this call");
rpc_data->send_response();
return;
}
auto const fb_size =
flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes()) + sizeof(flatbuffers::uoffset_t);
auto push_req = GetSizePrefixedPushDataRequest(incoming_buf.cbytes());
if (fb_size + push_req->data_size() != incoming_buf.size()) {
RD_LOGW("Data Channel: PushData received with size mismatch, header size {}, data size {}, received size {}",
fb_size, push_req->data_size(), incoming_buf.size());
rpc_data->send_response();
return;
}
sisl::blob header = sisl::blob{push_req->user_header()->Data(), push_req->user_header()->size()};
sisl::blob key = sisl::blob{push_req->user_key()->Data(), push_req->user_key()->size()};
repl_key rkey{.server_id = push_req->issuer_replica_id(), .term = push_req->raft_term(), .dsn = push_req->dsn()};
auto const req_orig_time_ms = push_req->time_ms();
RD_LOGD("Data Channel: PushData received: time diff={} ms.", get_elapsed_time_ms(req_orig_time_ms));
#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("drop_push_data_request")) {
LOGINFO("Data Channel: Flip is enabled, skip on_push_data_received to simulate fetch remote data, "
"server_id={}, term={}, dsn={}",
push_req->issuer_replica_id(), push_req->raft_term(), push_req->dsn());
return;
}
#endif
auto rreq = applier_create_req(rkey, journal_type_t::HS_DATA_LINKED, header, key, push_req->data_size(),
true /* is_data_channel */);
if (rreq == nullptr) {
RD_LOG(ERROR,
"Data Channel: Creating rreq on applier has failed; ignoring the push and letting the Raft channel "
"trigger an explicit fetch if needed. rkey={}",
rkey.to_string());
return;
}
if (!rreq->save_pushed_data(rpc_data, incoming_buf.cbytes() + fb_size, push_req->data_size())) {
RD_LOGD("Data Channel: Data already received for rreq=[{}], ignoring this data", rreq->to_string());
return;
}
COUNTER_INCREMENT(m_metrics, total_write_cnt, 1);
COUNTER_INCREMENT(m_metrics, outstanding_data_write_cnt, 1);
// Schedule a write and upon completion, mark the data as written.
data_service()
.async_write(r_cast< const char* >(rreq->data()), push_req->data_size(), rreq->local_blkid())
.thenValue([this, rreq, push_data_rcv_time](auto&& err) {
// Update the outstanding counter regardless of error.
COUNTER_DECREMENT(m_metrics, outstanding_data_write_cnt, 1);
if (err) {
COUNTER_INCREMENT(m_metrics, write_err_cnt, 1);
RD_DBG_ASSERT(false, "Error in writing data, error_code={}", err.value());
handle_error(rreq, ReplServiceError::DRIVE_WRITE_ERROR);
} else {
rreq->release_data();
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
rreq->m_data_written_promise.setValue();
const auto data_log_diff_us =
push_data_rcv_time.time_since_epoch().count() > rreq->created_time().time_since_epoch().count()
? get_elapsed_time_us(rreq->created_time(), push_data_rcv_time)
: get_elapsed_time_us(push_data_rcv_time, rreq->created_time());
auto const data_write_latency = get_elapsed_time_us(push_data_rcv_time);
auto const total_data_write_latency = get_elapsed_time_us(rreq->created_time());
auto const write_num_pieces = rreq->local_blkid().num_pieces();
HISTOGRAM_OBSERVE(m_metrics, rreq_pieces_per_write, write_num_pieces);
HISTOGRAM_OBSERVE(m_metrics, rreq_push_data_latency_us, data_write_latency);
HISTOGRAM_OBSERVE(m_metrics, rreq_total_data_write_latency_us, total_data_write_latency);
RD_LOGD("Data Channel: Data write completed for rreq=[{}], time_diff_data_log_us={}, "
"data_write_latency_us={}, total_data_write_latency_us(rreq creation to write complete)={}, "
"local_blkid.num_pieces={}",
rreq->to_string(), data_log_diff_us, data_write_latency, total_data_write_latency,
write_num_pieces);
}
});
}
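// Creates (or returns the existing) rreq for a repl_key on the applier side. The data and raft
// channels can race to create the same request; try_emplace makes the creation idempotent, and a
// repeated call simply returns the already-initialized rreq once its blks have been allocated.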
repl_req_ptr_t RaftReplDev::applier_create_req(repl_key const& rkey, journal_type_t code, sisl::blob const& user_header,
sisl::blob const& key, uint32_t data_size,
[[maybe_unused]] bool is_data_channel) {
auto const [it, happened] = m_repl_key_req_map.try_emplace(rkey, repl_req_ptr_t(new repl_req_ctx()));
RD_DBG_ASSERT((it != m_repl_key_req_map.end()), "Unexpected error in map_repl_key_to_req");
auto rreq = it->second;
if (!happened) {
// We already have the entry in the map; reset its start time to prevent it from being incorrectly
// garbage collected while in use.
rreq->set_created_time();
// If the blk was already allocated by a previous caller, return the existing req.
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
// Do validation if we have the correct mapping
// RD_REL_ASSERT(blob_equals(user_header, rreq->header), "User header mismatch for repl_key={}",
// rkey.to_string());
// RD_REL_ASSERT(blob_equals(user_key, rreq->key), "User key mismatch for repl_key={}", rkey.to_string());
RD_LOGD("Repl_key=[{}] already received ", rkey.to_string());
return rreq;
}
}
// rreq->init will allocate the block if it has linked data.
auto status = rreq->init(rkey, code, false /* is_proposer */, user_header, key, data_size, m_listener);
if (!rreq->has_linked_data()) { return rreq; }
#ifdef _PRERELEASE
if (is_data_channel) {
if (iomgr_flip::instance()->test_flip("fake_reject_append_data_channel")) {
LOGINFO("Data Channel: Reject append_entries flip is triggered for rkey={}", rkey.to_string());
status = ReplServiceError::NO_SPACE_LEFT;
}
} else {
if (iomgr_flip::instance()->test_flip("fake_reject_append_raft_channel")) {
LOGINFO("Raft Channel: Reject append_entries flip is triggered for rkey={}", rkey.to_string());
status = ReplServiceError::NO_SPACE_LEFT;
}
}
#endif
if (status != ReplServiceError::OK) {
RD_LOGD("For Repl_key=[{}] alloc hints returned error={}, failing this req", rkey.to_string(), status);
// Do not call handle_error here, because handle_error is for an rreq that needs to be terminated. This one
// can be retried.
return nullptr;
}
RD_LOGD("in follower_create_req: rreq={}, addr={}", rreq->to_string(), reinterpret_cast< uintptr_t >(rreq.get()));
return rreq;
}
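// Waits until the data of every linked-data request is written locally. Requests whose data was never
// received (e.g. pack/unpack bulk transfer to a new replica, or raft forwarding) first go through the
// receive/fetch path below before their data-written futures are awaited.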
folly::Future< folly::Unit > RaftReplDev::notify_after_data_written(std::vector< repl_req_ptr_t >* rreqs) {
std::vector< folly::Future< folly::Unit > > futs;
futs.reserve(rreqs->size());
std::vector< repl_req_ptr_t > unreceived_data_reqs;
// Walk through the list of requests and wait for the data to be received and written
for (auto const& rreq : *rreqs) {
if (!rreq->has_linked_data()) { continue; }
auto const status = uint32_cast(rreq->state());
if (status & uint32_cast(repl_req_state_t::DATA_WRITTEN)) {
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string());
continue;
}
if (!(status & uint32_cast(repl_req_state_t::DATA_RECEIVED))) {
// This is a relatively rare scenario where the data has not been received or localized yet, because
// this was called as part of pack/unpack, i.e. bulk data transfer for a new replica. In these cases
// the first step of localization doesn't happen (because raft isn't going to give us the append_entry
// handler callback), so we do the data-receive step now. The same scenario can also occur when the
// leader is not the proposer (i.e. raft forwarding is enabled).
unreceived_data_reqs.emplace_back(rreq);
} else {
futs.emplace_back(rreq->m_data_written_promise.getFuture());
}
}
if (!unreceived_data_reqs.empty()) {
// Wait 10x the normal data-fetch timeout. Unlike the normal scenario, where we can reject the append
// and let raft retry the log_entry send (which in turn retries the fetch), this flow can't fail or
// reject. So the only option is to wait longer and, if that fails, either a) crash this node and let
// the addition of the new raft server fail, or b) make this node unavailable for reads while still
// allowing writes (which would serve incorrect data) and let the remediation flow replace the node.
// Option b) at least keeps the other repl_devs in the system accessible instead of crashing the
// entire node.
//
// TODO: We are doing option a) now, but we should support option b)
if (!wait_for_data_receive(unreceived_data_reqs, HS_DYNAMIC_CONFIG(consensus.data_receive_timeout_ms) * 10)) {
HS_REL_ASSERT(false, "Data fetch timeout, should not happen");
}
for (auto const& rreq : unreceived_data_reqs) {
futs.emplace_back(rreq->m_data_written_promise.getFuture());
}
}
// All the entries are done already, no need to wait
if (futs.size() == 0) { return folly::makeFuture< folly::Unit >(folly::Unit{}); }
return folly::collectAllUnsafe(futs).thenValue([this, rreqs](auto&& e) {
#ifndef NDEBUG
for (auto const& rreq : *rreqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data())) { continue; }
HS_DBG_ASSERT(rreq->has_state(repl_req_state_t::DATA_WRITTEN),
"Data written promise raised without updating DATA_WRITTEN state for rkey={}",
rreq->rkey().to_string());
RD_LOGD("Raft Channel: Data write completed and blkid mapped: rreq=[{}]", rreq->to_string());
}
#endif
RD_LOGT("Data Channel: {} pending reqs's data are written", rreqs->size());
return folly::makeFuture< folly::Unit >(folly::Unit{});
});
}
bool RaftReplDev::wait_for_data_receive(std::vector< repl_req_ptr_t > const& rreqs, uint64_t timeout_ms,
std::vector< repl_req_ptr_t >* timeout_rreqs) {
std::vector< folly::Future< folly::Unit > > futs;
std::vector< repl_req_ptr_t > only_wait_reqs;
only_wait_reqs.reserve(rreqs.size());
futs.reserve(rreqs.size());
for (auto const& rreq : rreqs) {
if ((rreq == nullptr) || (!rreq->has_linked_data()) || (rreq->has_state(repl_req_state_t::DATA_RECEIVED))) {
continue;
}
only_wait_reqs.emplace_back(rreq);
futs.emplace_back(rreq->m_data_received_promise.getFuture());
}
// All the data has been received already, no need to wait
if (futs.size() == 0) { return true; }
// If we are currently in resync mode, fetch the data immediately. Otherwise, stage the requests and
// wait a while before doing an explicit fetch: the raft channel may simply have run ahead of the data
// channel, and a short wait avoids an expensive fetch. In steady state the data arrives on its own
// after a short wait.
RD_LOG(DEBUG,
"We haven't received data for {} out {} in reqs batch, will fetch and wait for {} ms, in_resync_mode()={} ",
only_wait_reqs.size(), rreqs.size(), timeout_ms, is_resync_mode());
// We are yet to support reactive fetch from remote.
if (is_resync_mode()) {
check_and_fetch_remote_data(only_wait_reqs);
} else {
m_repl_svc.add_to_fetch_queue(shared_from_this(), only_wait_reqs);
}
// Block here until all the futs are ready (data channel filled in and promises fulfilled);
auto all_futs_ready = folly::collectAllUnsafe(futs).wait(std::chrono::milliseconds(timeout_ms)).isReady();
if (!all_futs_ready && timeout_rreqs != nullptr) {
timeout_rreqs->clear();
for (size_t i{0}; i < futs.size(); ++i) {
if (!futs[i].isReady()) { timeout_rreqs->emplace_back(only_wait_reqs[i]); }
}
all_futs_ready = timeout_rreqs->empty();
}
return all_futs_ready;
}
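// Splits the requests into fetch batches bounded by consensus.data_fetch_max_size_kb and issues one
// fetch_data_from_remote() call per batch; requests that already errored out or already received
// their data are skipped.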
void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreqs) {
auto total_size_to_fetch = 0ul;
std::vector< repl_req_ptr_t > next_batch_rreqs;
auto const max_batch_size = HS_DYNAMIC_CONFIG(consensus.data_fetch_max_size_kb) * 1024ull;
auto const originator = rreqs.front()->remote_blkid().server_id;
for (auto const& rreq : rreqs) {
auto const cur_state = uint32_cast(rreq->state());
if (cur_state == uint32_cast(repl_req_state_t::ERRORED)) {
// This rreq has already errored out; skip the fetch for it.
RD_LOGD("Raft Channel: rreq=[{}] already errored out, ignoring the fetch", rreq->to_string());
continue;
} else if (cur_state == uint32_cast(repl_req_state_t::DATA_RECEIVED)) {
// We already received the data; no need to fetch it again.
RD_LOGD("Raft Channel: Data already received for rreq=[{}], ignoring the fetch", rreq->to_string());
continue;
}
RD_REL_ASSERT_EQ(
rreq->remote_blkid().server_id, originator,
"Batch of remote pull has different originator, not expected, continuing can cause data corruption");
auto const size = rreq->remote_blkid().blkid.blk_count() * get_blk_size();
if ((total_size_to_fetch + size) >= max_batch_size) {
fetch_data_from_remote(std::move(next_batch_rreqs));
next_batch_rreqs.clear();
total_size_to_fetch = 0;
}
total_size_to_fetch += size;
next_batch_rreqs.emplace_back(rreq);
}
fetch_data_from_remote(std::move(next_batch_rreqs));
}
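// A FetchData request is a size-prefixed FetchData flatbuffer carrying one RequestEntry per rreq
// (lsn, term, dsn, header, key, blkid originator and the serialized remote blkid). It is sent to the
// blkids' originator over a bidirectional data-service call and the response is consumed by
// handle_fetch_data_response().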
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
if (rreqs.size() == 0) { return; }
std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
entries.reserve(rreqs.size());
shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
RD_LOGD("Data Channel : FetchData from remote: rreq.size={}, my server_id={}", rreqs.size(), server_id());
auto const& originator = rreqs.front()->remote_blkid().server_id;
for (auto const& rreq : rreqs) {
entries.push_back(CreateRequestEntry(*builder, rreq->lsn(), rreq->term(), rreq->dsn(),
builder->CreateVector(rreq->header().cbytes(), rreq->header().size()),
builder->CreateVector(rreq->key().cbytes(), rreq->key().size()),
rreq->remote_blkid().server_id /* blkid_originator */,
builder->CreateVector(rreq->remote_blkid().blkid.serialize().cbytes(),
rreq->remote_blkid().blkid.serialized_size())));
// Relax this assert if a case arises where originators can differ within the same batch (can't think
// of one now); if such a case appears, we would need to group rreqs by originator and send them in
// separate batches.
RD_DBG_ASSERT_EQ(rreq->remote_blkid().server_id, originator, "Unexpected originator for rreq={}",
rreq->to_string());
RD_LOGT("Fetching data from originator={}, remote: rreq=[{}], remote_blkid={}, my server_id={}", originator,
rreq->to_string(), rreq->remote_blkid().blkid.to_string(), server_id());
}
builder->FinishSizePrefixed(
CreateFetchData(*builder, CreateFetchDataRequest(*builder, builder->CreateVector(entries))));
COUNTER_INCREMENT(m_metrics, fetch_rreq_cnt, 1);
COUNTER_INCREMENT(m_metrics, fetch_total_entries_cnt, rreqs.size());
COUNTER_INCREMENT(m_metrics, outstanding_data_fetch_cnt, 1);
// The leader can change; the receiving side checks that it is indeed the one who originated the
// blkid.
auto const fetch_start_time = Clock::now();
group_msg_service()
->data_service_request_bidirectional(
originator, FETCH_DATA,
sisl::io_blob_list_t{
sisl::io_blob{builder->GetBufferPointer(), builder->GetSize(), false /* is_aligned */}})
.via(&folly::InlineExecutor::instance())
.thenValue([this, builder, rreqs = std::move(rreqs), fetch_start_time](auto response) {
COUNTER_DECREMENT(m_metrics, outstanding_data_fetch_cnt, 1);
auto const fetch_latency_us = get_elapsed_time_us(fetch_start_time);
HISTOGRAM_OBSERVE(m_metrics, rreq_data_fetch_latency_us, fetch_latency_us);
RD_LOGD("Data Channel: FetchData from remote completed, time taken={} us", fetch_latency_us);
if (!response) {
// If we are here, the originator who sent the log entries is down.
// Handle the error; when another member becomes leader, it will resend the log entries.
RD_LOG(ERROR,
"Unable to fetch data from originator={}, error={}, originator is probably down. Will "
"retry when the new leader starts appending log entries",
rreqs.front()->remote_blkid().server_id, response.error());
for (auto const& rreq : rreqs) {
// TODO: Set the data_received promise with an error, so waiting threads can be unblocked and
// reject the request. Without that, it will time out and then reject it.
// We could reach a scenario where the data had not arrived at the time of the fetch but was
// received after the fetch was issued, while the leader has already switched. In that case we
// don't want to fail the request.
if (!rreq->has_state(repl_req_state_t::DATA_RECEIVED)) {
handle_error(rreq, RaftReplService::to_repl_error(response.error()));
}
}
COUNTER_INCREMENT(m_metrics, fetch_err_cnt, 1);
return;
}
builder->Release();
iomanager.run_on_forget(iomgr::reactor_regex::random_worker,
[this, r = std::move(response.value()), rreqs = std::move(rreqs)]() {
handle_fetch_data_response(std::move(r), std::move(rreqs));
});
});
}
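// Originator-side handler for FetchData: for each requested entry, deserializes the remote blkid into
// a local one, reads the blocks into freshly allocated iobufs, and sends all buffers back in request
// order; the completion callback frees the iobufs once the response has gone out.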
void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
auto const& incoming_buf = rpc_data->request_blob();
if (!incoming_buf.cbytes()) {
RD_LOGW("Data Channel: PushData received with empty buffer, ignoring this call");
rpc_data->send_response();
return;
}
auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes());
RD_LOGD("Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size());
std::vector< sisl::sg_list > sgs_vec;
std::vector< folly::Future< bool > > futs;
sgs_vec.reserve(fetch_req->request()->entries()->size());
futs.reserve(fetch_req->request()->entries()->size());
for (auto const& req : *(fetch_req->request()->entries())) {
auto const& lsn = req->lsn();
auto const& originator = req->blkid_originator();
auto const& remote_blkid = req->remote_blkid();
// Edit this check if in the future we want to fetch from non-originator;
if (originator != server_id()) {
auto const error_msg =
fmt::format("Did not expect to receive fetch data from "
"remote when I am not the originator of this request, originator={}, my_server_id={}",
originator, server_id());
RD_LOGW("{}", error_msg);
auto status = ::grpc::Status(::grpc::INVALID_ARGUMENT, error_msg);
rpc_data->set_status(status);
rpc_data->send_response();
return;
}
// fetch data based on the remote_blkid
// We are the originator of the blkid, read data locally;
MultiBlkId local_blkid;
// convert remote_blkid serialized data to local blkid
local_blkid.deserialize(sisl::blob{remote_blkid->Data(), remote_blkid->size()}, true /* copy */);
RD_LOGD("Data Channel: FetchData received: dsn={} lsn={} my_blkid={}", req->dsn(), lsn,
local_blkid.to_string());
// prepare the sgs data buffer to read into;
auto const total_size = local_blkid.blk_count() * get_blk_size();
sisl::sg_list sgs;
sgs.size = total_size;
sgs.iovs.emplace_back(
iovec{.iov_base = iomanager.iobuf_alloc(get_blk_size(), total_size), .iov_len = total_size});
// accumulate the sgs for later use (sending back to the requester);
sgs_vec.push_back(sgs);
futs.emplace_back(async_read(local_blkid, sgs, total_size));
}
folly::collectAllUnsafe(futs).thenValue(
[this, rpc_data = std::move(rpc_data), sgs_vec = std::move(sgs_vec)](auto&& vf) {
for (auto const& err_c : vf) {
if (sisl_unlikely(err_c.value())) {
COUNTER_INCREMENT(m_metrics, read_err_cnt, 1);
RD_REL_ASSERT(false, "Error in reading data");
// TODO: Find a way to return error to the Listener
// TODO: We never actually get here, as iomgr will assert
// (it should not assert, but rather raise an alert and leave the raft group).
}
}
RD_LOGD("Data Channel: FetchData data read completed for {} buffers", sgs_vec.size());
// now prepare the io_blob_list to response back to requester;
nuraft_mesg::io_blob_list_t pkts = sisl::io_blob_list_t{};
for (auto const& sgs : sgs_vec) {
auto const ret = sisl::io_blob::sg_list_to_ioblob_list(sgs);
pkts.insert(pkts.end(), ret.begin(), ret.end());
}
rpc_data->set_comp_cb([sgs_vec = std::move(sgs_vec)](boost::intrusive_ptr< sisl::GenericRpcData >&) {
for (auto const& sgs : sgs_vec) {
for (auto const& iov : sgs.iovs) {
iomanager.iobuf_free(reinterpret_cast< uint8_t* >(iov.iov_base));
}
}
});
rpc_data->send_response(pkts);
});
}
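// The fetch response is one contiguous buffer holding the data of all requested rreqs concatenated in
// request order; the loop below walks it via raw_data/total_size, saving each slice against its rreq
// and asserting that the buffer is fully consumed at the end.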
void RaftReplDev::handle_fetch_data_response(sisl::GenericClientResponse response,
std::vector< repl_req_ptr_t > rreqs) {
auto resp_blob = response.response_blob();
auto raw_data = resp_blob.cbytes();
auto total_size = resp_blob.size();
COUNTER_INCREMENT(m_metrics, fetch_total_blk_size, total_size);
RD_DBG_ASSERT_GT(total_size, 0, "Empty response from remote");
RD_DBG_ASSERT(raw_data, "Empty response from remote");
RD_LOGD("Data Channel: FetchData completed for {} requests", rreqs.size());
for (auto const& rreq : rreqs) {
auto const data_size = rreq->remote_blkid().blkid.blk_count() * get_blk_size();
if (!rreq->save_fetched_data(response, raw_data, data_size)) {
RD_DBG_ASSERT(rreq->local_blkid().is_valid(), "Invalid blkid for rreq={}", rreq->to_string());
auto const local_size = rreq->local_blkid().blk_count() * get_blk_size();
RD_DBG_ASSERT_EQ(data_size, local_size, "Data size mismatch for rreq={} remote size: {}, local size: {}",
rreq->to_string(), data_size, local_size);
RD_LOGD("Data Channel: Data already received for rreq=[{}], skip and move on to next rreq.",
rreq->to_string());
} else {
auto const data_write_start_time = Clock::now();
COUNTER_INCREMENT(m_metrics, total_write_cnt, 1);
COUNTER_INCREMENT(m_metrics, outstanding_data_write_cnt, 1);
data_service()
.async_write(r_cast< const char* >(rreq->data()), data_size, rreq->local_blkid())
.thenValue([this, rreq, data_write_start_time](auto&& err) {
// Update the outstanding counter regardless of error.
COUNTER_DECREMENT(m_metrics, outstanding_data_write_cnt, 1);
auto const data_write_latency = get_elapsed_time_us(data_write_start_time);
auto const total_data_write_latency = get_elapsed_time_us(rreq->created_time());
auto const write_num_pieces = rreq->local_blkid().num_pieces();
HISTOGRAM_OBSERVE(m_metrics, rreq_pieces_per_write, write_num_pieces);
HISTOGRAM_OBSERVE(m_metrics, rreq_data_write_latency_us, data_write_latency);
HISTOGRAM_OBSERVE(m_metrics, rreq_total_data_write_latency_us, total_data_write_latency);
RD_REL_ASSERT(!err,
"Error in writing data"); // TODO: Find a way to return error to the Listener
rreq->release_data();
rreq->add_state(repl_req_state_t::DATA_WRITTEN);
rreq->m_data_written_promise.setValue();
RD_LOGD("Data Channel: Data Write completed rreq=[{}], data_write_latency_us={}, "
"total_write_latency_us={}, write_num_pieces={}",
rreq->to_string(), data_write_latency, total_data_write_latency, write_num_pieces);
});
RD_LOGD("Data Channel: Data fetched from remote: rreq=[{}], data_size: {}, total_size: {}, local_blkid: {}",
rreq->to_string(), data_size, total_size, rreq->local_blkid().to_string());
}
raw_data += data_size;
total_size -= data_size;
}
RD_DBG_ASSERT_EQ(total_size, 0, "Total size mismatch, some data is not consumed");
}
void RaftReplDev::commit_blk(repl_req_ptr_t rreq) {
if (rreq->local_blkid().is_valid()) {
if (data_service().commit_blk(rreq->local_blkid()) != BlkAllocStatus::SUCCESS) {
if (hs()->device_mgr()->is_boot_in_degraded_mode() && m_log_store_replay_done)
return;
else
RD_DBG_ASSERT(false, "failed to commit blk when applying log in non-degraded mode");
}
}
}
void RaftReplDev::handle_rollback(repl_req_ptr_t rreq) {
// 1. call the listener to rollback
m_listener->on_rollback(rreq->lsn(), rreq->header(), rreq->key(), rreq);
// 2. remove the request from maps
m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq);
m_repl_key_req_map.erase(rreq->rkey());
// 3. free the allocated blocks
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak", blkid.to_string());
RD_LOGD("Rollback rreq: Releasing blkid={} freed successfully", blkid.to_string());
});
}
}
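// Commit path for a single rreq: commit the allocated blks (unless already committed), drop the
// request from the repl-key and lsn maps, advance m_next_dsn past the committed dsn, then dispatch on
// the op code (group destroy, member replace, or the listener's on_commit for data entries). The
// durable m_commit_upto_lsn is advanced only outside of log-replay recovery.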
void RaftReplDev::handle_commit(repl_req_ptr_t rreq, bool recovery) {
if (!rreq->has_state(repl_req_state_t::DATA_COMMITTED)) { commit_blk(rreq); }
// Remove the request from repl_key map.
m_repl_key_req_map.erase(rreq->rkey());
// Remove the request from lsn map.
m_state_machine->unlink_lsn_to_req(rreq->lsn(), rreq);
auto cur_dsn = m_next_dsn.load(std::memory_order_relaxed);
while (cur_dsn <= rreq->dsn()) {
m_next_dsn.compare_exchange_strong(cur_dsn, rreq->dsn() + 1);
}
RD_LOGD("Raft channel: Commit rreq=[{}]", rreq->to_string());
if (rreq->op_code() == journal_type_t::HS_CTRL_DESTROY) {
leave();
} else if (rreq->op_code() == journal_type_t::HS_CTRL_REPLACE) {
replace_member(rreq);
} else {
m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkid(), rreq);
}
if (!recovery) {
auto prev_lsn = m_commit_upto_lsn.exchange(rreq->lsn());
RD_DBG_ASSERT_GT(rreq->lsn(), prev_lsn,
"Out of order commit of lsns, it is not expected in RaftReplDev. cur_lsns={}, prev_lsns={}",
rreq->lsn(), prev_lsn);
}
if (!rreq->is_proposer()) { rreq->clear(); }
}
void RaftReplDev::handle_config_commit(const repl_lsn_t lsn, raft_cluster_config_ptr_t& new_conf) {
// When we reach here, the new config has already been applied to the cluster.
// Since we didn't create a repl req for the config change, we only need to update m_commit_upto_lsn here.
// Keep this variable in case it is needed later.
(void)new_conf;
auto prev_lsn = m_commit_upto_lsn.load(std::memory_order_relaxed);
if (prev_lsn >= lsn || !m_commit_upto_lsn.compare_exchange_strong(prev_lsn, lsn)) {
RD_LOGE("Raft Channel: unexpected log {} commited before config {} committed", prev_lsn, lsn);
}
}
void RaftReplDev::handle_error(repl_req_ptr_t const& rreq, ReplServiceError err) {