Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,11 @@
use crate::processor::pop_message_processor::QueueLockManager;

pub(crate) struct PopBufferMergeService<MS> {
buffer: DashMap<CheetahString /* mergeKey */, PopCheckPointWrapper>,
commit_offsets:
DashMap<CheetahString /* topic@cid@queueId */, QueueWithTime<PopCheckPointWrapper>>,
buffer: DashMap<CheetahString /* mergeKey */, ArcMut<PopCheckPointWrapper>>,
commit_offsets: DashMap<
CheetahString, /* topic@cid@queueId */
QueueWithTime<ArcMut<PopCheckPointWrapper>>,
>,
serving: AtomicBool,
counter: AtomicI32,
scan_times: u64,
Expand Down Expand Up @@ -102,14 +104,94 @@
}

impl<MS: MessageStore> PopBufferMergeService<MS> {
/// Adds a checkpoint to the buffer
pub fn add_ck(
&mut self,
&self,

Check warning on line 109 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L109

Added line #L109 was not covered by tests
point: &PopCheckPoint,
revive_queue_id: i32,
revive_queue_offset: i64,
next_begin_offset: i64,
) -> bool {
unimplemented!("add_ck not implemented")
// Check if buffer merge is enabled
let broker_config = self.broker_runtime_inner.broker_config();
if !broker_config.enable_pop_buffer_merge {
return false;
}

// Check if service is active
if !self.serving.load(Ordering::Acquire) {
return false;
}

// Check timeout condition
let now = get_current_millis() as i64;

if point.get_revive_time() - now < broker_config.pop_ck_stay_buffer_time_out as i64 + 1500 {
if broker_config.enable_pop_log {
warn!("[PopBuffer]add ck, timeout, {:?}, {}", point, now);
}
return false;
}

// Check buffer size
if self.counter.load(Ordering::Acquire) as i64 > broker_config.pop_ck_max_buffer_size {
warn!(
"[PopBuffer]add ck, max size, {:?}, {}",
point,
self.counter.load(Ordering::Acquire)

Check warning on line 141 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L115-L141

Added lines #L115 - L141 were not covered by tests
);
return false;
}

// Create wrapper
let point_wrapper = ArcMut::new(PopCheckPointWrapper::new(
revive_queue_id,
revive_queue_offset,
Arc::new(point.clone()),
next_begin_offset,
));

// Check if queue is valid
if !self.check_queue_ok(&point_wrapper) {
return false;
}

// Check for merge key conflict
let merge_key = point_wrapper.get_merge_key();
if self.buffer.contains_key(merge_key) {
warn!(
"[PopBuffer]mergeKey conflict when add ck. ck:{:?}, mergeKey:{}",

Check warning on line 163 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L143-L163

Added lines #L143 - L163 were not covered by tests
point_wrapper, merge_key
);
return false;
}

// Add to offset queue
self.put_offset_queue(point_wrapper.clone());

if broker_config.enable_pop_log {
info!("[PopBuffer]add ck, {:?}", point_wrapper);
}

Check warning on line 174 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L166-L174

Added lines #L166 - L174 were not covered by tests
// Add to buffer
self.buffer.insert(merge_key.clone(), point_wrapper);
self.counter.fetch_add(1, Ordering::AcqRel);

true
}

Check warning on line 180 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L176-L180

Added lines #L176 - L180 were not covered by tests

// Helper methods
fn check_queue_ok(&self, point_wrapper: &PopCheckPointWrapper) -> bool {
let queue = self.commit_offsets.get(point_wrapper.get_lock_key());
match queue {
None => true,
Some(value) => {
value.get().lock().len()
< self
.broker_runtime_inner
.broker_config()
.pop_ck_offset_max_queue_size as usize

Check warning on line 192 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L183-L192

Added lines #L183 - L192 were not covered by tests
}
}
}

pub fn add_ck_just_offset(
Expand Down Expand Up @@ -167,7 +249,7 @@
{
if self.broker_runtime_inner.broker_config().enable_pop_log {
warn!(
"[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {}, {}, {}",
"[PopBuffer]add ack fail, rqId={}, almost timeout for revive, {:?}, {}, {}",

Check warning on line 252 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L252

Added line #L252 was not covered by tests
revive_qid,
point_wrapper.value(),
ack_msg,
Expand All @@ -185,7 +267,7 @@
{
if self.broker_runtime_inner.broker_config().enable_pop_log {
warn!(
"[PopBuffer]add ack fail, rqId={}, timeout for revive, {}, {}, {}",
"[PopBuffer]add ack fail, rqId={}, timeout for revive, {:?}, {}, {}",

Check warning on line 270 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L270

Added line #L270 was not covered by tests
revive_qid,
point_wrapper.value(),
ack_msg,
Expand Down Expand Up @@ -552,7 +634,7 @@
);
point_wrapper.set_ck_stored(true);

self.put_offset_queue(point_wrapper);
self.put_offset_queue(ArcMut::new(point_wrapper));

Check warning on line 637 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L637

Added line #L637 was not covered by tests
}

fn is_ck_done_for_finish(&self, point_wrapper: &PopCheckPointWrapper) -> bool {
Expand All @@ -567,7 +649,7 @@
true
}

fn put_offset_queue(&mut self, point_wrapper: PopCheckPointWrapper) -> bool {
fn put_offset_queue(&self, point_wrapper: ArcMut<PopCheckPointWrapper>) -> bool {

Check warning on line 652 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L652

Added line #L652 was not covered by tests
let queue = self
.commit_offsets
.entry(point_wrapper.lock_key.clone())
Expand Down Expand Up @@ -759,6 +841,7 @@
}
}

#[derive(Debug)]
pub struct PopCheckPointWrapper {
revive_queue_id: i32,
// -1: not stored, >=0: stored, Long.MAX: storing.
Expand Down Expand Up @@ -887,7 +970,7 @@
&self.lock_key
}

pub fn get_merge_key(&self) -> &str {
pub fn get_merge_key(&self) -> &CheetahString {

Check warning on line 973 in rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/processor/processor_service/pop_buffer_merge_service.rs#L973

Added line #L973 was not covered by tests
&self.merge_key
}

Expand Down
6 changes: 5 additions & 1 deletion rocketmq-common/src/common/broker/broker_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,8 @@ pub struct BrokerConfig {
pub pop_polling_size: usize,
pub enable_pop_message_threshold: bool,
pub pop_inflight_message_threshold: i64,
pub pop_ck_max_buffer_size: i64,
pub pop_ck_offset_max_queue_size: u64,
}

impl Default for BrokerConfig {
Expand Down Expand Up @@ -314,7 +316,9 @@ impl Default for BrokerConfig {
max_pop_polling_size: 100000,
pop_polling_size: 1024,
enable_pop_message_threshold: false,
pop_inflight_message_threshold: 10000,
pop_inflight_message_threshold: 10_000,
pop_ck_max_buffer_size: 200_000,
pop_ck_offset_max_queue_size: 20_000,
}
}
}
Expand Down
Loading