
Commit 7ff09a9

Merge remote-tracking branch 'origin/main'
2 parents 72e4153 + f834fa9 commit 7ff09a9

25 files changed

Lines changed: 316 additions & 56 deletions

SECURITY.md

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+# Security Policy
+This document provides guidelines for reporting security vulnerabilities in the StarRocks Connector for Flink.
+
+## How to report a vulnerability?
+To ensure responsible disclosure, please avoid submitting security issues via public GitHub issues or pull requests.
+Instead, please report potential vulnerabilities by sending the details to [support@starrocks.com](mailto:support@starrocks.com).
+

common.sh

Lines changed: 2 additions & 2 deletions
@@ -32,9 +32,9 @@ if ! ${MVN_CMD} --version; then
 fi
 export MVN_CMD

-SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18" "1.19")
+SUPPORTED_MINOR_VERSION=("1.15" "1.16" "1.17" "1.18" "1.19" "1.20")
 # version formats are different among flink versions
-SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18" "3.2.0-1.19")
+SUPPORTED_KAFKA_CONNECTOR_VERSION=("1.15.0" "1.16.0" "1.17.0" "3.0.1-1.18" "3.2.0-1.19" "3.4.0-1.20")
 VERSION_MESSAGE=$(IFS=, ; echo "${SUPPORTED_MINOR_VERSION[*]}")

 function check_flink_version_supported() {

docs/content/connector-sink.md

Lines changed: 8 additions & 7 deletions
@@ -10,12 +10,13 @@ The Flink connector supports DataStream API, Table API & SQL, and Python API. It

 ## Version requirements

-| Connector | Flink                    | StarRocks     | Java | Scala     |
-|-----------|--------------------------|---------------|------|-----------|
-| 1.2.10    | 1.15,1.16,1.17,1.18,1.19 | 2.1 and later | 8    | 2.11,2.12 |
-| 1.2.9     | 1.15,1.16,1.17,1.18      | 2.1 and later | 8    | 2.11,2.12 |
-| 1.2.8     | 1.13,1.14,1.15,1.16,1.17 | 2.1 and later | 8    | 2.11,2.12 |
-| 1.2.7     | 1.11,1.12,1.13,1.14,1.15 | 2.1 and later | 8    | 2.11,2.12 |
+| Connector | Flink                         | StarRocks     | Java | Scala     |
+|-----------|-------------------------------|---------------|------|-----------|
+| 1.2.11    | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.10    | 1.15,1.16,1.17,1.18,1.19      | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.9     | 1.15,1.16,1.17,1.18           | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.8     | 1.13,1.14,1.15,1.16,1.17      | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.7     | 1.11,1.12,1.13,1.14,1.15      | 2.1 and later | 8    | 2.11,2.12 |

 ## Obtain Flink connector

@@ -113,7 +114,7 @@ In your Maven project's `pom.xml` file, add the Flink connector as a dependency
 | sink.properties.row_delimiter | No | \n | The row delimiter for CSV-formatted data. |
 | sink.properties.max_filter_ratio | No | 0 | The maximum error tolerance of the Stream Load. It's the maximum percentage of data records that can be filtered out due to inadequate data quality. Valid values: `0` to `1`. Default value: `0`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. |
 | sink.properties.strict_mode | No | false | Specifies whether to enable the strict mode for Stream Load. It affects the loading behavior when there are unqualified rows, such as inconsistent column values. Valid values: `true` and `false`. Default value: `false`. See [Stream Load](https://docs.starrocks.io/en-us/latest/sql-reference/sql-statements/data-manipulation/STREAM%20LOAD) for details. |
-| sink.properties.compression | No | NONE | Supported since 1.2.10. The compression algorithm used for Stream Load. Currently, compression is only supported for the JSON format. Valid values: `lz4_frame`. Compression for json format is supported only in StarRocks v3.2.7 and later. |
+| sink.properties.compression | No | NONE | The compression algorithm used for Stream Load. Valid values: `lz4_frame`. Compression for the JSON format requires connector 1.2.10 or later and StarRocks v3.2.7 or later. Compression for the CSV format requires connector 1.2.11 or later and has no StarRocks version requirement. |

 ## Data type mapping between Flink and StarRocks


docs/content/connector-source.md

Lines changed: 7 additions & 6 deletions
@@ -20,12 +20,13 @@ Unlike the JDBC connector provided by Flink, the Flink connector of StarRocks su

 ## Version requirements

-| Connector | Flink                    | StarRocks     | Java | Scala     |
-|-----------|--------------------------|---------------|------|-----------|
-| 1.2.10    | 1.15,1.16,1.17,1.18,1.19 | 2.1 and later | 8    | 2.11,2.12 |
-| 1.2.9     | 1.15,1.16,1.17,1.18      | 2.1 and later | 8    | 2.11,2.12 |
-| 1.2.8     | 1.13,1.14,1.15,1.16,1.17 | 2.1 and later | 8    | 2.11,2.12 |
-| 1.2.7     | 1.11,1.12,1.13,1.14,1.15 | 2.1 and later | 8    | 2.11,2.12 |
+| Connector | Flink                         | StarRocks     | Java | Scala     |
+|-----------|-------------------------------|---------------|------|-----------|
+| 1.2.11    | 1.15,1.16,1.17,1.18,1.19,1.20 | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.10    | 1.15,1.16,1.17,1.18,1.19      | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.9     | 1.15,1.16,1.17,1.18           | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.8     | 1.13,1.14,1.15,1.16,1.17      | 2.1 and later | 8    | 2.11,2.12 |
+| 1.2.7     | 1.11,1.12,1.13,1.14,1.15      | 2.1 and later | 8    | 2.11,2.12 |

 ## Prerequisites


pom.xml

Lines changed: 5 additions & 4 deletions
@@ -46,16 +46,16 @@ limitations under the License.
     <version>${srfc.version}_flink-${flink.minor.version}-SNAPSHOT</version>

     <properties>
-        <srfc.version>1.2.11</srfc.version>
+        <srfc.version>1.2.12</srfc.version>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <file_encoding>UTF-8</file_encoding>
         <maven-enforcer-plugin.version>3.0.0-M3</maven-enforcer-plugin.version>
         <maven-surefire-plugin.version>3.0.0-M4</maven-surefire-plugin.version>
-        <flink.minor.version>1.19</flink.minor.version>
-        <flink.version>1.19.0</flink.version>
-        <kafka.connector.version>3.2.0-1.19</kafka.connector.version>
+        <flink.minor.version>1.20</flink.minor.version>
+        <flink.version>1.20.0</flink.version>
+        <kafka.connector.version>3.4.0-1.20</kafka.connector.version>
         <arrow.version>5.0.0</arrow.version>
         <kafka.version>2.8.1</kafka.version>
         <scala.binary.version>2.12</scala.binary.version>
@@ -438,6 +438,7 @@ limitations under the License.
                 <exclude>docs/**</exclude>
                 <exclude>README.md</exclude>
                 <exclude>.github/**</exclude>
+                <exclude>SECURITY.md</exclude>
             </excludes>
         </configuration>
     </plugin>

src/main/java/com/starrocks/connector/flink/manager/StarRocksQueryPlanVisitor.java

Lines changed: 3 additions & 0 deletions
@@ -39,6 +39,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Random;
 import java.util.Set;

@@ -112,6 +113,8 @@ private static QueryPlan getQueryPlan(String querySQL, String httpNode, StarRock
         HttpPost post = new HttpPost(url);
         post.setHeader("Content-Type", "application/json;charset=UTF-8");
         post.setHeader("Authorization", getBasicAuthHeader(sourceOptions.getUsername(), sourceOptions.getPassword()));
+        Optional.ofNullable(sourceOptions.getWarehouseName())
+                .ifPresent(warehouse -> post.setHeader("warehouse", warehouse));
         post.setEntity(new ByteArrayEntity(body.getBytes()));
         try (CloseableHttpResponse response = httpClient.execute(post)) {
             requsetCode = response.getStatusLine().getStatusCode();
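The warehouse change above boils down to a null-safe "attach the header only when configured" pattern via `Optional.ofNullable`. A minimal stdlib-only sketch, using a plain `Map` as a stand-in for the `HttpPost` object (the `buildHeaders` helper and its parameters are illustrative, not part of the connector):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class OptionalHeaderSketch {
    // Stand-in for HttpPost header setup: mandatory headers always go in,
    // the "warehouse" header is attached only when a warehouse name is set.
    static Map<String, String> buildHeaders(String authToken, String warehouseName) {
        Map<String, String> headers = new HashMap<>();
        headers.put("Content-Type", "application/json;charset=UTF-8");
        headers.put("Authorization", authToken);
        // Same null-safe pattern as the new connector code.
        Optional.ofNullable(warehouseName)
                .ifPresent(warehouse -> headers.put("warehouse", warehouse));
        return headers;
    }
}
```

The `Optional.ofNullable(...).ifPresent(...)` form avoids an explicit `if (warehouseName != null)` branch and sends no header at all when the option is absent.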

src/main/java/com/starrocks/connector/flink/manager/StarRocksStreamLoadVisitor.java

Lines changed: 27 additions & 11 deletions
@@ -231,18 +231,18 @@ private boolean tryHttpConnection(String host) {
         }
     }

-    private byte[] joinRows(List<byte[]> rows, int totalBytes) throws IOException {
-        if (StarRocksSinkOptions.StreamLoadFormat.CSV.equals(sinkOptions.getStreamLoadFormat())) {
-            byte[] lineDelimiter = StarRocksDelimiterParser.parse(sinkOptions.getSinkStreamLoadProperties().get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
-            ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
-            for (byte[] row : rows) {
-                bos.put(row);
-                bos.put(lineDelimiter);
-            }
-            return bos.array();
+    private byte[] joinCsvRows(List<byte[]> rows, int totalBytes) {
+        byte[] lineDelimiter = StarRocksDelimiterParser.parse(sinkOptions.getSinkStreamLoadProperties().get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
+        ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
+        for (byte[] row : rows) {
+            bos.put(row);
+            bos.put(lineDelimiter);
         }
+        return bos.array();
+    }

-        if (StarRocksSinkOptions.StreamLoadFormat.JSON.equals(sinkOptions.getStreamLoadFormat())) {
+    private byte[] joinJsonRows(List<byte[]> rows, int totalBytes) {
+        if (!sinkOptions.isWrapJsonAsArray()) {
             ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
             bos.put("[".getBytes(StandardCharsets.UTF_8));
             byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
@@ -256,8 +256,24 @@ private byte[] joinRows(List<byte[]> rows, int totalBytes) throws IOException {
             }
             bos.put("]".getBytes(StandardCharsets.UTF_8));
             return bos.array();
+        } else {
+            ByteBuffer bos = ByteBuffer.allocate(totalBytes);
+            for (byte[] row : rows) {
+                bos.put(row);
+            }
+            return bos.array();
+        }
+    }
+
+    private byte[] joinRows(List<byte[]> rows, int totalBytes) {
+        switch (sinkOptions.getStreamLoadFormat()) {
+            case CSV:
+                return joinCsvRows(rows, totalBytes);
+            case JSON:
+                return joinJsonRows(rows, totalBytes);
+            default:
+                throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
         }
-        throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
     }

     @SuppressWarnings("unchecked")
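The refactor splits `joinRows` into per-format helpers: CSV rows get the line delimiter appended after each row, and JSON rows are either wrapped into a single `[...]` array or, when `sink.wrap-json-as-array` is enabled, concatenated as-is because each row is assumed to already be a JSON array. A self-contained sketch of that batching logic, with the delimiter and the wrap flag passed in directly instead of read from `sinkOptions` (class and parameter names are illustrative):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class JoinRowsSketch {
    // CSV: every row is followed by the configured line delimiter.
    static byte[] joinCsvRows(List<byte[]> rows, int totalBytes, byte[] lineDelimiter) {
        ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
        for (byte[] row : rows) {
            bos.put(row);
            bos.put(lineDelimiter);
        }
        return bos.array();
    }

    // JSON: by default rows are wrapped into one array "[r1,r2,...]";
    // with wrapJsonAsArray=true the rows (already arrays) are concatenated as-is.
    static byte[] joinJsonRows(List<byte[]> rows, int totalBytes, boolean wrapJsonAsArray) {
        if (!wrapJsonAsArray) {
            // 2 bytes for "[]" when empty; otherwise one byte per comma plus the brackets.
            ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
            bos.put("[".getBytes(StandardCharsets.UTF_8));
            boolean first = true;
            for (byte[] row : rows) {
                if (!first) {
                    bos.put(",".getBytes(StandardCharsets.UTF_8));
                }
                bos.put(row);
                first = false;
            }
            bos.put("]".getBytes(StandardCharsets.UTF_8));
            return bos.array();
        }
        ByteBuffer bos = ByteBuffer.allocate(totalBytes);
        for (byte[] row : rows) {
            bos.put(row);
        }
        return bos.array();
    }
}
```

The exact-size `ByteBuffer.allocate` calls mirror the connector's approach of precomputing `totalBytes` so the batch is built in one allocation with no resizing.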

src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicSinkFunctionV2.java

Lines changed: 2 additions & 3 deletions
@@ -50,7 +50,6 @@

 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -128,7 +127,7 @@ public void invoke(T value, Context context) throws Exception {
                 || Strings.isNullOrEmpty(data.getTable())
                 || data.getDataRows() == null) {
             log.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
-                    data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows())));
+                    data.getDatabase(), data.getTable(), data.getDataRows() == null ? "null" : "Redacted"));
             return;
         }
         sinkManager.write(null, data.getDatabase(), data.getTable(), data.getDataRows());
@@ -139,7 +138,7 @@ public void invoke(T value, Context context) throws Exception {
                 || Strings.isNullOrEmpty(data.getTable())
                 || data.getRow() == null) {
             log.warn(String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
-                    data.getDatabase(), data.getTable(), data.getRow()));
+                    data.getDatabase(), data.getTable(), data.getRow() == null ? "null" : "Redacted"));
             return;
         }
         sinkManager.write(data.getUniqueKey(), data.getDatabase(), data.getTable(), data.getRow());
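The logging change replaces the raw row payload with a null/`Redacted` marker, so warn-level logs no longer leak row contents. A minimal sketch of the redacted message format (the `warnMessage` helper name is illustrative):

```java
public class RedactedLogSketch {
    // Same idea as the changed log lines: never dump the row payload into
    // logs, only report whether the field was null.
    static String warnMessage(String database, String table, Object rows) {
        return String.format("json row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
                database, table, rows == null ? "null" : "Redacted");
    }
}
```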

src/main/java/com/starrocks/connector/flink/table/sink/StarRocksDynamicTableSinkFactory.java

Lines changed: 1 addition & 0 deletions
@@ -88,6 +88,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         optionalOptions.add(StarRocksSinkOptions.SINK_ABORT_CHECK_NUM_TXNS);
         optionalOptions.add(StarRocksSinkOptions.SINK_USE_NEW_SINK_API);
         optionalOptions.add(StarRocksSinkOptions.SINK_IGNORE_DELETE);
+        optionalOptions.add(StarRocksSinkOptions.SINK_WRAP_JSON_AS_ARRAY);
         return optionalOptions;
     }
 }

src/main/java/com/starrocks/connector/flink/table/sink/StarRocksSinkOptions.java

Lines changed: 13 additions & 1 deletion
@@ -164,6 +164,11 @@ public enum StreamLoadFormat {

     public static final ConfigOption<Integer> SINK_PARALLELISM = FactoryUtil.SINK_PARALLELISM;

+    public static final ConfigOption<Boolean> SINK_WRAP_JSON_AS_ARRAY = ConfigOptions.key("sink.wrap-json-as-array")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether wrap data as array or not.");
+
     // Sink semantic
     private static final Set<String> SINK_SEMANTIC_ENUMS = Arrays.stream(StarRocksSinkSemantic.values()).map(s -> s.getName()).collect(Collectors.toSet());
     // wild stream load properties' prefix
@@ -386,6 +391,10 @@ public boolean isUseUnifiedSinkApi() {
         return tableOptions.get(SINK_USE_NEW_SINK_API);
     }

+    public boolean isWrapJsonAsArray() {
+        return tableOptions.get(SINK_WRAP_JSON_AS_ARRAY);
+    }
+
     private void validateStreamLoadUrl() {
         tableOptions.getOptional(LOAD_URL).ifPresent(urlList -> {
             for (String host : urlList) {
@@ -557,9 +566,12 @@ public StreamLoadProperties getProperties(@Nullable StarRocksSinkTable table) {
         // By default, using json format should enable strip_outer_array and ignore_json_size,
         // which will simplify the configurations
         if (dataFormat instanceof StreamLoadDataFormat.JSONFormat) {
-            if (!streamLoadProperties.containsKey("strip_outer_array")) {
+            if (!streamLoadProperties.containsKey("strip_outer_array") || isWrapJsonAsArray()) {
+                // When sink.wrap-json-as-array is enabled, strip_outer_array must be set to true as well,
+                // because users know the source data contains JSON arrays and need strip_outer_array.
                 streamLoadProperties.put("strip_outer_array", "true");
             }
+
             if (!streamLoadProperties.containsKey("ignore_json_size")) {
                 streamLoadProperties.put("ignore_json_size", "true");
             }
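The defaulting rule for JSON loads reads as: `strip_outer_array` is forced to `true` whenever it is unset or `sink.wrap-json-as-array` is enabled (even overriding an explicit `false`), while `ignore_json_size` only gets a default when unset. A stdlib-only sketch of that rule over a plain properties map (the `applyJsonDefaults` helper name is illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class JsonDefaultsSketch {
    // Mirrors the connector's defaulting for JSON Stream Loads:
    // - strip_outer_array defaults to true, and is forced to true whenever
    //   wrap-json-as-array is enabled, regardless of the user's setting;
    // - ignore_json_size is only defaulted when the user did not set it.
    static Map<String, String> applyJsonDefaults(Map<String, String> props, boolean wrapJsonAsArray) {
        Map<String, String> out = new HashMap<>(props);
        if (!out.containsKey("strip_outer_array") || wrapJsonAsArray) {
            out.put("strip_outer_array", "true");
        }
        if (!out.containsKey("ignore_json_size")) {
            out.put("ignore_json_size", "true");
        }
        return out;
    }
}
```

Forcing `strip_outer_array` under wrap-json-as-array is consistent with `joinJsonRows`: when rows are concatenated pre-wrapped arrays, StarRocks must strip the outer arrays to recover individual records.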
