Skip to content

Commit 35a761d

Browse files
authored
feat(sink): Sink schema change from barrier to persistent (#24271)
1 parent 62017ad commit 35a761d

File tree

35 files changed

+484
-248
lines changed

35 files changed

+484
-248
lines changed

proto/connector_service.proto

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -228,7 +228,7 @@ message CoordinateRequest {
228228
message CommitRequest {
229229
uint64 epoch = 1;
230230
SinkMetadata metadata = 2;
231-
stream_plan.SinkAddColumns add_columns = 3;
231+
stream_plan.SinkSchemaChange schema_change = 3;
232232
}
233233

234234
message UpdateVnodeBitmapRequest {

proto/stream_plan.proto

Lines changed: 7 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -54,10 +54,6 @@ message StopMutation {
5454
repeated uint32 dropped_sink_fragments = 2;
5555
}
5656

57-
message SinkAddColumns {
58-
repeated plan_common.Field fields = 1;
59-
}
60-
6157
message UpdateMutation {
6258
message DispatcherUpdate {
6359
// Dispatcher can be uniquely identified by a combination of actor id and dispatcher id.
@@ -101,7 +97,7 @@ message UpdateMutation {
10197
map<uint32, Dispatchers> actor_new_dispatchers = 6;
10298
// CDC table snapshot splits
10399
source.CdcTableSnapshotSplitsWithGeneration actor_cdc_table_snapshot_splits = 7;
104-
map<uint32, SinkAddColumns> sink_add_columns = 8;
100+
map<uint32, SinkSchemaChange> sink_schema_change = 8;
105101
repeated SubscriptionUpstreamInfo subscriptions_to_drop = 9;
106102
}
107103

@@ -1289,18 +1285,17 @@ message StreamFragmentGraph {
12891285

12901286
// Schema change operation for sink
12911287
message SinkSchemaChange {
1292-
// Original schema before this change (optional, can be empty)
1293-
// Used for validation and conflict detection
1294-
// For current stage, this can be left empty
1288+
// Original schema before this change.
1289+
// Used for validation and conflict detection.
12951290
repeated plan_common.Field original_schema = 1;
12961291

12971292
// Schema change operation (mutually exclusive)
12981293
oneof op {
12991294
// Add new columns to the schema
1300-
AddColumnsOp add_columns = 2;
1295+
SinkAddColumnsOp add_columns = 2;
13011296

13021297
// Drop columns from the schema
1303-
DropColumnsOp drop_columns = 3;
1298+
SinkDropColumnsOp drop_columns = 3;
13041299

13051300
// Future operations can be added here...
13061301
// RenameColumnsOp rename_columns = 4;
@@ -1309,13 +1304,13 @@ message SinkSchemaChange {
13091304
}
13101305

13111306
// Add columns operation
1312-
message AddColumnsOp {
1307+
message SinkAddColumnsOp {
13131308
// Columns to add
13141309
repeated plan_common.Field fields = 1;
13151310
}
13161311

13171312
// Drop columns operation
1318-
message DropColumnsOp {
1313+
message SinkDropColumnsOp {
13191314
// Column names to drop
13201315
repeated string column_names = 1;
13211316
}

src/bench/sink_bench/main.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -109,7 +109,7 @@ impl LogReader for MockRangeLogReader {
109109
is_checkpoint: true,
110110
new_vnode_bitmap: None,
111111
is_stop: false,
112-
add_columns: None,
112+
schema_change: None,
113113
},
114114
))
115115
}

src/connector/src/sink/boxed.rs

Lines changed: 16 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -18,8 +18,8 @@ use std::ops::DerefMut;
1818
use async_trait::async_trait;
1919
use futures::FutureExt;
2020
use futures::future::BoxFuture;
21-
use risingwave_common::catalog::Field;
2221
use risingwave_pb::connector_service::SinkMetadata;
22+
use risingwave_pb::stream_plan::PbSinkSchemaChange;
2323

2424
use crate::sink::log_store::{LogStoreReadItem, LogStoreResult, TruncateOffset};
2525
use crate::sink::{
@@ -111,9 +111,11 @@ impl SinglePhaseCommitCoordinator for BoxSinglePhaseCoordinator {
111111
&mut self,
112112
epoch: u64,
113113
metadata: Vec<SinkMetadata>,
114-
add_columns: Option<Vec<Field>>,
114+
schema_change: Option<PbSinkSchemaChange>,
115115
) -> crate::sink::Result<()> {
116-
self.deref_mut().commit(epoch, metadata, add_columns).await
116+
self.deref_mut()
117+
.commit(epoch, metadata, schema_change)
118+
.await
117119
}
118120
}
119121

@@ -127,15 +129,22 @@ impl TwoPhaseCommitCoordinator for BoxTwoPhaseCoordinator {
127129
&mut self,
128130
epoch: u64,
129131
metadata: Vec<SinkMetadata>,
130-
add_columns: Option<Vec<Field>>,
132+
schema_change: Option<PbSinkSchemaChange>,
131133
) -> crate::sink::Result<Vec<u8>> {
132134
self.deref_mut()
133-
.pre_commit(epoch, metadata, add_columns)
135+
.pre_commit(epoch, metadata, schema_change)
134136
.await
135137
}
136138

137-
async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> crate::sink::Result<()> {
138-
self.deref_mut().commit(epoch, commit_metadata).await
139+
async fn commit(
140+
&mut self,
141+
epoch: u64,
142+
commit_metadata: Vec<u8>,
143+
schema_change: Option<PbSinkSchemaChange>,
144+
) -> crate::sink::Result<()> {
145+
self.deref_mut()
146+
.commit(epoch, commit_metadata, schema_change)
147+
.await
139148
}
140149

141150
async fn abort(&mut self, epoch: u64, commit_metadata: Vec<u8>) {

src/connector/src/sink/coordinate.rs

Lines changed: 10 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -193,7 +193,7 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for Coordin
193193
is_checkpoint,
194194
new_vnode_bitmap,
195195
is_stop,
196-
add_columns,
196+
schema_change,
197197
} => {
198198
let prev_epoch = match state {
199199
LogConsumerState::EpochBegun { curr_epoch } => curr_epoch,
@@ -204,7 +204,7 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for Coordin
204204
if current_checkpoint >= commit_checkpoint_interval.get()
205205
|| new_vnode_bitmap.is_some()
206206
|| is_stop
207-
|| add_columns.is_some()
207+
|| schema_change.is_some()
208208
{
209209
let start_time = Instant::now();
210210
let metadata = sink_writer.barrier(true).await?;
@@ -213,15 +213,20 @@ impl<W: SinkWriter<CommitMetadata = Option<SinkMetadata>>> LogSinker for Coordin
213213
"should get metadata on checkpoint barrier"
214214
))
215215
})?;
216-
if add_columns.is_some() {
216+
if schema_change.is_some() {
217+
tracing::info!(
218+
sink_id = %self.param.sink_id,
219+
?schema_change,
220+
"schema change received for coordinated log sinker"
221+
);
217222
assert!(
218223
is_stop,
219-
"add columns should stop current sink for sink {}",
224+
"schema change should stop current sink for sink {}",
220225
self.param.sink_id
221226
);
222227
}
223228
coordinator_stream_handle
224-
.commit(epoch, metadata, add_columns)
229+
.commit(epoch, metadata, schema_change)
225230
.await?;
226231
sink_writer_metrics
227232
.sink_commit_duration

src/connector/src/sink/deltalake.rs

Lines changed: 6 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -33,12 +33,13 @@ use phf::{Set, phf_set};
3333
use risingwave_common::array::StreamChunk;
3434
use risingwave_common::array::arrow::DeltaLakeConvert;
3535
use risingwave_common::bail;
36-
use risingwave_common::catalog::{Field, Schema};
36+
use risingwave_common::catalog::{Schema};
3737
use risingwave_common::types::DataType;
3838
use risingwave_common::util::iter_util::ZipEqDebug;
3939
use risingwave_pb::connector_service::SinkMetadata;
4040
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
4141
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
42+
use risingwave_pb::stream_plan::PbSinkSchemaChange;
4243
use serde::{Deserialize, Serialize};
4344
use serde_with::{DisplayFromStr, serde_as};
4445
use tokio::sync::mpsc::UnboundedSender;
@@ -530,13 +531,13 @@ impl SinglePhaseCommitCoordinator for DeltaLakeSinkCommitter {
530531
&mut self,
531532
epoch: u64,
532533
metadata: Vec<SinkMetadata>,
533-
add_columns: Option<Vec<Field>>,
534+
schema_change: Option<PbSinkSchemaChange>,
534535
) -> Result<()> {
535536
tracing::info!("Starting DeltaLake commit in epoch {epoch}.");
536-
if let Some(add_columns) = add_columns {
537+
if let Some(schema_change) = schema_change {
537538
return Err(anyhow!(
538-
"Delta lake sink not support add columns, but got: {:?}",
539-
add_columns
539+
"Delta lake sink does not support schema changes, but got: {:?}",
540+
schema_change
540541
)
541542
.into());
542543
}

src/connector/src/sink/iceberg/mod.rs

Lines changed: 34 additions & 26 deletions
Original file line number | Diff line number | Diff line change
@@ -68,6 +68,7 @@ use risingwave_common_estimate_size::EstimateSize;
6868
use risingwave_pb::connector_service::SinkMetadata;
6969
use risingwave_pb::connector_service::sink_metadata::Metadata::Serialized;
7070
use risingwave_pb::connector_service::sink_metadata::SerializedMetadata;
71+
use risingwave_pb::stream_plan::PbSinkSchemaChange;
7172
use serde::{Deserialize, Serialize};
7273
use serde_json::from_value;
7374
use serde_with::{DisplayFromStr, serde_as};
@@ -1886,20 +1887,32 @@ impl SinglePhaseCommitCoordinator for IcebergSinkCommitter {
18861887
&mut self,
18871888
epoch: u64,
18881889
metadata: Vec<SinkMetadata>,
1889-
add_columns: Option<Vec<Field>>,
1890+
schema_change: Option<PbSinkSchemaChange>,
18901891
) -> Result<()> {
18911892
tracing::info!("Starting iceberg direct commit in epoch {epoch}");
18921893

18931894
// Commit data if present
1894-
if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata, None)? {
1895+
if let Some((write_results, snapshot_id)) = self.pre_commit_inner(epoch, metadata)? {
18951896
self.commit_datafile(epoch, write_results, snapshot_id)
18961897
.await?;
18971898
}
18981899

18991900
// Commit schema change if present
1900-
if let Some(add_columns) = add_columns {
1901+
if let Some(schema_change) = schema_change {
1902+
let risingwave_pb::stream_plan::sink_schema_change::Op::AddColumns(add_columns) =
1903+
schema_change.op.unwrap()
1904+
else {
1905+
return Err(SinkError::Iceberg(anyhow!(
1906+
"Only AddColumns schema change is supported for Iceberg sink"
1907+
)));
1908+
};
1909+
let add_fields = add_columns
1910+
.fields
1911+
.into_iter()
1912+
.map(|pb_field| Field::from_prost(&pb_field))
1913+
.collect_vec();
19011914
tracing::info!(?epoch, "Committing schema change");
1902-
self.commit_schema_change(add_columns).await?;
1915+
self.commit_schema_change(add_fields).await?;
19031916
}
19041917

19051918
Ok(())
@@ -1921,27 +1934,26 @@ impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
19211934
&mut self,
19221935
epoch: u64,
19231936
metadata: Vec<SinkMetadata>,
1924-
add_columns: Option<Vec<Field>>,
1937+
schema_change: Option<PbSinkSchemaChange>,
19251938
) -> Result<Vec<u8>> {
19261939
tracing::info!("Starting iceberg pre commit in epoch {epoch}");
19271940

19281941
// TwoPhaseCommitCoordinator does not support schema change yet
1929-
if let Some(add_columns) = &add_columns {
1942+
if let Some(schema_change) = &schema_change {
19301943
return Err(SinkError::Iceberg(anyhow!(
19311944
"TwoPhaseCommitCoordinator for Iceberg sink does not support schema change yet, \
1932-
but got add_columns: {:?}",
1933-
add_columns.iter().map(|c| &c.name).collect::<Vec<_>>()
1945+
but got schema_change: {:?}",
1946+
schema_change
19341947
)));
19351948
}
19361949

1937-
let (write_results, snapshot_id) =
1938-
match self.pre_commit_inner(epoch, metadata, add_columns)? {
1939-
Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1940-
None => {
1941-
tracing::debug!(?epoch, "no data to commit");
1942-
return Ok(vec![]);
1943-
}
1944-
};
1950+
let (write_results, snapshot_id) = match self.pre_commit_inner(epoch, metadata)? {
1951+
Some((write_results, snapshot_id)) => (write_results, snapshot_id),
1952+
None => {
1953+
tracing::debug!(?epoch, "no data to commit");
1954+
return Ok(vec![]);
1955+
}
1956+
};
19451957

19461958
let mut write_results_bytes = Vec::new();
19471959
for each_parallelism_write_result in write_results {
@@ -1957,7 +1969,12 @@ impl TwoPhaseCommitCoordinator for IcebergSinkCommitter {
19571969
Ok(pre_commit_metadata_bytes)
19581970
}
19591971

1960-
async fn commit(&mut self, epoch: u64, commit_metadata: Vec<u8>) -> Result<()> {
1972+
async fn commit(
1973+
&mut self,
1974+
epoch: u64,
1975+
commit_metadata: Vec<u8>,
1976+
_schema_change: Option<PbSinkSchemaChange>,
1977+
) -> Result<()> {
19611978
tracing::info!("Starting iceberg commit in epoch {epoch}");
19621979
if commit_metadata.is_empty() {
19631980
tracing::debug!(?epoch, "no data to commit");
@@ -2008,16 +2025,7 @@ impl IcebergSinkCommitter {
20082025
&mut self,
20092026
_epoch: u64,
20102027
metadata: Vec<SinkMetadata>,
2011-
add_columns: Option<Vec<Field>>,
20122028
) -> Result<Option<(Vec<IcebergCommitResult>, i64)>> {
2013-
if let Some(add_columns) = add_columns {
2014-
return Err(anyhow!(
2015-
"Iceberg sink not support add columns, but got: {:?}",
2016-
add_columns
2017-
)
2018-
.into());
2019-
}
2020-
20212029
let write_results: Vec<IcebergCommitResult> = metadata
20222030
.iter()
20232031
.map(IcebergCommitResult::try_from)

src/connector/src/sink/log_store.rs

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -27,11 +27,11 @@ use futures::{TryFuture, TryFutureExt};
2727
use risingwave_common::array::StreamChunk;
2828
use risingwave_common::bail;
2929
use risingwave_common::bitmap::Bitmap;
30-
use risingwave_common::catalog::Field;
3130
use risingwave_common::metrics::{LabelGuardedIntCounter, LabelGuardedIntGauge};
3231
use risingwave_common::util::epoch::{EpochPair, INVALID_EPOCH};
3332
use risingwave_common_estimate_size::EstimateSize;
3433
use risingwave_common_rate_limit::{RateLimit, RateLimiter};
34+
use risingwave_pb::stream_plan::PbSinkSchemaChange;
3535
use tokio::select;
3636
use tokio::sync::mpsc::UnboundedReceiver;
3737

@@ -122,7 +122,7 @@ pub enum LogStoreReadItem {
122122
is_checkpoint: bool,
123123
new_vnode_bitmap: Option<Arc<Bitmap>>,
124124
is_stop: bool,
125-
add_columns: Option<Vec<Field>>,
125+
schema_change: Option<PbSinkSchemaChange>,
126126
},
127127
}
128128

@@ -147,7 +147,7 @@ pub struct FlushCurrentEpochOptions {
147147
pub is_checkpoint: bool,
148148
pub new_vnode_bitmap: Option<Arc<Bitmap>>,
149149
pub is_stop: bool,
150-
pub add_columns: Option<Vec<Field>>,
150+
pub schema_change: Option<PbSinkSchemaChange>,
151151
}
152152

153153
pub trait LogWriter: Send {

src/connector/src/sink/mock_coordination_client.rs

Lines changed: 10 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -157,7 +157,9 @@ impl MockSinkCoordinationRpcClient {
157157
msg:
158158
Some(coordinate_request::Msg::CommitRequest(
159159
coordinate_request::CommitRequest {
160-
epoch, metadata, ..
160+
epoch,
161+
metadata,
162+
schema_change,
161163
},
162164
)),
163165
}) => {
@@ -167,15 +169,19 @@ impl MockSinkCoordinationRpcClient {
167169
SinkCommitCoordinator::SinglePhase(coordinator) => {
168170
coordinator.init().await?;
169171
coordinator
170-
.commit(epoch, vec![metadata.unwrap()], None)
172+
.commit(epoch, vec![metadata.unwrap()], schema_change)
171173
.await?;
172174
}
173175
SinkCommitCoordinator::TwoPhase(coordinator) => {
174176
coordinator.init().await?;
175177
let metadata = coordinator
176-
.pre_commit(epoch, vec![metadata.unwrap()], None)
178+
.pre_commit(
179+
epoch,
180+
vec![metadata.unwrap()],
181+
schema_change.clone(),
182+
)
177183
.await?;
178-
coordinator.commit(epoch, metadata).await?;
184+
coordinator.commit(epoch, metadata, schema_change).await?;
179185
}
180186
}
181187
};

0 commit comments

Comments (0)