Skip to content

Commit 26558ba

Browse files
committed
Added retry functionality and test (CSVDataOnRetryAndDropDataTest)
1 parent 5d3f1ae commit 26558ba

File tree

6 files changed

+148
-13
lines changed

6 files changed

+148
-13
lines changed

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,14 @@ public static void handleException(Throwable e) {
6767
case 425: // SYSTEM_ERROR
6868
case 999: // KEEPER_EXCEPTION
6969
throw new RetriableException(e);
70+
case 0:
71+
switch (clickHouseServerException.getTransportProtocolCode()) {
72+
case 400: // Bad request
73+
case 500: // Internal server error
74+
throw new RetriableException(e);
75+
default:
76+
LOG.error("Error code [{}] wasn't in the acceptable list. Transport protocol code [{}]", clickHouseServerException.getCode(), clickHouseServerException.getTransportProtocolCode());
77+
}
7078
default:
7179
LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseServerException.getCode());
7280
break;

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/data/ClickHousePayload.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,13 @@ public class ClickHousePayload implements Serializable {
99
private static final Logger LOG = LoggerFactory.getLogger(ClickHousePayload.class);
1010
private static final long serialVersionUID = 1L;
1111

12+
private int attemptCount = 1;
1213
private final byte[] payload;
1314
public ClickHousePayload(byte[] payload) {
1415
this.payload = payload;
1516
}
1617
public byte[] getPayload() { return payload; }
1718
public int getPayloadLength() { return payload.length; }
19+
public int getAttemptCount() { return attemptCount; }
20+
public void incrementAttempts() { attemptCount++; }
1821
}

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java

Lines changed: 61 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,22 @@
2222
import java.util.Collection;
2323
import java.util.List;
2424
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.atomic.AtomicInteger;
2526

2627
public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, ClickHousePayload> {
2728
private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncWriter.class);
29+
private static final int DEFAULT_MAX_RETRIES = 3;
2830

2931
private final ClickHouseClientConfig clickHouseClientConfig;
3032
private ClickHouseFormat clickHouseFormat = null;
33+
private int numberOfRetries = DEFAULT_MAX_RETRIES;
3134

3235
private final Counter numBytesSendCounter;
3336
private final Counter numRecordsSendCounter;
3437
private final Counter numRequestSubmittedCounter;
3538
private final Counter numOfDroppedBatchesCounter;
3639
private final Counter numOfDroppedRecordsCounter;
40+
private final Counter totalBatchRetriesCounter;
3741

3842
public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
3943
WriterInitContext context,
@@ -43,28 +47,58 @@ public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> element
4347
long maxBatchSizeInBytes,
4448
long maxTimeInBufferMS,
4549
long maxRecordSizeInBytes,
50+
int numberOfRetries,
4651
ClickHouseClientConfig clickHouseClientConfig,
4752
ClickHouseFormat clickHouseFormat,
4853
Collection<BufferedRequestState<ClickHousePayload>> state) {
4954
super(elementConverter,
50-
context,
51-
AsyncSinkWriterConfiguration.builder()
52-
.setMaxBatchSize(maxBatchSize)
53-
.setMaxBatchSizeInBytes(maxBatchSizeInBytes)
54-
.setMaxInFlightRequests(maxInFlightRequests)
55-
.setMaxBufferedRequests(maxBufferedRequests)
56-
.setMaxTimeInBufferMS(maxTimeInBufferMS)
57-
.setMaxRecordSizeInBytes(maxRecordSizeInBytes)
58-
.build(),
59-
state);
55+
context,
56+
AsyncSinkWriterConfiguration.builder()
57+
.setMaxBatchSize(maxBatchSize)
58+
.setMaxBatchSizeInBytes(maxBatchSizeInBytes)
59+
.setMaxInFlightRequests(maxInFlightRequests)
60+
.setMaxBufferedRequests(maxBufferedRequests)
61+
.setMaxTimeInBufferMS(maxTimeInBufferMS)
62+
.setMaxRecordSizeInBytes(maxRecordSizeInBytes)
63+
.build(),
64+
state);
6065
this.clickHouseClientConfig = clickHouseClientConfig;
6166
this.clickHouseFormat = clickHouseFormat;
67+
this.numberOfRetries = numberOfRetries;
6268
final SinkWriterMetricGroup metricGroup = context.metricGroup();
6369
this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
6470
this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
6571
this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
6672
this.numOfDroppedBatchesCounter = metricGroup.counter("numOfDroppedBatches");
6773
this.numOfDroppedRecordsCounter = metricGroup.counter("numOfDroppedRecords");
74+
this.totalBatchRetriesCounter = metricGroup.counter("totalBatchRetries");
75+
}
76+
77+
78+
public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
79+
WriterInitContext context,
80+
int maxBatchSize,
81+
int maxInFlightRequests,
82+
int maxBufferedRequests,
83+
long maxBatchSizeInBytes,
84+
long maxTimeInBufferMS,
85+
long maxRecordSizeInBytes,
86+
ClickHouseClientConfig clickHouseClientConfig,
87+
ClickHouseFormat clickHouseFormat,
88+
Collection<BufferedRequestState<ClickHousePayload>> state) {
89+
this(elementConverter,
90+
context,
91+
maxBatchSize,
92+
maxInFlightRequests,
93+
maxBufferedRequests,
94+
maxBatchSizeInBytes,
95+
maxTimeInBufferMS,
96+
maxRecordSizeInBytes,
97+
DEFAULT_MAX_RETRIES,
98+
clickHouseClientConfig,
99+
clickHouseFormat,
100+
state
101+
);
68102
}
69103

