Skip to content

Commit 2cc1652

Browse files
authored
[ISSUE #6574]♻️Refactor OffsetStore to use enum variants for improved clarity and simplify offset management (#6575)
1 parent 76de3a0 commit 2cc1652

File tree

1 file changed

+107
-71
lines changed

1 file changed

+107
-71
lines changed

rocketmq-client/src/consumer/store/offset_store.rs

Lines changed: 107 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -21,142 +21,178 @@ use crate::consumer::store::local_file_offset_store::LocalFileOffsetStore;
2121
use crate::consumer::store::read_offset_type::ReadOffsetType;
2222
use crate::consumer::store::remote_broker_offset_store::RemoteBrokerOffsetStore;
2323

24-
pub struct OffsetStore {
25-
remote_broker_offset_store: Option<RemoteBrokerOffsetStore>,
26-
local_file_offset_store: Option<LocalFileOffsetStore>,
24+
/// The active offset storage backend for a message consumer.
25+
///
26+
/// Exactly one variant is active during the lifetime of a consumer:
27+
/// - [`Remote`][OffsetStore::Remote] is used in clustering mode, where offsets are coordinated
28+
/// through the broker.
29+
/// - [`Local`][OffsetStore::Local] is used in broadcasting mode, where offsets are stored in a
30+
/// local file on the consumer host.
31+
pub enum OffsetStore {
32+
/// Offsets persisted to and retrieved from the remote broker.
33+
Remote(RemoteBrokerOffsetStore),
34+
/// Offsets persisted to and retrieved from a local file.
35+
Local(LocalFileOffsetStore),
2736
}
2837

2938
impl OffsetStore {
30-
pub fn new(
31-
remote_broker_offset_store: Option<RemoteBrokerOffsetStore>,
32-
local_file_offset_store: Option<LocalFileOffsetStore>,
33-
) -> Self {
34-
Self {
35-
remote_broker_offset_store,
36-
local_file_offset_store,
37-
}
38-
}
39-
39+
/// Returns an [`OffsetStore`] backed by remote broker offset storage.
4040
pub fn new_with_remote(remote_broker_offset_store: RemoteBrokerOffsetStore) -> Self {
41-
Self {
42-
remote_broker_offset_store: Some(remote_broker_offset_store),
43-
local_file_offset_store: None,
44-
}
41+
Self::Remote(remote_broker_offset_store)
4542
}
4643

44+
/// Returns an [`OffsetStore`] backed by local file offset storage.
4745
pub fn new_with_local(local_file_offset_store: LocalFileOffsetStore) -> Self {
48-
Self {
49-
remote_broker_offset_store: None,
50-
local_file_offset_store: Some(local_file_offset_store),
51-
}
46+
Self::Local(local_file_offset_store)
5247
}
5348

49+
/// Asynchronously loads persisted offsets from the underlying storage backend.
50+
///
51+
/// # Errors
52+
///
53+
/// Returns an error if the underlying storage backend fails to read or deserialize
54+
/// the persisted offset data.
5455
pub async fn load(&self) -> rocketmq_error::RocketMQResult<()> {
55-
if let Some(store) = &self.remote_broker_offset_store {
56-
store.load().await?;
56+
match self {
57+
Self::Remote(store) => store.load().await,
58+
Self::Local(store) => store.load().await,
5759
}
58-
if let Some(store) = &self.local_file_offset_store {
59-
store.load().await?;
60-
}
61-
Ok(())
6260
}
6361

62+
/// Asynchronously updates the stored offset for the given queue.
63+
///
64+
/// When `increase_only` is `true`, the offset is updated only if the new value is
65+
/// greater than the currently stored value.
6466
pub async fn update_offset(&self, mq: &MessageQueue, offset: i64, increase_only: bool) {
65-
if let Some(store) = &self.remote_broker_offset_store {
66-
store.update_offset(mq, offset, increase_only).await;
67-
}
68-
if let Some(store) = &self.local_file_offset_store {
69-
store.update_offset(mq, offset, increase_only).await;
67+
match self {
68+
Self::Remote(store) => store.update_offset(mq, offset, increase_only).await,
69+
Self::Local(store) => store.update_offset(mq, offset, increase_only).await,
7070
}
7171
}
7272

73+
/// Asynchronously updates the stored offset for the given queue and prevents further
74+
/// updates until the freeze is lifted.
7375
pub async fn update_and_freeze_offset(&self, mq: &MessageQueue, offset: i64) {
74-
if let Some(store) = &self.remote_broker_offset_store {
75-
store.update_and_freeze_offset(mq, offset).await;
76-
}
77-
if let Some(store) = &self.local_file_offset_store {
78-
store.update_and_freeze_offset(mq, offset).await;
76+
match self {
77+
Self::Remote(store) => store.update_and_freeze_offset(mq, offset).await,
78+
Self::Local(store) => store.update_and_freeze_offset(mq, offset).await,
7979
}
8080
}
81-
pub async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64 {
82-
if let Some(ref store) = self.remote_broker_offset_store {
83-
return store.read_offset(mq, type_).await;
84-
}
8581

86-
if let Some(ref store) = self.local_file_offset_store {
87-
return store.read_offset(mq, type_).await;
82+
/// Asynchronously reads the current offset for the given queue according to the
83+
/// specified read strategy.
84+
///
85+
/// Returns `0` if no offset has been recorded for the queue.
86+
pub async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64 {
87+
match self {
88+
Self::Remote(store) => store.read_offset(mq, type_).await,
89+
Self::Local(store) => store.read_offset(mq, type_).await,
8890
}
89-
0
9091
}
9192

93+
/// Asynchronously persists the offsets for all queues in `mqs` to the underlying
94+
/// storage backend.
9295
pub async fn persist_all(&mut self, mqs: &HashSet<MessageQueue>) {
93-
if let Some(ref mut store) = self.remote_broker_offset_store {
94-
store.persist_all(mqs).await;
95-
}
96-
if let Some(ref mut store) = self.local_file_offset_store {
97-
store.persist_all(mqs).await;
96+
match self {
97+
Self::Remote(store) => store.persist_all(mqs).await,
98+
Self::Local(store) => store.persist_all(mqs).await,
9899
}
99100
}
100101

102+
/// Asynchronously persists the offset for the specified queue to the underlying
103+
/// storage backend.
101104
pub async fn persist(&mut self, mq: &MessageQueue) {
102-
if let Some(ref mut store) = self.remote_broker_offset_store {
103-
store.persist(mq).await;
104-
}
105-
if let Some(ref mut store) = self.local_file_offset_store {
106-
store.persist(mq).await;
105+
match self {
106+
Self::Remote(store) => store.persist(mq).await,
107+
Self::Local(store) => store.persist(mq).await,
107108
}
108109
}
109110

111+
/// Asynchronously removes the stored offset entry for the specified queue.
110112
pub async fn remove_offset(&self, mq: &MessageQueue) {
111-
if let Some(store) = &self.remote_broker_offset_store {
112-
store.remove_offset(mq).await;
113-
}
114-
if let Some(store) = &self.local_file_offset_store {
115-
store.remove_offset(mq).await;
113+
match self {
114+
Self::Remote(store) => store.remove_offset(mq).await,
115+
Self::Local(store) => store.remove_offset(mq).await,
116116
}
117117
}
118+
119+
/// Asynchronously returns a snapshot of the offset table filtered by the given topic.
118120
pub async fn clone_offset_table(&self, topic: &str) -> HashMap<MessageQueue, i64> {
119-
if let Some(store) = &self.remote_broker_offset_store {
120-
return store.clone_offset_table(topic).await;
121-
}
122-
if let Some(store) = &self.local_file_offset_store {
123-
return store.clone_offset_table(topic).await;
121+
match self {
122+
Self::Remote(store) => store.clone_offset_table(topic).await,
123+
Self::Local(store) => store.clone_offset_table(topic).await,
124124
}
125-
HashMap::new()
126125
}
127126

127+
/// Asynchronously pushes the given offset for the specified queue to the broker.
128+
///
129+
/// When `is_oneway` is `true`, the request is sent without waiting for an acknowledgement.
130+
///
131+
/// # Errors
132+
///
133+
/// Returns an error if the broker is unreachable or rejects the offset commit request.
128134
pub async fn update_consume_offset_to_broker(
129135
&mut self,
130136
mq: &MessageQueue,
131137
offset: i64,
132138
is_oneway: bool,
133139
) -> rocketmq_error::RocketMQResult<()> {
134-
if let Some(ref mut store) = self.remote_broker_offset_store {
135-
store.update_consume_offset_to_broker(mq, offset, is_oneway).await?;
140+
match self {
141+
Self::Remote(store) => store.update_consume_offset_to_broker(mq, offset, is_oneway).await,
142+
Self::Local(store) => store.update_consume_offset_to_broker(mq, offset, is_oneway).await,
136143
}
137-
if let Some(ref mut store) = self.local_file_offset_store {
138-
store.update_consume_offset_to_broker(mq, offset, is_oneway).await?;
139-
}
140-
Ok(())
141144
}
142145
}
143146

147+
/// Defines the contract for a consumer offset storage backend.
148+
///
149+
/// Implementations are responsible for loading, updating, persisting, and querying
150+
/// per-queue consume offsets. The two standard backends are
151+
/// [`RemoteBrokerOffsetStore`] and [`LocalFileOffsetStore`].
144152
pub trait OffsetStoreTrait {
153+
/// Asynchronously loads persisted offsets from the underlying storage.
154+
///
155+
/// # Errors
156+
///
157+
/// Returns an error if the underlying storage backend fails to read or deserialize
158+
/// the persisted offset data.
145159
async fn load(&self) -> rocketmq_error::RocketMQResult<()>;
146160

161+
/// Asynchronously updates the stored offset for the given queue.
162+
///
163+
/// When `increase_only` is `true`, the offset is updated only if the new value is
164+
/// greater than the currently stored value.
147165
async fn update_offset(&self, mq: &MessageQueue, offset: i64, increase_only: bool);
148166

167+
/// Asynchronously updates the stored offset for the given queue and prevents further
168+
/// updates until the freeze is lifted.
149169
async fn update_and_freeze_offset(&self, mq: &MessageQueue, offset: i64);
170+
171+
/// Asynchronously reads the current offset for the given queue according to the
172+
/// specified read strategy.
173+
///
174+
/// Returns `0` if no offset has been recorded for the queue.
150175
async fn read_offset(&self, mq: &MessageQueue, type_: ReadOffsetType) -> i64;
151176

177+
/// Asynchronously persists the offsets for all queues in `mqs` to the underlying storage.
152178
async fn persist_all(&mut self, mqs: &HashSet<MessageQueue>);
153179

180+
/// Asynchronously persists the offset for the specified queue to the underlying storage.
154181
async fn persist(&mut self, mq: &MessageQueue);
155182

183+
/// Asynchronously removes the stored offset entry for the specified queue.
156184
async fn remove_offset(&self, mq: &MessageQueue);
157185

186+
/// Asynchronously returns a snapshot of the offset table filtered by the given topic.
158187
async fn clone_offset_table(&self, topic: &str) -> HashMap<MessageQueue, i64>;
159188

189+
/// Asynchronously pushes the given offset for the specified queue to the broker.
190+
///
191+
/// When `is_oneway` is `true`, the request is sent without waiting for an acknowledgement.
192+
///
193+
/// # Errors
194+
///
195+
/// Returns an error if the broker is unreachable or rejects the offset commit request.
160196
async fn update_consume_offset_to_broker(
161197
&mut self,
162198
mq: &MessageQueue,

0 commit comments

Comments
 (0)