Changes from 3 commits
106 changes: 106 additions & 0 deletions refactor.sh
@@ -0,0 +1,106 @@
#!/bin/bash

# Input parameters
CLUSTER=908
NAMESPACE="nuobject2sh-dev"
DEPLOYMENT_COUNT=4
REFACTOR_IMAGE="hub.tess.io/yawzhang/storage_mgr:refactor_new-RelWithDebInfo"
NEW_IMAGE="hub.tess.io/yawzhang/storage_mgr:crc_1027-RelWithDebInfo"

# Function to check deployment status
check_deployment_status() {
local deployment=$1
local status
status=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get deployment "$deployment" -o jsonpath='{.status.readyReplicas}' 2>/dev/null)
if [[ "$status" -eq 1 ]]; then
return 0
else
return 1
fi
}

# Function to check pod logs
check_pod_logs() {
local pod=$1
local log_message=$2
tess kubectl --context="$CLUSTER" -n "$NAMESPACE" logs "$pod" | grep -q "$log_message"
return $?
}

start_idx=1
for i in $(seq "$start_idx" "$DEPLOYMENT_COUNT"); do
DEPLOYMENT="sm-long-running$i-1007"
POD=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pods -o jsonpath='{.items[*].metadata.name}' | tr ' ' '\n' | grep "$DEPLOYMENT")
echo "Processing deployment $DEPLOYMENT pod $POD..."

# PRE-CHECK
CURRENT_IMAGE=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pod "$POD" -o jsonpath='{.spec.containers[?(@.name=="sm-app")].image}')
if [[ "$CURRENT_IMAGE" == "$NEW_IMAGE" ]]; then
echo "[PRE-CHECK] Pod $POD is already using the new image $NEW_IMAGE. Skipping..."
continue
fi

# Step 1: Update deployment strategy to Recreate and set sm-app image to refactor image
echo "[Step 1]. Updating deployment $DEPLOYMENT strategy to Recreate and setting sm-app image to $REFACTOR_IMAGE..."
tess kubectl --context="$CLUSTER" -n "$NAMESPACE" patch deployment "$DEPLOYMENT" --type='json' -p='[
{"op": "replace", "path": "/spec/strategy", "value": {"type": "Recreate"}},
{"op": "replace", "path": "/spec/template/spec/containers/0/image", "value": "'"$REFACTOR_IMAGE"'"}
]'

sleep 40

# Step 2: Get new pod name and check the log
NEW_POD=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pods -o jsonpath='{.items[*].metadata.name}' | tr ' ' '\n' | grep "$DEPLOYMENT" | grep -v "$POD")
while [[ -z "$NEW_POD" ]]; do
echo "[Step 2]. No new pod found for deployment $DEPLOYMENT. deployment still upgrading, wait 3s and retrying."
sleep 3
NEW_POD=$(tess kubectl --context="$CLUSTER" -n "$NAMESPACE" get pods -o jsonpath='{.items[*].metadata.name}' | tr ' ' '\n' | grep "$DEPLOYMENT" | grep -v "$POD")
done
echo "[Step 2]. New pod created: $NEW_POD for deployment $DEPLOYMENT."
echo "[Step 2]. Checking logs for new pod $NEW_POD..."
max_retry_cnt=20
retry_cnt=0
while ! check_pod_logs "$NEW_POD" "exit status 0;"; do
if [[ $retry_cnt -ge $max_retry_cnt ]]; then
echo "[Step 2. Exceeded maximum retries while checking logs for new pod $NEW_POD."
exit 1
fi
echo "[Step 2]. Expected log message not found in new pod $NEW_POD, sleeping 3s and retrying."
sleep 3
retry_cnt=$((retry_cnt + 1))
done
echo "[Step 2]. refactor confirmation log found in new pod $NEW_POD."

# Double Check deployment status again, expecting it to not be ready
if check_deployment_status "$DEPLOYMENT"; then
echo "[Step 2]. Unexpected! Deployment $DEPLOYMENT is still ready after updating to refactor image for pod $POD."
exit 1
fi

# Step 3: Update deployment sm-app image to new image
echo "[Step 3]. Updating deployment $DEPLOYMENT to use new image $NEW_IMAGE for pod $NEW_POD..."
tess kubectl --context="$CLUSTER" -n "$NAMESPACE" set image deployment/"$DEPLOYMENT" sm-app="$NEW_IMAGE"
sleep 40

# Step 4: Check deployment status again, expecting it to be ready
while ! check_deployment_status "$DEPLOYMENT"; do
echo "[Step 4]. Deployment $DEPLOYMENT is not ready after updating to new image for pod $NEW_POD. sleep 5s and retrying."
sleep 5
done

# Step 5: Update deployment strategy back to RollingUpdate and set maxUnavailable and maxSurge
echo "[Step 5]. Updating deployment $DEPLOYMENT strategy back to RollingUpdate with maxUnavailable=0 and maxSurge=1..."
tess kubectl --context="$CLUSTER" -n "$NAMESPACE" patch deployment "$DEPLOYMENT" --type='json' -p='[
{"op": "replace", "path": "/spec/strategy", "value": {"type": "RollingUpdate", "rollingUpdate": {"maxUnavailable": 1, "maxSurge": 1}}}
]'

# Final check to ensure deployment is ready
while ! check_deployment_status "$DEPLOYMENT"; do
echo "[Step 6]. Deployment $DEPLOYMENT is not ready after updating strategy back to RollingUpdate. sleep 5s and retrying."
sleep 5
done

echo "[Step 6]. Deployment $DEPLOYMENT processed successfully."
done

echo "All pods processed successfully."
1 change: 1 addition & 0 deletions src/include/homestore/logstore/log_store_internal.hpp
@@ -172,6 +172,7 @@ struct logstore_superblk {

[[nodiscard]] static logstore_superblk default_value();
static void init(logstore_superblk& m);
static void init(logstore_superblk& meta, logstore_seq_num_t first_seq_num);
static void clear(logstore_superblk& m);
[[nodiscard]] static bool is_valid(const logstore_superblk& m);

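The definition of the new two-argument init() overload is not part of this diff. As a hedged sketch only, inferred from the call logstore_superblk::init(sb_area[store_id], store_sb.m_first_seq_num) in refactor_superblk() further down, it presumably seeds the recovered first sequence number instead of the default of zero:

void logstore_superblk::init(logstore_superblk& meta, logstore_seq_num_t first_seq_num) {
    meta = logstore_superblk::default_value(); // assumption: start from a clean, valid record
    meta.m_first_seq_num = first_seq_num;      // carry over the store's recovered first LSN
}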
2 changes: 2 additions & 0 deletions src/include/homestore/logstore_service.hpp
@@ -82,6 +82,8 @@ class LogStoreService {
*/
void start(bool format);

void refactor();

/**
* @brief Stop the LogStoreService. It resets all parameters and can be restarted with start method.
*
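The body of LogStoreService::refactor() is not shown in this diff either. Purely as an illustrative sketch, one plausible shape is for the service to fan out to each LogDev it owns and let the logdev drive its own superblk refactor; the member name m_id_logdev_map is an assumption modeled on the m_id_logstore_map naming used inside LogDev:

void LogStoreService::refactor() {
    // hypothetical: walk every logdev owned by the service and refactor it
    for (auto& [logdev_id, logdev] : m_id_logdev_map) {
        logdev->refactor();
    }
}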
3 changes: 3 additions & 0 deletions src/include/homestore/superblk_handler.hpp
@@ -108,9 +108,12 @@ class superblk {
sisl::byte_array raw_buf() { return m_raw_buf; }

void write() {
LOGINFO("Writing superblk {} of size {}", m_meta_sub_name, m_raw_buf->size());
if (m_meta_blk) {
LOGINFO("Updating existing superblk {}", m_meta_sub_name);
meta_service().update_sub_sb(m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk);
} else {
LOGINFO("Adding new superblk {}", m_meta_sub_name);
meta_service().add_sub_sb(m_meta_sub_name, m_raw_buf->cbytes(), m_raw_buf->size(), m_meta_blk);
}
}
6 changes: 5 additions & 1 deletion src/lib/checkpoint/cp_mgr.cpp
@@ -46,7 +46,11 @@ CPManager::CPManager() :
start_cp_thread();
}

CPManager::~CPManager() { HS_REL_ASSERT(!m_cur_cp, "CPManager is tiering down without calling shutdown"); }
CPManager::~CPManager() {
delete (m_cur_cp);
rcu_xchg_pointer(&m_cur_cp, nullptr);
LOGINFO("CPManager destroyed");
}

void CPManager::start(bool first_time_boot) {
if (first_time_boot) {
27 changes: 3 additions & 24 deletions src/lib/homestore.cpp
@@ -197,7 +197,7 @@ bool HomeStore::start(const hs_input_params& input, hs_before_services_starting_
do_start();
return false;
} else {
return true;
RELEASE_ASSERT(false, "refactor mode should not be used for the first-time boot");
}
}

@@ -292,21 +292,8 @@ void HomeStore::do_start() {

if (has_repl_data_service()) {
s_cast< GenericReplService* >(m_repl_service.get())->start(); // Replservice starts logstore & data service
} else {
if (has_data_service()) { m_data_service->start(); }
if (has_log_service() && inp_params.auto_recovery) {
// In case of custom recovery, let consumer starts the recovery and it is consumer module's responsibilities
// to start log store
m_log_service->start(is_first_time_boot() /* format */);
}
}

// If this is the first time boot, we need to commit the formatting so that it will not be considered as first time
// boot going forward on next reboot.
if (m_dev_mgr->is_first_time_boot()) {
// Take the first CP after we have initialized all subsystems and wait for it to complete.
m_cp_mgr->trigger_cp_flush(true /* force */).get();
m_dev_mgr->commit_formatting();
LOGINFO("Refactor mode enabled, skipping further HomeStore start steps after ReplicationService start");
return;
}

m_cp_mgr->start_timer();
@@ -316,15 +303,8 @@
}

void HomeStore::shutdown() {
if (!m_init_done) {
LOGWARN("Homestore shutdown is called before init is completed");
return;
}

LOGINFO("Homestore shutdown is started");

m_resource_mgr->stop();

// 1 stop all the services, after which all the upper layer api call are rejected and there is not on-going request.
// Note that, after stopping, all the service are alive.
if (has_repl_data_service())
@@ -340,7 +320,6 @@
// 2 call cp_manager shutdown, which will which trigger cp flush to make sure all the in-memory data of all the
// services are flushed to disk. since all the upper layer api call are rejected and there is not on-going request,
// so after cp flush is done, we can guarantee all the necessary data are persisted to disk.
m_cp_mgr->shutdown();
m_cp_mgr.reset();

// 3 call reset/shutdown to clear all the services and after that all the services are dead, excluding metasevice
125 changes: 76 additions & 49 deletions src/lib/logstore/log_dev.cpp
@@ -70,43 +70,12 @@ void LogDev::start(bool format, std::shared_ptr< JournalVirtualDev > vdev) {
HS_LOG_ASSERT(!m_logdev_meta.is_empty(),
"Expected meta data to be read already before loading this log dev id: {}", m_logdev_id);
auto const store_list = m_logdev_meta.load();

// Notify to the caller that a new log store was reserved earlier and it is being loaded, with its meta info
for (const auto& spair : store_list) {
on_log_store_found(spair.first, spair.second);
}

THIS_LOGDEV_LOG(INFO, "get start vdev offset during recovery {} log indx {} ",
m_logdev_meta.get_start_dev_offset(), m_logdev_meta.get_start_log_idx());

m_vdev_jd->update_data_start_offset(m_logdev_meta.get_start_dev_offset());
m_log_idx = m_logdev_meta.get_start_log_idx();
do_load(m_logdev_meta.get_start_dev_offset());
m_log_records->reinit(m_log_idx);
m_last_flush_idx = m_log_idx - 1;
}

// Now that we have create/load logdev metablk, so the log dev is ready to be used
m_is_ready = true;

if (allow_timer_flush()) start_timer();
handle_unopened_log_stores(format);

{
// Also call the logstore to inform that start/replay is completed.
folly::SharedMutexWritePriority::WriteHolder holder(m_store_map_mtx);
if (!format) {
for (auto& p : m_id_logstore_map) {
auto& lstore{p.second.log_store};
if (lstore && lstore->get_log_replay_done_cb()) {
lstore->get_log_replay_done_cb()(lstore, lstore->start_lsn() - 1);
lstore->truncate(lstore->truncated_upto());
}
}
}
LOGINFO("just refactor for lgodev {}, donot need rebuild logstore and load logs, return directly", m_logdev_id);
}
}

void LogDev::refactor() { LOGINFO("Get all blks and call refactor superblk; not supported yet"); }

LogDev::~LogDev() {
THIS_LOGDEV_LOG(INFO, "Logdev stopping id {}", m_logdev_id);
HS_LOG_ASSERT((m_pending_flush_size.load() == 0),
@@ -146,21 +115,8 @@ void LogDev::stop() {
}
}

folly::SharedMutexWritePriority::ReadHolder holder(m_store_map_mtx);
for (auto& [_, store] : m_id_logstore_map) {
store.log_store->stop();
}

// trigger a new flush to make sure all pending writes are flushed
flush_under_guard();

// after we call stop, we need to do any pending device truncations
truncate();
m_id_logstore_map.clear();
if (allow_timer_flush()) {
auto f = stop_timer();
std::move(f).get();
}
THIS_LOGDEV_LOG(INFO, "No need to stop log stores in refactor mode, returning directly");
return;
}

void LogDev::destroy() {
@@ -869,7 +825,9 @@ void LogDevMetadata::rollback_super_blk_found(const sisl::byte_view& buf, void*

std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::load() {
std::vector< std::pair< logstore_id_t, logstore_superblk > > ret_list;
std::vector< std::pair< logstore_id_t, logstore_superblk > > all_list;
ret_list.reserve(1024);
all_list.reserve(1024);
if (store_capacity()) {
m_id_reserver = std::make_unique< sisl::IDReserver >(store_capacity());
} else {
@@ -889,6 +847,12 @@ std::vector< std::pair< logstore_id_t, logstore_superblk > > LogDevMetadata::loa
m_id_reserver->reserve(idx);
ret_list.push_back(std::make_pair<>(idx, store_sb[idx]));
++n;
LOGINFO("Loaded valid logstore superblk for log_dev={}, store_id={} start_lsn={}", m_sb->logdev_id, idx,
store_sb[idx].m_first_seq_num);
all_list.push_back(std::make_pair<>(idx, store_sb[idx]));
} else {
LOGINFO("Found invalid logstore superblk for log_dev={}, store_id={}", m_sb->logdev_id, idx);
all_list.push_back(std::make_pair<>(idx, store_sb[idx]));
}
++idx;
}
@@ -898,9 +862,72 @@
m_rollback_info.insert({rec.store_id, rec.idx_range});
}

LOGINFO("call refactor superblk for logdev={}, all_list_size={}, reserved_list_size={}", m_sb->logdev_id,
all_list.size(), ret_list.size());
refactor_superblk(all_list);

return ret_list;
}

void LogDevMetadata::refactor_superblk(const std::vector< std::pair< logstore_id_t, logstore_superblk > >& all_list) {
// increase size if needed
auto nstores = (m_store_info.size() == 0) ? 0u : *m_store_info.rbegin() + 1;
auto req_sz = sizeof(new_logdev_superblk) + (nstores * sizeof(logstore_superblk));
if (meta_service().is_aligned_buf_needed(req_sz)) { req_sz = sisl::round_up(req_sz, meta_service().align_size()); }
LOGINFO("Refactoring logdev_superblk log_dev={}, current_size={}, required_size={}, nstores={}", m_sb->logdev_id,
m_sb.size(), req_sz, nstores);
if (req_sz != m_sb.size()) {
const auto old_buf = m_sb.raw_buf();
m_sb.create(req_sz);
logstore_superblk* sb_area = m_sb->get_logstore_superblk();
std::fill_n(sb_area, store_capacity(), logstore_superblk::default_value());
std::memcpy(voidptr_cast(m_sb.raw_buf()->bytes()), static_cast< const void* >(old_buf->cbytes()),
std::min(old_buf->size(), m_sb.size()));
} else {
LOGINFO("No need to resize logdev_superblk log_dev={}, skip refactor", m_sb->logdev_id);
return;
}

// convert old superblk to new superblk
new_logdev_superblk new_sb(m_sb.get());
std::memcpy(voidptr_cast(m_sb.raw_buf()->bytes()), static_cast< const void* >(&new_sb),
sizeof(new_logdev_superblk));

// initialize all logstore superblks to default value
logstore_superblk* sb_area =
reinterpret_cast< logstore_superblk* >(m_sb.raw_buf()->bytes() + sizeof(new_logdev_superblk));
uint32_t store_cap = (m_sb.size() - sizeof(new_logdev_superblk)) / sizeof(logstore_superblk);
std::fill_n(sb_area, store_cap, logstore_superblk::default_value());

// copy log store superblks
for (const auto& [store_id, store_sb] : all_list) {
HS_REL_ASSERT(logstore_superblk::is_valid(store_sb),
"Refactoring logdev superblk with invalid logstore superblk for store id {}-{}", new_sb.logdev_id,
store_id);
LOGINFO("Refactoring logdev_superblk log_dev={}, store_id={} start_lsn={}", new_sb.logdev_id, store_id,
store_sb.m_first_seq_num);
logstore_superblk::init(sb_area[store_id], store_sb.m_first_seq_num);
}
m_sb.write();
LOGINFO("Refactored logdev_superblk written to disk, log_dev={}", new_sb.logdev_id);

// check if refactor is successful
new_logdev_superblk* test_sb = reinterpret_cast< new_logdev_superblk* >(m_sb.raw_buf()->bytes());
LOGINFO("Verifying refactored logdev_superblk log_dev={}, num_stores={}, start_dev_offset={}", test_sb->logdev_id,
test_sb->num_stores, test_sb->start_dev_offset);
const logstore_superblk* test_store_sb =
reinterpret_cast< logstore_superblk* >(m_sb.raw_buf()->bytes() + sizeof(new_logdev_superblk));
for (const auto& [store_id, store_sb] : all_list) {
if (test_store_sb[store_id].m_first_seq_num != store_sb.m_first_seq_num) {
LOGERROR("Refactored logdev superblk verification failed for store id {}, expected is {}, actual is {}",
store_id, store_sb.m_first_seq_num, test_store_sb[store_id].m_first_seq_num);
} else {
LOGINFO("Refactored logdev={} superblk verification succeeded for store id {}, lsn={}", test_sb->logdev_id,
store_id, test_store_sb[store_id].m_first_seq_num);
}
}
}

logstore_id_t LogDevMetadata::reserve_store(bool persist_now) {
auto const idx = m_id_reserver->reserve(); // Search the id reserver and alloc an idx;
m_store_info.insert(idx);
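The new_logdev_superblk type used by refactor_superblk() is also not shown in this diff. Below is a hedged reconstruction of its likely shape, inferred only from the accesses above (logdev_id, num_stores, start_dev_offset, and a converting constructor from the old superblk); the real definition and field types may differ:

struct new_logdev_superblk {
    uint32_t logdev_id{0};
    uint32_t num_stores{0};
    uint64_t start_dev_offset{0};

    // assumption: the converting constructor copies the matching header fields from the
    // old-format logdev_superblk (field names on the old type are assumed to line up)
    explicit new_logdev_superblk(const logdev_superblk* old_sb) :
            logdev_id{old_sb->logdev_id},
            num_stores{old_sb->num_stores},
            start_dev_offset{old_sb->start_dev_offset} {}
};

In such a layout the logstore_superblk records sit immediately after this header, which is what the bytes() + sizeof(new_logdev_superblk) arithmetic in refactor_superblk() relies on.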