Skip to content

Commit 5d5cf40

Browse files
authored
Support aborting the job when a sink write fails, controlled by configuration (#111)
1 parent 655b72c commit 5d5cf40

File tree

6 files changed

+39
-6
lines changed

6 files changed

+39
-6
lines changed

connector/src/main/java/org.apache.flink/connector/nebula/options/ExecutionOptions.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,24 @@ public class ExecutionOptions implements Serializable {
1818
private final int batchSize;
1919
private final int retryTimes;
2020
private final long intervalMs;
21+
private final boolean errorWhenFailed;
2122

2223
protected ExecutionOptions(String graphName,
2324
List<String> nebulaFields,
2425
List<String> flinkFields,
2526
WriteModeEnum writeMode,
2627
int batchSize,
2728
int retryTimes,
28-
long intervalMs) {
29+
long intervalMs,
30+
boolean errorWhenFailed) {
2931
this.graphName = graphName;
3032
this.nebulaFields = nebulaFields;
3133
this.flinkFields = flinkFields;
3234
this.writeMode = writeMode;
3335
this.batchSize = batchSize;
3436
this.retryTimes = retryTimes;
3537
this.intervalMs = intervalMs;
38+
this.errorWhenFailed = errorWhenFailed;
3639
}
3740

3841
public String getGraphName() {
@@ -62,4 +65,8 @@ public int getRetryTimes() {
6265
public long getIntervalMs() {
6366
return intervalMs;
6467
}
68+
69+
public boolean throwErrorWhenFailed() {
70+
return errorWhenFailed;
71+
}
6572
}

connector/src/main/java/org.apache.flink/connector/nebula/options/SinkEdgeOptions.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.apache.flink.connector.nebula.options;
88

9+
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ERROR_WHEN_FAILED;
910
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_INTERVAL_MILLIS;
1011
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_RETRY_TIMES;
1112
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH_SIZE;
@@ -32,7 +33,8 @@ private SinkEdgeOptions(Builder builder) {
3233
builder.writeMode,
3334
builder.batchSize,
3435
builder.retryTimes,
35-
builder.intervalMs);
36+
builder.intervalMs,
37+
builder.errorWhenFailed);
3638
this.builder = builder;
3739
this.edgeType = builder.edgeType;
3840
this.nebulaSrcPks = builder.nebulaSrcPks;
@@ -84,6 +86,7 @@ public static final class Builder implements Serializable {
8486
private int batchSize = DEFAULT_WRITE_BATCH_SIZE;
8587
private int retryTimes = DEFAULT_RETRY_TIMES;
8688
private long intervalMs = DEFAULT_INTERVAL_MILLIS;
89+
private boolean errorWhenFailed = DEFAULT_ERROR_WHEN_FAILED;
8790

8891

8992
public Builder withGraphName(String graphName) {
@@ -154,6 +157,10 @@ public Builder withIntervalMs(long intervalMs) {
154157
return this;
155158
}
156159

160+
public Builder withErrorWhenFailed(boolean errorWhenFailed) {
161+
this.errorWhenFailed = errorWhenFailed;
162+
return this;
163+
}
157164

158165
private void check() {
159166
if (nebulaFields.size() != flinkFields.size()) {

connector/src/main/java/org.apache.flink/connector/nebula/options/SinkNodeOptions.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
package org.apache.flink.connector.nebula.options;
88

9+
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_ERROR_WHEN_FAILED;
910
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_INTERVAL_MILLIS;
1011
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_RETRY_TIMES;
1112
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DEFAULT_WRITE_BATCH_SIZE;
@@ -22,7 +23,8 @@ public class SinkNodeOptions extends ExecutionOptions {
2223

2324
private SinkNodeOptions(Builder builder) {
2425
super(builder.graphName, builder.nebulaFields, builder.flinkFields,
25-
builder.writeMode, builder.batchSize, builder.retryTimes, builder.intervalMs);
26+
builder.writeMode, builder.batchSize, builder.retryTimes, builder.intervalMs,
27+
builder.errorWhenFailed);
2628
this.builder = builder;
2729
this.nodeType = builder.nodeType;
2830
}
@@ -49,7 +51,8 @@ public static final class Builder implements Serializable {
4951
private WriteModeEnum writeMode = WriteModeEnum.INSERTREPLACE;
5052
private int batchSize = DEFAULT_WRITE_BATCH_SIZE;
5153

52-
private int retryTimes = DEFAULT_RETRY_TIMES;
54+
private int retryTimes = DEFAULT_RETRY_TIMES;
55+
private boolean errorWhenFailed = DEFAULT_ERROR_WHEN_FAILED;
5356

5457
public Builder withGraphName(String graphName) {
5558
this.graphName = graphName;
@@ -91,6 +94,11 @@ public Builder withIntervalMs(long intervalMs) {
9194
return this;
9295
}
9396

97+
public Builder withErrorWhenFailed(boolean errorWhenFailed) {
98+
this.errorWhenFailed = errorWhenFailed;
99+
return this;
100+
}
101+
94102
private void check() {
95103
if (nebulaFields.size() != flinkFields.size()) {
96104
throw new IllegalArgumentException(

connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaEdgeBatchExecutor.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
package org.apache.flink.connector.nebula.sink;
88

99

10-
11-
1210
import com.vesoft.nebula.driver.graph.data.ResultSet;
1311
import java.util.ArrayList;
1412
import java.util.List;
@@ -104,6 +102,9 @@ public String executeBatch(GraphProvider graphProvider) {
104102
end = System.currentTimeMillis();
105103
} catch (Exception e) {
106104
LOG.error(">>>>>> write data error, ", e);
105+
if (executionOptions.throwErrorWhenFailed()) {
106+
throw new RuntimeException("write edge failed", e);
107+
}
107108
nebulaEdgeList.clear();
108109
return statement;
109110
}
@@ -116,6 +117,9 @@ public String executeBatch(GraphProvider graphProvider) {
116117
} else {
117118
LOG.error(">>>>> write edge failed: {}", execResult.getErrorMessage());
118119
LOG.error(">>>>> failed gql: {}", statement);
120+
if (executionOptions.throwErrorWhenFailed()) {
121+
throw new RuntimeException("write edge failed:" + execResult.getErrorMessage());
122+
}
119123
nebulaEdgeList.clear();
120124
return statement;
121125
}

connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaNodeBatchExecutor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,9 @@ public String executeBatch(GraphProvider graphProvider) {
9393
end = System.currentTimeMillis();
9494
} catch (Exception e) {
9595
LOG.error("write data error, ", e);
96+
if (executionOptions.throwErrorWhenFailed()) {
97+
throw new RuntimeException("write node failed", e);
98+
}
9699
nebulaVertexList.clear();
97100
return statement;
98101
}
@@ -105,6 +108,9 @@ public String executeBatch(GraphProvider graphProvider) {
105108
} else {
106109
LOG.error(">>>>> write data failed: {}", execResult.getErrorMessage());
107110
LOG.error(">>>>> failed gql: {}", statement);
111+
if (executionOptions.throwErrorWhenFailed()) {
112+
throw new RuntimeException("write node failed:" + execResult.getErrorMessage());
113+
}
108114
nebulaVertexList.clear();
109115
return statement;
110116
}

connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class NebulaConstant {
2626
public static final long DEFAULT_INTERVAL_MILLIS = 0;
2727

2828
public static final int DEFAULT_RETRY_TIMES = 0;
29+
public static final boolean DEFAULT_ERROR_WHEN_FAILED = false;
2930

3031
// default value for connection
3132
public static final int DEFAULT_CONNECTION_TIMEOUT_MS = 3000;

0 commit comments

Comments (0)