Skip to content

Commit c9370ae

Browse files
authored
[Flink] Support Partial Updates to the Flink Sink (apache#2042)
* add partial updates to the datastraem api * refactor builder * fix checkstyle violations * update setter * fix checkstyle * update setter
1 parent 3c53a20 commit c9370ae

File tree

3 files changed

+457
-2
lines changed

3 files changed

+457
-2
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/sink/FlussSinkBuilder.java

Lines changed: 72 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535

36+
import java.util.Arrays;
3637
import java.util.HashMap;
3738
import java.util.List;
3839
import java.util.Map;
@@ -73,6 +74,9 @@ public class FlussSinkBuilder<InputT> {
7374
private final Map<String, String> configOptions = new HashMap<>();
7475
private FlussSerializationSchema<InputT> serializationSchema;
7576
private boolean shuffleByBucketId = true;
77+
// Optional list of columns for partial update. When set, upsert will only update these columns.
78+
// The primary key columns must be fully specified in this list.
79+
private List<String> partialUpdateColumns;
7680

7781
/** Set the bootstrap server for the sink. */
7882
public FlussSinkBuilder<InputT> setBootstrapServers(String bootstrapServers) {
@@ -98,6 +102,24 @@ public FlussSinkBuilder<InputT> setShuffleByBucketId(boolean shuffleByBucketId)
98102
return this;
99103
}
100104

105+
/**
106+
* Enable partial update by specifying the column names to update for upsert tables. Primary key
107+
* columns must be included in this list.
108+
*/
109+
public FlussSinkBuilder<InputT> setPartialUpdateColumns(List<String> columns) {
110+
this.partialUpdateColumns = columns;
111+
return this;
112+
}
113+
114+
/**
115+
* Enable partial update by specifying the column names to update for upsert tables. Convenience
116+
* varargs overload.
117+
*/
118+
public FlussSinkBuilder<InputT> setPartialUpdateColumns(String... columns) {
119+
this.partialUpdateColumns = Arrays.asList(columns);
120+
return this;
121+
}
122+
101123
/** Set a configuration option. */
102124
public FlussSinkBuilder<InputT> setOption(String key, String value) {
103125
configOptions.put(key, value);
@@ -153,12 +175,17 @@ public FlussSink<InputT> build() {
153175

154176
if (isUpsert) {
155177
LOG.info("Initializing Fluss upsert sink writer ...");
178+
int[] targetColumnIndexes =
179+
computeTargetColumnIndexes(
180+
tableRowType.getFieldNames(),
181+
tableInfo.getPrimaryKeys(),
182+
partialUpdateColumns);
156183
writerBuilder =
157184
new FlinkSink.UpsertSinkWriterBuilder<>(
158185
tablePath,
159186
flussConfig,
160187
tableRowType,
161-
null, // not support partialUpdateColumns yet
188+
targetColumnIndexes,
162189
numBucket,
163190
bucketKeys,
164191
partitionKeys,
@@ -193,4 +220,48 @@ private void validateConfiguration() {
193220
checkNotNull(tableName, "Table name is required but not provided.");
194221
checkArgument(!tableName.isEmpty(), "Table name cannot be empty.");
195222
}
223+
224+
// -------------- Test-visible helper methods --------------
225+
/**
226+
* Computes target column indexes for partial updates. If {@code specifiedColumns} is null or
227+
* empty, returns null indicating full update. Validates that all primary key columns are
228+
* included in the specified columns.
229+
*
230+
* @param allFieldNames the list of all field names in table row type order
231+
* @param primaryKeyNames the list of primary key column names
232+
* @param specifiedColumns the optional list of columns specified for partial update
233+
* @return the indexes into {@code allFieldNames} corresponding to {@code specifiedColumns}, or
234+
* null for full update
235+
* @throws IllegalArgumentException if a specified column does not exist or primary key coverage
236+
* is incomplete
237+
*/
238+
static int[] computeTargetColumnIndexes(
239+
List<String> allFieldNames,
240+
List<String> primaryKeyNames,
241+
List<String> specifiedColumns) {
242+
if (specifiedColumns == null || specifiedColumns.isEmpty()) {
243+
return null; // full update
244+
}
245+
246+
// Map specified column names to indexes
247+
int[] indexes = new int[specifiedColumns.size()];
248+
for (int i = 0; i < specifiedColumns.size(); i++) {
249+
String col = specifiedColumns.get(i);
250+
int idx = allFieldNames.indexOf(col);
251+
checkArgument(
252+
idx >= 0, "Column '%s' not found in table schema: %s", col, allFieldNames);
253+
indexes[i] = idx;
254+
}
255+
256+
// Validate that all primary key columns are covered
257+
for (String pk : primaryKeyNames) {
258+
checkArgument(
259+
specifiedColumns.contains(pk),
260+
"Partial updates must include all primary key columns. Missing primary key column: %s. Provided columns: %s",
261+
pk,
262+
specifiedColumns);
263+
}
264+
265+
return indexes;
266+
}
196267
}

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/sink/FlussSinkBuilderTest.java

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.junit.jupiter.api.Test;
2525

2626
import java.lang.reflect.Field;
27+
import java.util.Arrays;
2728
import java.util.HashMap;
2829
import java.util.Map;
2930

@@ -171,12 +172,55 @@ void testFluentChaining() {
171172
.setTable(tableName)
172173
.setOption("key1", "value1")
173174
.setOptions(new HashMap<>())
174-
.setShuffleByBucketId(false);
175+
.setShuffleByBucketId(false)
176+
.setPartialUpdateColumns("id", "price");
175177

176178
// Verify the builder instance is returned
177179
assertThat(chainedBuilder).isInstanceOf(FlussSinkBuilder.class);
178180
}
179181

182+
@Test
183+
void testComputeTargetColumnIndexesFullUpdate() {
184+
int[] result =
185+
FlussSinkBuilder.computeTargetColumnIndexes(
186+
Arrays.asList("id", "name", "price"), Arrays.asList("id"), null);
187+
assertThat(result).isNull();
188+
}
189+
190+
@Test
191+
void testComputeTargetColumnIndexesValidPartialIncludesPk() {
192+
int[] result =
193+
FlussSinkBuilder.computeTargetColumnIndexes(
194+
Arrays.asList("id", "name", "price", "ts"),
195+
Arrays.asList("id"),
196+
Arrays.asList("id", "price"));
197+
assertThat(result).containsExactly(0, 2);
198+
}
199+
200+
@Test
201+
void testComputeTargetColumnIndexesMissingPkThrows() {
202+
assertThatThrownBy(
203+
() ->
204+
FlussSinkBuilder.computeTargetColumnIndexes(
205+
Arrays.asList("id", "name", "price"),
206+
Arrays.asList("id"),
207+
Arrays.asList("name", "price")))
208+
.isInstanceOf(IllegalArgumentException.class)
209+
.hasMessageContaining("Partial updates must include all primary key columns");
210+
}
211+
212+
@Test
213+
void testComputeTargetColumnIndexesUnknownColumnThrows() {
214+
assertThatThrownBy(
215+
() ->
216+
FlussSinkBuilder.computeTargetColumnIndexes(
217+
Arrays.asList("id", "name"),
218+
Arrays.asList("id"),
219+
Arrays.asList("id", "unknown")))
220+
.isInstanceOf(IllegalArgumentException.class)
221+
.hasMessageContaining("not found in table schema");
222+
}
223+
180224
// Helper method to get private field values using reflection
181225
@SuppressWarnings("unchecked")
182226
private <T> T getFieldValue(Object object, String fieldName) throws Exception {

0 commit comments

Comments
 (0)