70104
@Override
@@ -141,13 +175,27 @@ private void handleFailedRequest(
141175
Utils.handleException(error);
142176
} catch (RetriableException e) {
143177
LOG.info("Retriable exception occurred while processing request. ", e);
144-
// TODO: send data again
145-
resultHandler.retryForEntries(requestEntries);
178+
// Retry the batch if it still has attempts left
179+
if (requestEntries != null && !requestEntries.isEmpty()) {
180+
ClickHousePayload firstElement = requestEntries.get(0);
181+
LOG.warn("Retry number [{}] out of [{}]", firstElement.getAttemptCount(), this.numberOfRetries);
182+
firstElement.incrementAttempts();
183+
if (firstElement.getAttemptCount() <= this.numberOfRetries) {
184+
totalBatchRetriesCounter.inc();
185+
LOG.warn("Retriable exception occurred while processing request. Left attempts {}.", this.numberOfRetries - (firstElement.getAttemptCount() - 1) );
186+
// Still within the retry limit, so we can send the data again
187+
resultHandler.retryForEntries(requestEntries);
188+
return;
189+
} else {
190+
LOG.warn("No attempts left going to drop batch");
191+
}
192+
}
193+
146194
}
147195
LOG.info("Dropping request entries. Since It a failure that can not be retried. error {} number of entries drop {}", error.getLocalizedMessage(), requestEntries.size());
148196
numOfDroppedBatchesCounter.inc();
149197
numOfDroppedRecordsCounter.inc(requestEntries.size());
150-
resultHandler.completeExceptionally((Exception)error);
198+
resultHandler.completeExceptionally((Exception) error);
151199
}
152200

153201
}

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,10 @@ void SimplePOJODataTest() throws Exception {
245245
Assertions.assertEquals(EXPECTED_ROWS, rows);
246246
}
247247

248+
/**
249+
* Supposed to drop data on failure. We generate this use case by supplying the writer with the wrong format.
250+
* @throws Exception
251+
*/
248252
@Test
249253
void CSVDataOnFailureDropDataTest() throws Exception {
250254
String tableName = "csv_failure_covid";
@@ -302,4 +306,67 @@ void CSVDataOnFailureDropDataTest() throws Exception {
302306
int rows = executeAsyncJob(env, tableName);
303307
Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows);
304308
}
309+
310+
/**
311+
* Supposed to retry and then drop data on failure. We generate this use case by pointing the client at a different (incorrect) port of the ClickHouse server.
312+
* @throws Exception
313+
*/
314+
@Test
315+
void CSVDataOnRetryAndDropDataTest() throws Exception {
316+
String tableName = "csv_retry_covid";
317+
String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
318+
ClickHouseServerForTests.executeSql(dropTable);
319+
// create table
320+
String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
321+
"date Date," +
322+
"location_key LowCardinality(String)," +
323+
"new_confirmed Int32," +
324+
"new_deceased Int32," +
325+
"new_recovered Int32," +
326+
"new_tested Int32," +
327+
"cumulative_confirmed Int32," +
328+
"cumulative_deceased Int32," +
329+
"cumulative_recovered Int32," +
330+
"cumulative_tested Int32" +
331+
") " +
332+
"ENGINE = MergeTree " +
333+
"ORDER BY (location_key, date); ";
334+
ClickHouseServerForTests.executeSql(tableSql);
335+
336+
final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
337+
env.setParallelism(STREAM_PARALLELISM);
338+
339+
340+
ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getIncorrectServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
341+
ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
342+
// create sink
343+
ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
344+
convertorString,
345+
MAX_BATCH_SIZE,
346+
MAX_IN_FLIGHT_REQUESTS,
347+
MAX_BUFFERED_REQUESTS,
348+
MAX_BATCH_SIZE_IN_BYTES,
349+
MAX_TIME_IN_BUFFER_MS,
350+
MAX_RECORD_SIZE_IN_BYTES,
351+
clickHouseClientConfig
352+
);
353+
csvSink.setClickHouseFormat(ClickHouseFormat.CSV);
354+
355+
Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz");
356+
357+
FileSource<String> source = FileSource
358+
.forRecordStreamFormat(new TextLineInputFormat(), filePath)
359+
.build();
360+
// read csv data from file
361+
DataStreamSource<String> lines = env.fromSource(
362+
source,
363+
WatermarkStrategy.noWatermarks(),
364+
"GzipCsvSource"
365+
);
366+
lines.sinkTo(csvSink);
367+
// TODO: make the test smarter by checking the counter of numOfDroppedRecords equals EXPECTED_ROWS
368+
int rows = executeAsyncJob(env, tableName);
369+
Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows);
370+
}
371+
305372
}

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/FlinkClusterTests.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ public static String getServerURL() {
2323
return ClickHouseServerForTests.getURL();
2424
}
2525

26+
public static String getIncorrectServerURL() {
27+
return ClickHouseServerForTests.getURL(ClickHouseServerForTests.getHost(), ClickHouseServerForTests.getPort() + 1);
28+
}
29+
2630
public static String getUsername() {
2731
return ClickHouseServerForTests.getUsername();
2832
}

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/test/embedded/clickhouse/ClickHouseServerForTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,12 @@ public static void tearDown() {
7878
public static int getPort() { return port; }
7979
public static String getUsername() { return username; }
8080
public static String getPassword() { return password; }
81+
8182
public static String getURL() {
83+
return ClickHouseServerForTests.getURL(host, port);
84+
}
85+
86+
public static String getURL(String host, int port) {
8287
if (isCloud) {
8388
return "https://" + host + ":" + port + "/";
8489
} else {

0 commit comments

Comments
 (0)