Skip to content

Commit 72fcb63

Browse files
authored
Merge branch 'master' into fix_ut_raft
2 parents 70c9af6 + 39781c3 commit 72fcb63

File tree

11 files changed

+93
-17
lines changed

11 files changed

+93
-17
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
name: Homestore 3.x Build
2+
3+
on:
4+
workflow_dispatch:
5+
push:
6+
branches:
7+
- stable/v3.x
8+
9+
jobs:
10+
Build:
11+
strategy:
12+
fail-fast: false
13+
matrix:
14+
platform: ["ubuntu-22.04", "ubuntu-20.04"]
15+
build-type: ["Debug", "Release"]
16+
malloc-impl: ["libc", "tcmalloc"]
17+
prerelease: ["True", "False"]
18+
exclude:
19+
- build-type: Debug
20+
platform: ubuntu-20.04
21+
- build-type: Debug
22+
malloc-impl: tcmalloc
23+
- malloc-impl: tcmalloc
24+
platform: ubuntu-20.04
25+
- malloc-impl: libc
26+
build-type: Release
27+
platform: ubuntu-22.04
28+
- prerelease: "True"
29+
platform: ubuntu-20.04
30+
uses: ./.github/workflows/build_commit.yml
31+
with:
32+
platform: ${{ matrix.platform }}
33+
build-type: ${{ matrix.build-type }}
34+
malloc-impl: ${{ matrix.malloc-impl }}
35+
prerelease: ${{ matrix.prerelease }}

conanfile.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
class HomestoreConan(ConanFile):
77
name = "homestore"
8-
version = "6.4.25"
8+
version = "6.4.27"
99
homepage = "https://github.com/eBay/Homestore"
1010
description = "HomeStore Storage Engine"
1111
topics = ("ebay", "nublox")
@@ -31,7 +31,6 @@ class HomestoreConan(ConanFile):
3131
'skip_testing': False,
3232
}
3333

34-
3534
generators = "cmake", "cmake_find_package"
3635
exports_sources = "cmake/*", "src/*", "CMakeLists.txt", "test_wrap.sh", "LICENSE"
3736
keep_imports = True

