Skip to content

Commit ab51706

Browse files
committed
chore: add max_versions_per_poll
1 parent e8f61ad commit ab51706

File tree

2 files changed

+61
-6
lines changed

2 files changed

+61
-6
lines changed

src/sources/delta_lake_cdf/config.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,19 @@ pub struct DeltaLakeCdfConfig {
9696
#[serde(default)]
9797
pub ending_version: Option<i64>,
9898

99+
/// Maximum number of versions to process per poll cycle.
100+
///
101+
/// When catching up on a large backlog of versions, processing too many
102+
/// versions at once can cause the query planner to hang while listing
103+
/// thousands of CDF files. This option limits how many versions are
104+
/// processed in each poll cycle, allowing incremental catchup.
105+
///
106+
/// If not specified, all available versions are processed at once.
107+
#[serde(default)]
108+
#[configurable(metadata(docs::examples = 100))]
109+
#[configurable(metadata(docs::examples = 1000))]
110+
pub max_versions_per_poll: Option<i64>,
111+
99112
/// Directory for storing checkpoints.
100113
///
101114
/// The source persists its current position (version) to disk
@@ -177,6 +190,7 @@ impl Default for DeltaLakeCdfConfig {
177190
include_data: default_include_data(),
178191
change_types: Vec::new(),
179192
ending_version: None,
193+
max_versions_per_poll: None,
180194
data_dir: None,
181195
log_namespace: None,
182196
}
@@ -214,6 +228,7 @@ impl GenerateConfig for DeltaLakeCdfConfig {
214228
include_data: default_include_data(),
215229
change_types: Vec::new(),
216230
ending_version: None,
231+
max_versions_per_poll: None,
217232
data_dir: None,
218233
log_namespace: None,
219234
})
@@ -482,4 +497,25 @@ mod tests {
482497
);
483498
assert_eq!(ChangeType::from_cdf_string("unknown"), None);
484499
}
500+
501+
#[test]
502+
fn test_config_max_versions_per_poll() {
503+
let config_str = r#"
504+
table_uri = "s3://bucket/table"
505+
max_versions_per_poll = 100
506+
"#;
507+
508+
let config: DeltaLakeCdfConfig = toml::from_str(config_str).expect("Config should parse");
509+
assert_eq!(config.max_versions_per_poll, Some(100));
510+
}
511+
512+
#[test]
513+
fn test_config_max_versions_per_poll_default() {
514+
let config_str = r#"
515+
table_uri = "s3://bucket/table"
516+
"#;
517+
518+
let config: DeltaLakeCdfConfig = toml::from_str(config_str).expect("Config should parse");
519+
assert_eq!(config.max_versions_per_poll, None);
520+
}
485521
}

src/sources/delta_lake_cdf/source.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -104,16 +104,35 @@ pub async fn run_cdf_source(
104104
}
105105

106106
// Determine the end version for this batch
107+
// Apply max_versions_per_poll limit to prevent query planner from
108+
// hanging when catching up on large backlogs
109+
let max_end_from_batch_limit = config
110+
.max_versions_per_poll
111+
.map(|max| current_version.saturating_add(max).saturating_sub(1))
112+
.unwrap_or(i64::MAX);
113+
107114
let end_version = config
108115
.ending_version
109116
.map(|e| e.min(latest_version))
110-
.unwrap_or(latest_version);
117+
.unwrap_or(latest_version)
118+
.min(max_end_from_batch_limit);
111119

112-
debug!(
113-
message = "Processing CDF versions",
114-
start_version = current_version,
115-
end_version = end_version,
116-
);
120+
// Log at info level when catching up with batching, debug otherwise
121+
let versions_behind = latest_version.saturating_sub(end_version);
122+
if versions_behind > 0 && config.max_versions_per_poll.is_some() {
123+
info!(
124+
message = "Processing CDF versions (catching up)",
125+
start_version = current_version,
126+
end_version = end_version,
127+
versions_behind = versions_behind,
128+
);
129+
} else {
130+
debug!(
131+
message = "Processing CDF versions",
132+
start_version = current_version,
133+
end_version = end_version,
134+
);
135+
}
117136

118137
// Create streaming CDF reader for the version range
119138
match create_cdf_stream(&ctx, &table, current_version, end_version).await {

0 commit comments

Comments
 (0)