
Commit dbc31b8

[Improve] doris datastream-connector code style improvement
1 parent ecb01a1 commit dbc31b8


3 files changed: 27 additions & 22 deletions


streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java

Lines changed: 9 additions & 9 deletions
@@ -39,25 +39,25 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import java.util.Arrays;
 import java.util.Map;
 import java.util.Properties;

 /** DorisSinkFunction */
 public class DorisSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {

   private static final Logger LOGGER = LoggerFactory.getLogger(DorisSinkFunction.class);
-  private final Properties properties;
   private final DorisSinkWriter dorisSinkWriter;
   private final DorisConfig dorisConfig;
   // state only works with `EXACTLY_ONCE`
-  private transient ListState<Map<String, DorisSinkBufferEntry>> checkpointedState;
+  private transient ListState<Map<String, DorisSinkBufferEntry>> checkpointState;
   private transient Counter totalInvokeRowsTime;
   private transient Counter totalInvokeRows;
   private static final String COUNTER_INVOKE_ROWS_COST_TIME = "totalInvokeRowsTimeNs";
   private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows";

   public DorisSinkFunction(StreamingContext context) {
-    this.properties = context.parameter().getProperties();
+    Properties properties = context.parameter().getProperties();
     this.dorisConfig = new DorisConfig(properties);
     this.dorisSinkWriter = new DorisSinkWriter(dorisConfig);
   }
@@ -83,8 +83,8 @@ public void invoke(T value, SinkFunction.Context context) throws Exception {
         || null == data.getDataRows()) {
       LOGGER.warn(
           String.format(
-              " row data not fullfilled. {database: %s, table: %s, dataRows: %s}",
-              data.getDatabase(), data.getTable(), data.getDataRows()));
+              " row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
+              data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows())));
       return;
     }
     dorisSinkWriter.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows());
@@ -110,7 +110,7 @@ public void close() throws Exception {
   public void snapshotState(FunctionSnapshotContext context) throws Exception {
     if (Semantic.EXACTLY_ONCE.equals(Semantic.of(dorisConfig.semantic()))) {
       // save state
-      checkpointedState.add(dorisSinkWriter.getBufferedBatchMap());
+      checkpointState.add(dorisSinkWriter.getBufferedBatchMap());
       flushPreviousState();
     }
   }
@@ -122,16 +122,16 @@ public void initializeState(FunctionInitializationContext context) throws Except
           new ListStateDescriptor<>(
               "buffered-rows",
               TypeInformation.of(new TypeHint<Map<String, DorisSinkBufferEntry>>() {}));
-      checkpointedState = context.getOperatorStateStore().getListState(descriptor);
+      checkpointState = context.getOperatorStateStore().getListState(descriptor);
     }
   }

   private void flushPreviousState() throws Exception {
     // flush the batch saved at the previous checkpoint
-    for (Map<String, DorisSinkBufferEntry> state : checkpointedState.get()) {
+    for (Map<String, DorisSinkBufferEntry> state : checkpointState.get()) {
       dorisSinkWriter.setBufferedBatchMap(state);
       dorisSinkWriter.flush(null, true);
     }
-    checkpointedState.clear();
+    checkpointState.clear();
   }
 }
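
For context, the renamed checkpointState field follows Flink's standard CheckpointedFunction pattern: buffer rows between checkpoints, snapshot the buffer into operator ListState, and restore it through a ListStateDescriptor. Below is a minimal, illustrative sketch of that pattern; the class name and the String-based buffer are simplified stand-ins, not the connector's own types.

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: a sink that buffers rows and snapshots the buffer
// into operator ListState so it can be replayed after a failure.
public class BufferedSinkSketch<T> extends RichSinkFunction<T> implements CheckpointedFunction {

  private final Map<String, String> buffer = new HashMap<>();
  private transient ListState<Map<String, String>> checkpointState;

  @Override
  public void invoke(T value, Context context) {
    // collect the record; the actual write happens around checkpoints
    buffer.put(String.valueOf(buffer.size()), String.valueOf(value));
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // save the current buffer as part of the checkpoint
    checkpointState.add(new HashMap<>(buffer));
    buffer.clear();
  }

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    ListStateDescriptor<Map<String, String>> descriptor =
        new ListStateDescriptor<>(
            "buffered-rows", TypeInformation.of(new TypeHint<Map<String, String>>() {}));
    checkpointState = context.getOperatorStateStore().getListState(descriptor);
  }
}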

streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java

Lines changed: 6 additions & 6 deletions
@@ -59,7 +59,6 @@ public class DorisSinkWriter implements Serializable {
   private transient Counter totalFlushFailedTimes;

   private final Map<String, DorisSinkBufferEntry> bufferMap = new ConcurrentHashMap<>();
-  private final Long timeout = 3000L;
   private volatile boolean closed = false;
   private volatile boolean flushThreadAlive = false;
   private volatile Throwable flushException;
@@ -239,7 +238,8 @@ private void offer(DorisSinkBufferEntry bufferEntity) throws InterruptedExceptio
   }

   private boolean asyncFlush() throws Exception {
-    final DorisSinkBufferEntry flushData = flushQueue.poll(timeout, TimeUnit.MILLISECONDS);
+    long timeOut = 3000L;
+    final DorisSinkBufferEntry flushData = flushQueue.poll(timeOut, TimeUnit.MILLISECONDS);
     if (flushData == null || flushData.getBatchCount() == 0) {
       return true;
     }
@@ -311,13 +311,13 @@ public synchronized void close() throws Exception {
   private void checkFlushException() {
     if (flushException != null) {
       StackTraceElement[] stack = Thread.currentThread().getStackTrace();
-      for (int i = 0; i < stack.length; i++) {
+      for (StackTraceElement stackTraceElement : stack) {
         LOG.info(
-            stack[i].getClassName()
+            stackTraceElement.getClassName()
                 + "."
-                + stack[i].getMethodName()
+                + stackTraceElement.getMethodName()
                 + " line:"
-                + stack[i].getLineNumber());
+                + stackTraceElement.getLineNumber());
       }
       throw new RuntimeException("Writing records to doris failed.", flushException);
     }
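
As a side note on the asyncFlush change: the former timeout field becomes a local because it is only used for the bounded queue poll inside the flush loop. A minimal sketch of that poll-with-timeout pattern is below; the queue element type and class name are simplified stand-ins, not the connector's own.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a flush loop that waits a bounded time for the next
// batch so it can regularly re-check the closed flag instead of blocking forever.
public class AsyncFlushSketch {

  private final LinkedBlockingQueue<String> flushQueue = new LinkedBlockingQueue<>();
  private volatile boolean closed = false;

  public void offer(String batch) {
    flushQueue.offer(batch);
  }

  public void runFlushLoop() throws InterruptedException {
    while (!closed) {
      // wait at most 3 seconds for a batch, then loop again
      long timeOut = 3000L;
      String batch = flushQueue.poll(timeOut, TimeUnit.MILLISECONDS);
      if (batch == null || batch.isEmpty()) {
        continue;
      }
      System.out.println("flushing batch: " + batch);
    }
  }

  public void close() {
    closed = true;
  }
}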

streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java

Lines changed: 12 additions & 7 deletions
@@ -24,12 +24,22 @@ public class DorisDelimiterParser {
   private static final String HEX_STRING = "0123456789ABCDEF";

   public static String parse(String sp) throws RuntimeException {
-    if (sp == null || sp.length() == 0) {
+    if (sp == null || sp.isEmpty()) {
       throw new RuntimeException("Delimiter can't be empty");
     }
     if (!sp.toUpperCase().startsWith("\\X")) {
       return sp;
     }
+    String hexStr = getString(sp);
+    // transform to separator
+    StringWriter writer = new StringWriter();
+    for (byte b : hexStrToBytes(hexStr)) {
+      writer.append((char) b);
+    }
+    return writer.toString();
+  }
+
+  private static String getString(String sp) {
     String hexStr = sp.substring(2);
     // check hex str
     if (hexStr.isEmpty()) {
@@ -43,12 +53,7 @@ public static String parse(String sp) throws RuntimeException {
         throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
       }
     }
-    // transform to separator
-    StringWriter writer = new StringWriter();
-    for (byte b : hexStrToBytes(hexStr)) {
-      writer.append((char) b);
-    }
-    return writer.toString();
+    return hexStr;
   }

   private static byte[] hexStrToBytes(String hexStr) {
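
For reference, a standalone sketch of the hex-escape decoding that parse and the extracted getString helper perform, based only on the logic visible in this diff; the class and method names below are illustrative, not the connector's API.

import java.io.StringWriter;

// Illustrative sketch only: decode a delimiter written as a hex escape,
// e.g. "\x09" becomes a tab character; plain delimiters pass through unchanged.
public class DelimiterDecodeSketch {

  public static String decode(String sp) {
    if (sp == null || sp.isEmpty()) {
      throw new RuntimeException("Delimiter can't be empty");
    }
    if (!sp.toUpperCase().startsWith("\\X")) {
      return sp;
    }
    String hexStr = sp.substring(2);
    StringWriter writer = new StringWriter();
    // take the hex digits two at a time and turn each pair into one character
    for (int i = 0; i + 1 < hexStr.length(); i += 2) {
      int b = Integer.parseInt(hexStr.substring(i, i + 2), 16);
      writer.append((char) b);
    }
    return writer.toString();
  }

  public static void main(String[] args) {
    System.out.println("\t".equals(decode("\\x09"))); // prints true
  }
}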
