Skip to content

Commit 5155769

Browse files
committed
增加IgnoreNullWhenUpdate
1 parent 8625aaf commit 5155769

File tree

12 files changed

+1448
-35
lines changed

12 files changed

+1448
-35
lines changed

flink-connector-jdbc-gaussdb/src/main/java/org/apache/flink/connector/jdbc/gaussdb/database/dialect/GaussdbDialectConverter.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,11 @@
2020

2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.connector.jdbc.core.database.dialect.AbstractDialectConverter;
23+
import org.apache.flink.connector.jdbc.gaussdb.table.GaussdbFieldObject;
24+
import org.apache.flink.connector.jdbc.statement.FieldNamedPreparedStatement;
2325
import org.apache.flink.table.data.GenericArrayData;
26+
import org.apache.flink.table.data.GenericRowData;
27+
import org.apache.flink.table.data.RowData;
2428
import org.apache.flink.table.types.logical.ArrayType;
2529
import org.apache.flink.table.types.logical.LogicalType;
2630
import org.apache.flink.table.types.logical.LogicalTypeRoot;
@@ -109,4 +113,20 @@ private JdbcDeserializationConverter createGaussDBArrayConverter(ArrayType array
109113
public String converterName() {
110114
return "Gaussdb";
111115
}
116+
117+
@Override
118+
public FieldNamedPreparedStatement toExternal(
119+
RowData rowData, FieldNamedPreparedStatement statement) throws SQLException {
120+
GenericRowData genericRowData = (GenericRowData) rowData;
121+
for (int index = 0; index < rowData.getArity(); index++) {
122+
Object field = genericRowData.getField(index);
123+
int sourceIndex = index;
124+
if (field != null && field instanceof GaussdbFieldObject) {
125+
sourceIndex = ((GaussdbFieldObject) field).getIndex();
126+
genericRowData.setField(index, ((GaussdbFieldObject) field).getField());
127+
}
128+
toExternalConverters[sourceIndex].serialize(genericRowData, index, statement);
129+
}
130+
return statement;
131+
}
112132
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.apache.flink.connector.jdbc.gaussdb.table;
2+
3+
import org.apache.flink.configuration.ConfigOption;
4+
import org.apache.flink.configuration.ConfigOptions;
5+
import org.apache.flink.connector.jdbc.core.table.JdbcConnectorOptions;
6+
7+
/**
8+
* Gaussdb connector options.
9+
*/
10+
public class GaussdbExtendConnectorOptions extends JdbcConnectorOptions {
11+
12+
public static final ConfigOption<Boolean> SINK_IGNORE_NULL_WHEN_UPDATE =
13+
ConfigOptions.key("sink.ignore-null-when-update")
14+
.booleanType()
15+
.defaultValue(false)
16+
.withDescription(
17+
"Whether to ignore null values when updating records. If true, null values will not be updated to the database.");
18+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.connector.jdbc.gaussdb.table;
19+
20+
import org.apache.flink.annotation.PublicEvolving;
21+
22+
import java.io.Serializable;
23+
import java.util.Objects;
24+
25+
/** JDBC sink batch options. */
26+
@PublicEvolving
27+
public class GaussdbExtendOptions implements Serializable {
28+
29+
private final boolean ignoreNullWhenUpdate;
30+
31+
private GaussdbExtendOptions(boolean ignoreNullWhenUpdate) {
32+
this.ignoreNullWhenUpdate = ignoreNullWhenUpdate;
33+
}
34+
35+
public boolean isIgnoreNullWhenUpdate() {
36+
return ignoreNullWhenUpdate;
37+
}
38+
39+
@Override
40+
public boolean equals(Object o) {
41+
if (this == o) {
42+
return true;
43+
}
44+
if (o == null || getClass() != o.getClass()) {
45+
return false;
46+
}
47+
GaussdbExtendOptions that = (GaussdbExtendOptions) o;
48+
return ignoreNullWhenUpdate == that.ignoreNullWhenUpdate;
49+
}
50+
51+
@Override
52+
public int hashCode() {
53+
return Objects.hash(ignoreNullWhenUpdate);
54+
}
55+
56+
public static Builder builder() {
57+
return new Builder();
58+
}
59+
60+
public static GaussdbExtendOptions defaults() {
61+
return builder().build();
62+
}
63+
64+
/** Builder for {@link GaussdbExtendOptions}. */
65+
@PublicEvolving
66+
public static final class Builder {
67+
private boolean ignoreNullWhenUpdate = false;
68+
69+
public Builder withIgnoreNullWhenUpdate(boolean ignoreNullWhenUpdate) {
70+
this.ignoreNullWhenUpdate = ignoreNullWhenUpdate;
71+
return this;
72+
}
73+
74+
public GaussdbExtendOptions build() {
75+
return new GaussdbExtendOptions(ignoreNullWhenUpdate);
76+
}
77+
}
78+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package org.apache.flink.connector.jdbc.gaussdb.table;
2+
3+
/**
4+
* GaussdbFieldObject.
5+
*/
6+
public class GaussdbFieldObject {
7+
private final Object field;
8+
private final int index;
9+
10+
public GaussdbFieldObject(int index, Object field) {
11+
this.index = index;
12+
this.field = field;
13+
}
14+
15+
public Object getField() {
16+
return field;
17+
}
18+
19+
public int getIndex() {
20+
return index;
21+
}
22+
}

0 commit comments

Comments
 (0)