Skip to content

Commit 38ff8ad

Browse files
authored
[ISSUE #2554]🚀Fix pop consumer can't start🔥 (#2555)
1 parent 8bc3bec commit 38ff8ad

File tree

3 files changed

+32
-7
lines changed

3 files changed

+32
-7
lines changed

rocketmq-client/src/admin/default_mq_admin_ext_impl.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ impl DefaultMQAdminExtImpl {
9898
rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
9999
timeout_millis: Duration,
100100
client_config: ArcMut<ClientConfig>,
101+
admin_ext_group: CheetahString,
101102
) -> Self {
102103
DefaultMQAdminExtImpl {
103104
service_state: ServiceState::CreateJust,
@@ -108,7 +109,7 @@ impl DefaultMQAdminExtImpl {
108109
NAMESPACE_ORDER_TOPIC_CONFIG,
109110
)],
110111
client_config,
111-
admin_ext_group: Default::default(),
112+
admin_ext_group,
112113
inner: None,
113114
}
114115
}

rocketmq-example/examples/consumer/pop_consumer.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub async fn main() -> Result<()> {
6262

6363
async fn switch_pop_consumer() -> Result<()> {
6464
let mut mq_admin_ext = DefaultMQAdminExt::new();
65+
mq_admin_ext.client_config_mut().namesrv_addr =
66+
Some(CheetahString::from_static_str(DEFAULT_NAMESRVADDR));
6567
MQAdminExt::start(&mut mq_admin_ext).await.unwrap();
6668
let broker_datas =
6769
MQAdminExt::examine_topic_route_info(&mq_admin_ext, CheetahString::from_static_str(TOPIC))

rocketmq-tools/src/admin/default_mq_admin_ext.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -61,18 +61,20 @@ pub struct DefaultMQAdminExt {
6161

6262
impl DefaultMQAdminExt {
6363
pub fn new() -> Self {
64+
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
6465
let client_config = ArcMut::new(ClientConfig::new());
6566
let mut default_mqadmin_ext_impl = ArcMut::new(DefaultMQAdminExtImpl::new(
6667
None,
6768
Duration::from_millis(5000),
6869
client_config.clone(),
70+
admin_ext_group.clone(),
6971
));
7072
let inner = default_mqadmin_ext_impl.clone();
7173
default_mqadmin_ext_impl.set_inner(inner);
7274
Self {
7375
client_config,
7476
default_mqadmin_ext_impl,
75-
admin_ext_group: CheetahString::from_static_str(ADMIN_EXT_GROUP),
77+
admin_ext_group,
7678
create_topic_key: CheetahString::from_static_str(
7779
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
7880
),
@@ -81,15 +83,17 @@ impl DefaultMQAdminExt {
8183
}
8284

8385
pub fn with_timeout(timeout_millis: Duration) -> Self {
86+
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
8487
let client_config = ArcMut::new(ClientConfig::new());
8588
Self {
8689
client_config: client_config.clone(),
8790
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
8891
None,
8992
timeout_millis,
9093
client_config,
94+
admin_ext_group.clone(),
9195
)),
92-
admin_ext_group: CheetahString::from_static_str(ADMIN_EXT_GROUP),
96+
admin_ext_group,
9397
create_topic_key: CheetahString::from_static_str(
9498
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
9599
),
@@ -98,6 +102,7 @@ impl DefaultMQAdminExt {
98102
}
99103

100104
pub fn with_rpc_hook(rpc_hook: impl RPCHook) -> Self {
105+
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
101106
let rpc_hook_inner: Box<dyn RPCHook> = Box::new(rpc_hook);
102107
let client_config = ArcMut::new(ClientConfig::new());
103108
Self {
@@ -106,8 +111,9 @@ impl DefaultMQAdminExt {
106111
Some(Arc::new(rpc_hook_inner)),
107112
Duration::from_millis(5000),
108113
client_config,
114+
admin_ext_group.clone(),
109115
)),
110-
admin_ext_group: CheetahString::from_static_str(ADMIN_EXT_GROUP),
116+
admin_ext_group,
111117
create_topic_key: CheetahString::from_static_str(
112118
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
113119
),
@@ -116,6 +122,7 @@ impl DefaultMQAdminExt {
116122
}
117123

118124
pub fn with_rpc_hook_and_timeout(rpc_hook: impl RPCHook, timeout_millis: Duration) -> Self {
125+
let admin_ext_group = CheetahString::from_static_str(ADMIN_EXT_GROUP);
119126
let rpc_hook_inner: Box<dyn RPCHook> = Box::new(rpc_hook);
120127
let client_config = ArcMut::new(ClientConfig::new());
121128
Self {
@@ -124,8 +131,9 @@ impl DefaultMQAdminExt {
124131
Some(Arc::new(rpc_hook_inner)),
125132
timeout_millis,
126133
client_config,
134+
admin_ext_group.clone(),
127135
)),
128-
admin_ext_group: CheetahString::from_static_str(ADMIN_EXT_GROUP),
136+
admin_ext_group,
129137
create_topic_key: CheetahString::from_static_str(
130138
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
131139
),
@@ -134,15 +142,17 @@ impl DefaultMQAdminExt {
134142
}
135143

136144
pub fn with_admin_ext_group(admin_ext_group: impl Into<CheetahString>) -> Self {
145+
let admin_ext_group = admin_ext_group.into();
137146
let client_config = ArcMut::new(ClientConfig::new());
138147
Self {
139148
client_config: client_config.clone(),
140149
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
141150
None,
142151
Duration::from_millis(5000),
143152
client_config,
153+
admin_ext_group.clone(),
144154
)),
145-
admin_ext_group: admin_ext_group.into(),
155+
admin_ext_group,
146156
create_topic_key: CheetahString::from_static_str(
147157
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
148158
),
@@ -154,21 +164,33 @@ impl DefaultMQAdminExt {
154164
admin_ext_group: impl Into<CheetahString>,
155165
timeout_millis: Duration,
156166
) -> Self {
167+
let admin_ext_group = admin_ext_group.into();
157168
let client_config = ArcMut::new(ClientConfig::new());
158169
Self {
159170
client_config: client_config.clone(),
160171
default_mqadmin_ext_impl: ArcMut::new(DefaultMQAdminExtImpl::new(
161172
None,
162173
timeout_millis,
163174
client_config,
175+
admin_ext_group.clone(),
164176
)),
165-
admin_ext_group: admin_ext_group.into(),
177+
admin_ext_group,
166178
create_topic_key: CheetahString::from_static_str(
167179
TopicValidator::AUTO_CREATE_TOPIC_KEY_TOPIC,
168180
),
169181
timeout_millis,
170182
}
171183
}
184+
185+
#[inline]
186+
pub fn client_config(&self) -> &ArcMut<ClientConfig> {
187+
&self.client_config
188+
}
189+
190+
#[inline]
191+
pub fn client_config_mut(&mut self) -> &mut ArcMut<ClientConfig> {
192+
&mut self.client_config
193+
}
172194
}
173195

174196
impl Default for DefaultMQAdminExt {

0 commit comments

Comments
 (0)