Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,8 @@ message StreamCdcScanOptions {
bool backfill_as_even_splits = 6;

uint32 backfill_split_pk_column_index = 7;

optional string backfill_split_column_name = 8;
}

message StreamCdcScanNode {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ pub struct CdcTableSnapshotSplitOption {
pub backfill_num_rows_per_split: u64,
pub backfill_as_even_splits: bool,
pub backfill_split_pk_column_index: u32,
pub backfill_split_column_name: Option<String>,
}

pub enum ExternalTableReaderImpl {
Expand Down
27 changes: 25 additions & 2 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,19 @@ impl ExternalTableReader for PostgresExternalTableReader {
))
.into());
}
if let Some(backfill_split_column_name) = &options.backfill_split_column_name
&& self
.rw_schema
.fields
.iter()
.all(|f| f.name != *backfill_split_column_name)
{
return Err(anyhow::anyhow!(format!(
"invalid backfill_split_column_name {}",
backfill_split_column_name
))
.into());
}
let split_column = self.split_column(&options);
let row_stream = if options.backfill_as_even_splits
&& is_supported_even_split_data_type(&split_column.data_type)
Expand Down Expand Up @@ -745,8 +758,18 @@ impl PostgresExternalTableReader {
}

/// Resolves the column used to partition the CDC snapshot into backfill splits.
///
/// Selection rule:
/// - If `backfill_split_column_name` is set, it overrides the pk-index based
///   selection and the column is looked up by name in `rw_schema`.
/// - Otherwise, the split column is the primary-key column addressed by
///   `backfill_split_pk_column_index` (an index into `pk_indices`).
///
/// # Panics
/// Panics if the named column is absent or ambiguous in the schema, or if
/// `backfill_split_pk_column_index` is out of bounds for `pk_indices`.
/// Callers are expected to have validated the options beforehand (see the
/// name check in `current_snapshot`), so a panic here indicates a broken
/// invariant rather than a recoverable error.
fn split_column(&self, options: &CdcTableSnapshotSplitOption) -> Field {
    match &options.backfill_split_column_name {
        Some(name) => self
            .rw_schema
            .fields
            .iter()
            .filter(|field| field.name == *name)
            .cloned()
            // `exactly_one` also rejects duplicates, not just a missing name;
            // surface the offending name instead of a bare `unwrap` panic.
            .exactly_one()
            .unwrap_or_else(|_| {
                panic!(
                    "backfill_split_column_name {} must match exactly one schema column",
                    name
                )
            }),
        None => {
            let idx = self.pk_indices[options.backfill_split_pk_column_index as usize];
            self.rw_schema.fields[idx].clone()
        }
    }
}
}

Expand Down
6 changes: 6 additions & 0 deletions src/connector/src/source/cdc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ pub const CDC_BACKFILL_PARALLELISM: &str = "backfill.parallelism";
pub const CDC_BACKFILL_NUM_ROWS_PER_SPLIT: &str = "backfill.num_rows_per_split";
pub const CDC_BACKFILL_AS_EVEN_SPLITS: &str = "backfill.as_even_splits";
pub const CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX: &str = "backfill.split_pk_column_index";
pub const CDC_BACKFILL_SPLIT_COLUMN_NAME: &str = "backfill.split_column_name";
// We enable transaction for shared cdc source by default
pub const CDC_TRANSACTIONAL_KEY: &str = "transactional";
pub const CDC_WAIT_FOR_STREAMING_START_TIMEOUT: &str = "cdc.source.wait.streaming.start.timeout";
Expand Down Expand Up @@ -303,6 +304,8 @@ pub struct CdcScanOptions {
pub backfill_as_even_splits: bool,
/// Used by parallelized backfill. Specify the index of primary key column to use as split column.
pub backfill_split_pk_column_index: u32,
/// If set, it will override `backfill_split_pk_column_index`.
pub backfill_split_column_name: Option<String>,
}

impl Default for CdcScanOptions {
Expand All @@ -316,6 +319,7 @@ impl Default for CdcScanOptions {
backfill_num_rows_per_split: 100_000,
backfill_as_even_splits: true,
backfill_split_pk_column_index: 0,
backfill_split_column_name: None,
}
}
}
Expand All @@ -330,6 +334,7 @@ impl CdcScanOptions {
backfill_num_rows_per_split: self.backfill_num_rows_per_split,
backfill_as_even_splits: self.backfill_as_even_splits,
backfill_split_pk_column_index: self.backfill_split_pk_column_index,
backfill_split_column_name: self.backfill_split_column_name.clone(),
}
}

Expand All @@ -342,6 +347,7 @@ impl CdcScanOptions {
backfill_num_rows_per_split: proto.backfill_num_rows_per_split,
backfill_as_even_splits: proto.backfill_as_even_splits,
backfill_split_pk_column_index: proto.backfill_split_pk_column_index,
backfill_split_column_name: proto.backfill_split_column_name.clone(),
}
}