src/include/homestore/logstore/log_store.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ class HomeLogStore : public std::enable_shared_from_this< HomeLogStore > {
368368
// Sync flush sections
369369
std::atomic< logstore_seq_num_t > m_sync_flush_waiter_lsn{invalid_lsn()};
370370
std::mutex m_sync_flush_mtx;
371+
std::mutex m_single_sync_flush_mtx;
371372
std::condition_variable m_sync_flush_cv;
372373

373374
std::vector< seq_ld_key_pair > m_truncation_barriers; // List of truncation barriers

src/lib/device/journal_vdev.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,8 @@ off_t JournalVirtualDev::Descriptor::alloc_next_append_blk(size_t sz) {
284284
// update reserved size;
285285
m_reserved_sz += sz;
286286

287-
high_watermark_check();
287+
// TODO enable after resource manager changes.
288+
// high_watermark_check();
288289

289290
// assert that the returned logical offset is in a good range
290291
HS_DBG_ASSERT_LE(tail_off, m_end_offset);

src/lib/logstore/log_dev.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,11 @@ int64_t LogDev::append_async(const logstore_id_t store_id, const logstore_seq_nu
262262
auto threshold_size = LogDev::flush_data_threshold_size();
263263
m_log_records->create(idx, store_id, seq_num, data, cb_context);
264264

265+
if (HS_DYNAMIC_CONFIG(logstore.flush_threshold_size) == 0) {
266+
// This is set in tests to disable implicit flush. This will be removed in future.
267+
return idx;
268+
}
269+
265270
if (flush_wait ||
266271
((prev_size < threshold_size && ((prev_size + data.size()) >= threshold_size) &&
267272
!m_is_flushing.load(std::memory_order_relaxed)))) {

src/lib/logstore/log_store.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,10 +173,11 @@ void HomeLogStore::read_async(logstore_seq_num_t seq_num, void* cookie, const lo
173173
#endif
174174

175175
void HomeLogStore::on_write_completion(logstore_req* req, const logdev_key& ld_key) {
176+
std::unique_lock lk(m_sync_flush_mtx);
176177
// Upon completion, create the mapping between seq_num and log dev key
177178
m_records.update(req->seq_num, [&](logstore_record& rec) -> bool {
178179
rec.m_dev_key = ld_key;
179-
// THIS_LOGSTORE_LOG(DEBUG, "Completed write of lsn {} logdev_key={}", req->seq_num, ld_key);
180+
THIS_LOGSTORE_LOG(DEBUG, "Completed write of store lsn {} logdev_key={}", req->seq_num, ld_key);
180181
return true;
181182
});
182183
// assert(flush_ld_key.idx >= m_last_flush_ldkey.idx);
@@ -402,6 +403,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
402403
HS_DBG_ASSERT_EQ(LogDev::can_flush_in_this_thread(), false,
403404
"Logstore flush sync cannot be called on same thread which could do logdev flush");
404405

406+
std::unique_lock lk(m_single_sync_flush_mtx);
405407
if (upto_seq_num == invalid_lsn()) { upto_seq_num = m_records.active_upto(); }
406408

407409
// if we have flushed already, we are done
@@ -426,6 +428,7 @@ void HomeLogStore::flush_sync(logstore_seq_num_t upto_seq_num) {
426428

427429
// NOTE: We are not resetting the lsn because same seq number should never have 2 completions and thus not
428430
// doing it saves an atomic instruction
431+
THIS_LOGSTORE_LOG(TRACE, "flush_sync over upto_seq_num {}", upto_seq_num);
429432
}
430433
}
431434

src/lib/replication/log_store/home_raft_log_store.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ ulong HomeRaftLogStore::next_slot() const {
130130
}
131131

132132
ulong HomeRaftLogStore::last_index() const {
133-
uint64_t last_index = m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn);
133+
uint64_t last_index = to_repl_lsn(m_log_store->get_contiguous_completed_seq_num(m_last_durable_lsn));
134134
return last_index;
135135
}
136136

src/lib/replication/log_store/repl_log_store.cpp

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,20 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
4141

4242
// Start fetching the batch of data for this lsn range from remote if it's not available yet.
4343
auto reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
44+
auto proposer_reqs = sisl::VectorPool< repl_req_ptr_t >::alloc();
4445
for (int64_t lsn = int64_cast(start_lsn); lsn <= end_lsn; ++lsn) {
4546
auto rreq = m_sm.lsn_to_req(lsn);
4647
// Skip entries with no repl request (rreq == nullptr), which is the case for raft config entries. Requests
4748
// proposed by this node are no longer skipped; they are collected separately in proposer_reqs so that they
4849
// can be marked LOG_FLUSHED below. All other requests go to reqs, whose data may still need to be fetched
4950
// from remote.
50-
if ((rreq == nullptr) || rreq->is_proposer()) { continue; }
51-
reqs->emplace_back(std::move(rreq));
51+
if ((rreq == nullptr) /*|| rreq->is_proposer()*/) {
52+
continue;
53+
} else if (rreq->is_proposer()) {
54+
proposer_reqs->emplace_back(std::move(rreq));
55+
} else {
56+
reqs->emplace_back(std::move(rreq));
57+
}
5258
}
5359

5460
RD_LOGT("Raft Channel: end_of_append_batch start_lsn={} count={} num_data_to_be_written={}", start_lsn, count,
@@ -73,8 +79,17 @@ void ReplLogStore::end_of_append_batch(ulong start_lsn, ulong count) {
7379
for (auto const& rreq : *reqs) {
7480
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
7581
}
82+
} else if (!proposer_reqs->empty()) {
83+
RD_LOGT("Raft Channel: end_of_append_batch, I am proposer, only flush log s from {} , count {}", start_lsn,
84+
count);
85+
// Mark all the reqs also completely written
86+
HomeRaftLogStore::end_of_append_batch(start_lsn, count);
87+
for (auto const& rreq : *proposer_reqs) {
88+
if (rreq) { rreq->add_state(repl_req_state_t::LOG_FLUSHED); }
89+
}
7690
}
7791
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
92+
sisl::VectorPool< repl_req_ptr_t >::free(proposer_reqs);
7893
}
7994

8095
// Returns the name reported by the underlying m_rd object.
std::string ReplLogStore::rdev_name() const {
    return m_rd.rdev_name();
}

src/lib/replication/repl_dev/raft_repl_dev.cpp

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -148,13 +148,15 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const&
148148
repl_req_ptr_t rreq) {
149149
// Allocate a request context if the caller did not supply one. Assign to the parameter itself: declaring a new
// local here would shadow it and leave the null rreq in place for the handle_error()/init() calls below.
if (!rreq) { rreq = repl_req_ptr_t(new repl_req_ctx{}); }
150150

151-
auto const guard = m_stage.access();
152-
if (auto const stage = *guard.get(); stage != repl_dev_stage_t::ACTIVE) {
153-
RD_LOGW("Raft channel: Not ready to accept writes, stage={}", enum_name(stage));
154-
handle_error(rreq,
155-
(stage == repl_dev_stage_t::INIT) ? ReplServiceError::SERVER_IS_JOINING
156-
: ReplServiceError::SERVER_IS_LEAVING);
157-
return;
151+
{
152+
auto const guard = m_stage.access();
153+
if (auto const stage = *guard.get(); stage != repl_dev_stage_t::ACTIVE) {
154+
RD_LOGW("Raft channel: Not ready to accept writes, stage={}", enum_name(stage));
155+
handle_error(rreq,
156+
(stage == repl_dev_stage_t::INIT) ? ReplServiceError::SERVER_IS_JOINING
157+
: ReplServiceError::SERVER_IS_LEAVING);
158+
return;
159+
}
158160
}
159161

160162
rreq->init(repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1)},
@@ -249,6 +251,11 @@ void RaftReplDev::push_data_to_all_followers(repl_req_ptr_t rreq, sisl::sg_list
249251

250252
void RaftReplDev::on_push_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
251253
auto const& incoming_buf = rpc_data->request_blob();
254+
if (!incoming_buf.cbytes()) {
255+
RD_LOGW("Data Channel: PushData received with empty buffer, ignoring this call");
256+
rpc_data->send_response();
257+
return;
258+
}
252259
auto const fb_size =
253260
flatbuffers::ReadScalar< flatbuffers::uoffset_t >(incoming_buf.cbytes()) + sizeof(flatbuffers::uoffset_t);
254261
auto push_req = GetSizePrefixedPushDataRequest(incoming_buf.cbytes());
@@ -517,7 +524,7 @@ void RaftReplDev::check_and_fetch_remote_data(std::vector< repl_req_ptr_t > rreq
517524
void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
518525
if (rreqs.size() == 0) { return; }
519526

520-
std::vector<::flatbuffers::Offset< RequestEntry > > entries;
527+
std::vector< ::flatbuffers::Offset< RequestEntry > > entries;
521528
entries.reserve(rreqs.size());
522529

523530
shared< flatbuffers::FlatBufferBuilder > builder = std::make_shared< flatbuffers::FlatBufferBuilder >();
@@ -597,6 +604,11 @@ void RaftReplDev::fetch_data_from_remote(std::vector< repl_req_ptr_t > rreqs) {
597604

598605
void RaftReplDev::on_fetch_data_received(intrusive< sisl::GenericRpcData >& rpc_data) {
599606
auto const& incoming_buf = rpc_data->request_blob();
607+
if (!incoming_buf.cbytes()) {
608+
// This is the FetchData handler, so name the right RPC in the warning.
RD_LOGW("Data Channel: FetchData received with empty buffer, ignoring this call");
609+
rpc_data->send_response();
610+
return;
611+
}
600612
auto fetch_req = GetSizePrefixedFetchData(incoming_buf.cbytes());
601613

602614
RD_LOGD("Data Channel: FetchData received: fetch_req.size={}", fetch_req->request()->entries()->size());

src/lib/replication/repl_dev/raft_state_machine.cpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
#include "service/raft_repl_service.h"
88
#include "repl_dev/raft_state_machine.h"
99
#include "repl_dev/raft_repl_dev.h"
10+
#include <homestore/homestore.hpp>
11+
#include "common/homestore_config.hpp"
1012

1113
SISL_LOGGING_DECL(replication)
1214

@@ -187,11 +189,10 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
187189
RD_LOGD("Raft channel: Received Commit message lsn {} store {} logdev {} size {}", lsn,
188190
m_rd.m_data_journal->logstore_id(), m_rd.m_data_journal->logdev_id(), params.data->size());
189191
repl_req_ptr_t rreq = lsn_to_req(lsn);
190-
if (!rreq) { RD_LOGD("Raft channel got null rreq"); }
192+
RD_DBG_ASSERT(rreq != nullptr, "Raft channel got null rreq");
191193
RD_LOGD("Raft channel: Received Commit message rreq=[{}]", rreq->to_compact_string());
192194
if (rreq->is_proposer()) {
193195
// This is the time to ensure flushing of journal happens in the proposer
194-
if (m_rd.m_data_journal->last_durable_index() < uint64_cast(lsn)) { m_rd.m_data_journal->flush(); }
195196
rreq->add_state(repl_req_state_t::LOG_FLUSHED);
196197
}
197198

0 commit comments

Comments
 (0)