Skip to content

Commit 167d66f

Browse files
Federation: queue_type is not RabbitMQ 3.12.x compatible, so make it optional
1 parent d706c86 commit 167d66f

File tree

1 file changed

+9
-6
lines changed

1 file changed

+9
-6
lines changed

src/requests/federation.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ pub struct ExchangeFederationParams<'a> {
113113
pub exchange: Option<&'a str>,
114114
/// Maximum hops for federation chains to prevent infinite loops
115115
pub max_hops: Option<u8>,
116-
/// Queue type for the temporary federation queue
117-
pub queue_type: QueueType,
116+
/// Queue type for the temporary federation queue (requires RabbitMQ 3.13+)
117+
#[serde(skip_serializing_if = "Option::is_none")]
118+
pub queue_type: Option<QueueType>,
118119
/// Time-to-live for the federation queue in milliseconds
119120
pub ttl: Option<u32>,
120121
/// Message TTL for federated messages in milliseconds
@@ -130,7 +131,7 @@ impl ExchangeFederationParams<'_> {
130131
/// for the federation link. Use quorum queues for durability or classic for simplicity.
131132
pub fn new(queue_type: QueueType) -> Self {
132133
Self {
133-
queue_type,
134+
queue_type: Some(queue_type),
134135
..Default::default()
135136
}
136137
}
@@ -233,7 +234,9 @@ impl<'a> From<FederationUpstreamParams<'a>> for RuntimeParameterDefinition<'a> {
233234
}
234235

235236
if let Some(ef) = params.exchange_federation {
236-
value.insert("queue-type".to_owned(), json!(ef.queue_type));
237+
if let Some(qt) = &ef.queue_type {
238+
value.insert("queue-type".to_owned(), json!(qt));
239+
}
237240
value.insert(
238241
"resource-cleanup-mode".to_owned(),
239242
json!(ef.resource_cleanup_mode),
@@ -301,7 +304,7 @@ pub struct OwnedQueueFederationParams {
301304
pub struct OwnedExchangeFederationParams {
302305
pub exchange: Option<String>,
303306
pub max_hops: Option<u8>,
304-
pub queue_type: QueueType,
307+
pub queue_type: Option<QueueType>,
305308
pub ttl: Option<u32>,
306309
pub message_ttl: Option<u32>,
307310
pub resource_cleanup_mode: FederationResourceCleanupMode,
@@ -330,7 +333,7 @@ impl From<FederationUpstream> for OwnedFederationUpstreamParams {
330333
Some(OwnedExchangeFederationParams {
331334
exchange: upstream.exchange,
332335
max_hops: upstream.max_hops,
333-
queue_type: upstream.queue_type.unwrap_or_default(),
336+
queue_type: upstream.queue_type,
334337
ttl: upstream.expires,
335338
message_ttl: upstream.message_ttl,
336339
resource_cleanup_mode: upstream.resource_cleanup_mode,

0 commit comments

Comments
 (0)