
Commit b09a105

librdkafka: expose retry and reconnect backoff settings (#33973)
Exposes librdkafka settings as system settings:

- `retry.backoff.ms`
- `retry.backoff.max.ms`
- `reconnect.backoff.ms`
- `reconnect.backoff.max.ms`

MZ provides the default values, most of which are based on the current librdkafka defaults described [here](https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html). I increased the default value for `reconnect.backoff.max.ms` to `30s` based on local testing: librdkafka retries the connection a few times in quick succession before the backoff reaches `30s`, and if those attempts fail, the cause is likely longer lived (a sketch of the resulting schedule follows the checklist). Happy to set this to the librdkafka default of `10s` if folks disagree.

### Motivation

Exposes settings to help alleviate MaterializeInc/database-issues#9801.

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design. <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)). <!-- Ask in #team-cloud on Slack if you need help preparing the cloud PR. -->
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.
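To illustrate the `30s` choice, here is a minimal sketch of the reconnect backoff schedule under the new defaults. It assumes librdkafka's documented behavior of doubling the backoff after each consecutive connection failure, capped at `reconnect.backoff.max.ms` (the real client also applies jitter, which is omitted here); the two defaults below are the ones this PR sets.

```rust
use std::time::Duration;

fn main() {
    // Defaults from this PR: reconnect.backoff.ms = 100,
    // reconnect.backoff.max.ms = 30_000.
    let base = Duration::from_millis(100);
    let max = Duration::from_secs(30);

    let mut backoff = base;
    let mut elapsed = Duration::ZERO;
    for attempt in 1..=12 {
        println!("attempt {attempt:>2}: wait {backoff:?} (total waited so far: {elapsed:?})");
        elapsed += backoff;
        // librdkafka doubles the backoff on each consecutive failure,
        // capped at reconnect.backoff.max.ms (jitter omitted).
        backoff = (backoff * 2).min(max);
    }
}
```

The early attempts land within the first few seconds (100ms, 200ms, 400ms, ...); the cap is only reached around the tenth consecutive failure, which is the "a few times quickly before reaching the `30s` time" behavior described above.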
1 parent 10ba611 commit b09a105

File tree: 4 files changed (+77, −1)


misc/python/materialize/mzcompose/__init__.py

Lines changed: 4 additions & 0 deletions
```diff
@@ -578,6 +578,10 @@ def get_default_system_parameters(
         "compute_peek_stash_batch_size",
         "storage_statistics_retention_duration",
         "enable_paused_cluster_readhold_downgrade",
+        "kafka_retry_backoff",
+        "kafka_retry_backoff_max",
+        "kafka_reconnect_backoff",
+        "kafka_reconnect_backoff_max",
     ]
```
misc/python/materialize/parallel_workload/action.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,10 @@ def __init__(
15121512
"storage_statistics_retention_duration",
15131513
"enable_paused_cluster_readhold_downgrade",
15141514
"enable_with_ordinality_legacy_fallback",
1515+
"kafka_retry_backoff",
1516+
"kafka_retry_backoff_max",
1517+
"kafka_reconnect_backoff",
1518+
"kafka_reconnect_backoff_max",
15151519
]
15161520

15171521
def run(self, exe: Executor) -> bool:

src/storage-types/src/connections.rs

Lines changed: 31 additions & 1 deletion
```diff
@@ -60,7 +60,8 @@ use crate::connections::string_or_secret::StringOrSecret;
 use crate::controller::AlterError;
 use crate::dyncfgs::{
     ENFORCE_EXTERNAL_ADDRESSES, KAFKA_CLIENT_ID_ENRICHMENT_RULES,
-    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM,
+    KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM, KAFKA_RECONNECT_BACKOFF,
+    KAFKA_RECONNECT_BACKOFF_MAX, KAFKA_RETRY_BACKOFF, KAFKA_RETRY_BACKOFF_MAX,
 };
 use crate::errors::{ContextCreationError, CsrConnectError};
@@ -852,6 +853,35 @@ impl KafkaConnection {
             }
         }
 
+        options.insert(
+            "retry.backoff.ms".into(),
+            KAFKA_RETRY_BACKOFF
+                .get(storage_configuration.config_set())
+                .as_millis()
+                .into(),
+        );
+        options.insert(
+            "retry.backoff.max.ms".into(),
+            KAFKA_RETRY_BACKOFF_MAX
+                .get(storage_configuration.config_set())
+                .as_millis()
+                .into(),
+        );
+        options.insert(
+            "reconnect.backoff.ms".into(),
+            KAFKA_RECONNECT_BACKOFF
+                .get(storage_configuration.config_set())
+                .as_millis()
+                .into(),
+        );
+        options.insert(
+            "reconnect.backoff.max.ms".into(),
+            KAFKA_RECONNECT_BACKOFF_MAX
+                .get(storage_configuration.config_set())
+                .as_millis()
+                .into(),
+        );
+
         let mut config = mz_kafka_util::client::create_new_client_config(
             storage_configuration
                 .connection_context
```
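The pattern in this hunk reads each dyncfg `Duration`, converts it to a millisecond count, and stores it as a stringly-typed librdkafka option. Here is a self-contained sketch of that conversion, using a plain `BTreeMap<String, String>` as a hypothetical stand-in for the real options map (whose value type is not shown in this diff):

```rust
use std::collections::BTreeMap;
use std::time::Duration;

fn main() {
    // Hypothetical stand-in: librdkafka ultimately receives string
    // key/value pairs, so each Duration is rendered as milliseconds.
    let mut options: BTreeMap<String, String> = BTreeMap::new();

    let retry_backoff = Duration::from_millis(100);
    let reconnect_backoff_max = Duration::from_secs(30);

    options.insert(
        "retry.backoff.ms".into(),
        retry_backoff.as_millis().to_string(),
    );
    options.insert(
        "reconnect.backoff.max.ms".into(),
        reconnect_backoff_max.as_millis().to_string(),
    );

    for (key, value) in &options {
        println!("{key} = {value}"); // e.g. reconnect.backoff.max.ms = 30000
    }
}
```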

src/storage-types/src/dyncfgs.rs

Lines changed: 38 additions & 0 deletions
```diff
@@ -129,6 +129,40 @@ pub const KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS: Config<usize> = Config
     most this number of elements.",
 );
 
+/// Sets retry.backoff.ms in librdkafka for sources and sinks.
+/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
+pub const KAFKA_RETRY_BACKOFF: Config<Duration> = Config::new(
+    "kafka_retry_backoff",
+    Duration::from_millis(100),
+    "Sets retry.backoff.ms in librdkafka for sources and sinks.",
+);
+
+/// Sets retry.backoff.max.ms in librdkafka for sources and sinks.
+/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
+pub const KAFKA_RETRY_BACKOFF_MAX: Config<Duration> = Config::new(
+    "kafka_retry_backoff_max",
+    Duration::from_secs(1),
+    "Sets retry.backoff.max.ms in librdkafka for sources and sinks.",
+);
+
+/// Sets reconnect.backoff.ms in librdkafka for sources and sinks.
+/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
+pub const KAFKA_RECONNECT_BACKOFF: Config<Duration> = Config::new(
+    "kafka_reconnect_backoff",
+    Duration::from_millis(100),
+    "Sets reconnect.backoff.ms in librdkafka for sources and sinks.",
+);
+
+/// Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.
+/// We default to 30s instead of 10s to avoid constant reconnection attempts in the event of
+/// auth changes or unavailability.
+/// See <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_CONFIGURATION.html>
+pub const KAFKA_RECONNECT_BACKOFF_MAX: Config<Duration> = Config::new(
+    "kafka_reconnect_backoff_max",
+    Duration::from_secs(30),
+    "Sets reconnect.backoff.max.ms in librdkafka for sources and sinks.",
+);
+
 // MySQL
 
 /// Replication heartbeat interval requested from the MySQL server.
@@ -301,6 +335,10 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
         .add(&KAFKA_DEFAULT_AWS_PRIVATELINK_ENDPOINT_IDENTIFICATION_ALGORITHM)
         .add(&KAFKA_METADATA_FETCH_INTERVAL)
         .add(&KAFKA_POLL_MAX_WAIT)
+        .add(&KAFKA_RETRY_BACKOFF)
+        .add(&KAFKA_RETRY_BACKOFF_MAX)
+        .add(&KAFKA_RECONNECT_BACKOFF)
+        .add(&KAFKA_RECONNECT_BACKOFF_MAX)
         .add(&MYSQL_OFFSET_KNOWN_INTERVAL)
         .add(&MYSQL_REPLICATION_HEARTBEAT_INTERVAL)
         .add(&ORE_OVERFLOWING_BEHAVIOR)
```
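For readers unfamiliar with the dyncfg pattern used above, here is a simplified, self-contained sketch of how a named config with a `Duration` default can be registered and read back. The `Config`/`ConfigSet` types below are hypothetical miniatures mirroring the `Config::new` / `ConfigSet::add` / `get` shape in this diff, not the real `mz_dyncfg` API:

```rust
use std::collections::BTreeMap;
use std::time::Duration;

// Hypothetical miniature of the dyncfg pattern: a named config with a
// compile-time default, read against a set that may hold runtime overrides.
struct Config {
    name: &'static str,
    default: Duration,
}

#[derive(Default)]
struct ConfigSet {
    overrides: BTreeMap<&'static str, Duration>,
}

impl Config {
    const fn new(name: &'static str, default: Duration) -> Self {
        Config { name, default }
    }

    // Returns the current value: an override if one was applied, else the default.
    fn get(&self, set: &ConfigSet) -> Duration {
        set.overrides.get(self.name).copied().unwrap_or(self.default)
    }
}

const KAFKA_RECONNECT_BACKOFF_MAX: Config =
    Config::new("kafka_reconnect_backoff_max", Duration::from_secs(30));

fn main() {
    let mut set = ConfigSet::default();
    assert_eq!(KAFKA_RECONNECT_BACKOFF_MAX.get(&set), Duration::from_secs(30));

    // Simulate a runtime override, as a system-setting change would apply.
    set.overrides
        .insert("kafka_reconnect_backoff_max", Duration::from_secs(10));
    assert_eq!(KAFKA_RECONNECT_BACKOFF_MAX.get(&set), Duration::from_secs(10));
}
```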
