Skip to content

Commit 0370cb5

Browse files
committed
chore(deps): Bump pulsar-rs to latest SHA
This pulls in a fix for the connection_manager prematurely deleting connections that can be used for `update_topics` (which runs automatically in the background). Pulsar has not released an official crate for this yet, so we'll use the commit SHA. Ref: LOG-23224
1 parent 4758373 commit 0370cb5

File tree

4 files changed

+24
-10
lines changed

4 files changed

+24
-10
lines changed

Cargo.lock

Lines changed: 9 additions & 9 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ openssl-probe = { version = "0.1.6", default-features = false }
386386
ordered-float = { version = "4.6.0", default-features = false, features = ["serde"] }
387387
percent-encoding = { version = "2.3.1", default-features = false }
388388
postgres-openssl = { version = "0.5.1", default-features = false, features = ["runtime"], optional = true }
389-
pulsar = { git = "https://github.com/streamnative/pulsar-rs", rev = "5dfd10ce1c92d3200eb9cc010ab224ded46c9adc", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true }
389+
pulsar = { git = "https://github.com/streamnative/pulsar-rs", rev = "174070149bab64359f8a31094d7bfb857b045bf1", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true }
390390
quick-junit = { version = "0.5.1" }
391391
rand.workspace = true
392392
rand_distr.workspace = true

src/sinks/pulsar/config.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,7 @@ impl PulsarSinkConfig {
308308
.map_or(default_retry_options.keep_alive, |secs| {
309309
Duration::from_secs(secs)
310310
}),
311+
..ConnectionRetryOptions::default()
311312
}
312313
});
313314

src/sources/pulsar.rs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,10 @@ pub struct PulsarConnectionRetryOptions {
329329
#[configurable(metadata(docs::type_unit = "seconds"))]
330330
#[configurable(metadata(docs::examples = 60))]
331331
pub keep_alive_secs: Option<u64>,
332+
/// Maximum idle time before a connection is eligible for cleanup
333+
#[configurable(metadata(docs::type_unit = "seconds"))]
334+
#[configurable(metadata(docs::examples = 120))]
335+
pub connection_max_idle: Option<u64>,
332336
}
333337

334338
#[derive(Debug)]
@@ -436,6 +440,9 @@ impl PulsarSourceConfig {
436440
if let Some(secs) = opts.keep_alive_secs {
437441
retry_options.keep_alive = Duration::from_secs(secs);
438442
}
443+
if let Some(secs) = opts.connection_max_idle {
444+
retry_options.connection_max_idle = Duration::from_secs(secs);
445+
}
439446
}
440447

441448
debug!("Creating pulsar consumer with options: {:?}", retry_options);
@@ -871,6 +878,7 @@ mod tests {
871878
max_retries = 50
872879
connection_timeout_secs = 59
873880
keep_alive_secs = 58
881+
connection_max_idle = 55
874882
"#,
875883
);
876884

@@ -880,6 +888,7 @@ mod tests {
880888
assert_eq!(opts.max_retries, Some(50));
881889
assert_eq!(opts.connection_timeout_secs, Some(59));
882890
assert_eq!(opts.keep_alive_secs, Some(58));
891+
assert_eq!(opts.connection_max_idle, Some(55));
883892
}
884893

885894
#[test]
@@ -926,6 +935,10 @@ mod tests {
926935
opts.keep_alive_secs, None,
927936
"Expected keep_alive_secs to be None"
928937
);
938+
assert_eq!(
939+
opts.connection_max_idle, None,
940+
"Expected connection_max_idle to be None"
941+
);
929942
}
930943

931944
#[test]

0 commit comments

Comments
 (0)