Expand Down
6 changes: 5 additions & 1 deletion src/frontend/src/optimizer/plan_node/generic/cdc_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use risingwave_connector::source::cdc::{
CDC_BACKFILL_AS_EVEN_SPLITS, CDC_BACKFILL_ENABLE_KEY, CDC_BACKFILL_MAX_PARALLELISM,
CDC_BACKFILL_NUM_ROWS_PER_SPLIT, CDC_BACKFILL_PARALLELISM,
CDC_BACKFILL_SNAPSHOT_BATCH_SIZE_KEY, CDC_BACKFILL_SNAPSHOT_INTERVAL_KEY,
CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX, CdcScanOptions,
CDC_BACKFILL_SPLIT_COLUMN_NAME, CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX, CdcScanOptions,
};

use super::GenericPlanNode;
Expand Down Expand Up @@ -117,6 +117,10 @@ pub fn build_cdc_scan_options_with_options(
anyhow!("Invalid value for {}", CDC_BACKFILL_SPLIT_PK_COLUMN_INDEX)
})?;
}

if let Some(backfill_split_column_name) = with_options.get(CDC_BACKFILL_SPLIT_COLUMN_NAME) {
scan_options.backfill_split_column_name = Some(backfill_split_column_name.to_owned());
}
}

Ok(scan_options)
Expand Down
4 changes: 2 additions & 2 deletions src/frontend/src/stream_fragmenter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,8 +456,8 @@ fn build_fragment(
}

NodeBody::StreamCdcScan(node) => {
if let Some(o) = node.options
&& CdcScanOptions::from_proto(&o).is_parallelized_backfill()
if let Some(o) = &node.options
&& CdcScanOptions::from_proto(o).is_parallelized_backfill()
{
// Use parallel CDC backfill.
current_fragment
Expand Down
4 changes: 2 additions & 2 deletions src/meta/src/rpc/ddl_controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -816,8 +816,8 @@ impl DdlController {
})?;
fn assert_parallelism(stream_scan_fragment: &Fragment, node_body: &Option<NodeBody>) {
if let Some(NodeBody::StreamCdcScan(node)) = node_body {
if let Some(o) = node.options
&& CdcScanOptions::from_proto(&o).is_parallelized_backfill()
if let Some(o) = &node.options
&& CdcScanOptions::from_proto(o).is_parallelized_backfill()
{
// Use parallel CDC backfill.
} else {
Expand Down
1 change: 1 addition & 0 deletions src/meta/src/stream/cdc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ pub(crate) async fn try_init_parallel_cdc_table_snapshot_splits(
backfill_num_rows_per_split: per_table_options.backfill_num_rows_per_split,
backfill_as_even_splits: per_table_options.backfill_as_even_splits,
backfill_split_pk_column_index: per_table_options.backfill_split_pk_column_index,
backfill_split_column_name: per_table_options.backfill_split_column_name.clone(),
};
let table_type = ExternalCdcTableType::from_properties(&table_desc.connect_properties);
// Filter out additional columns to construct the external table schema
Expand Down
35 changes: 28 additions & 7 deletions src/stream/src/executor/backfill/cdc/cdc_backill_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,13 +119,29 @@ impl<S: StateStore> ParallelizedCdcBackfillExecutor<S> {
.filter(|col| col.additional_column.column_type.is_some())
.cloned()
.collect_vec();
assert!(
(self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
"split pk column index {} out of bound",
self.options.backfill_split_pk_column_index
);
let snapshot_split_column_index =
pk_indices[self.options.backfill_split_pk_column_index as usize];
if let Some(backfill_split_column_name) = &self.options.backfill_split_column_name {
let Some(pos) = self
.external_table
.schema()
.fields
.iter()
.position(|f| f.name == *backfill_split_column_name)
else {
panic!(
"invalid backfill_split_column_name {}",
backfill_split_column_name
);
};
pos
} else {
assert!(
(self.options.backfill_split_pk_column_index as usize) < pk_indices.len(),
"split pk column index {} out of bound",
self.options.backfill_split_pk_column_index
);
pk_indices[self.options.backfill_split_pk_column_index as usize]
};
let cdc_table_snapshot_split_column =
vec![self.external_table.schema().fields[snapshot_split_column_index].clone()];

Expand Down Expand Up @@ -802,7 +818,12 @@ fn assert_consecutive_splits(actor_snapshot_splits: &[CdcTableSnapshotSplit]) {
actor_snapshot_splits[i].right_bound_exclusive.datum_at(0),
OrderType::ascending_nulls_last(),
)
.is_lt()
.is_lt(),
"{:?} {:?}",
actor_snapshot_splits[i - 1]
.right_bound_exclusive
.datum_at(0),
actor_snapshot_splits[i].right_bound_exclusive.datum_at(0)
);
}
}
Expand Down