
Commit 5d3f1ae

Add Failure Drop data test
1 parent cbc7c8a commit 5d3f1ae

3 files changed: 82 additions, 19 deletions

flink-connector-clickhouse-base/src/main/java/com/clickhouse/utils/Utils.java

Lines changed: 11 additions & 13 deletions
@@ -1,15 +1,13 @@
 package com.clickhouse.utils;
 
-import com.clickhouse.client.ClickHouseException;
+import com.clickhouse.client.api.ServerException;
 import org.apache.flink.connector.clickhouse.exception.RetriableException;
-import org.apache.flink.connector.clickhouse.sink.ClickHouseAsyncWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
-import java.util.Collection;
 
 public class Utils {
 
@@ -18,18 +16,18 @@ public class Utils {
     private static final String CLICKHOUSE_CLIENT_ERROR_WRITE_TIMEOUT_MSG = "Write timed out after";
 
     /**
-     * This will drill down to the first ClickHouseException in the exception chain
+     * This will drill down to the first ServerException in the exception chain
      *
      * @param e Exception to drill down
-     * @return ClickHouseException or null if none found
+     * @return ServerException or null if none found
      */
-    public static Exception getRootCause(Throwable e, Boolean prioritizeClickHouseException) {
+    public static Exception getRootCause(Throwable e, Boolean prioritizeServerException) {
         if (e == null)
             return null;
 
         Throwable runningException = e; // We have to use Throwable because of the getCause() signature
         while (runningException.getCause() != null &&
-                (!prioritizeClickHouseException || !(runningException instanceof ClickHouseException))) {
+                (!prioritizeServerException || !(runningException instanceof ServerException))) {
             LOG.trace("Found exception: {}", runningException.getLocalizedMessage());
             runningException = runningException.getCause();
         }
@@ -46,13 +44,13 @@ public static Exception getRootCause(Throwable e, Boolean prioritizeClickHouseEx
     public static void handleException(Throwable e) {
         LOG.warn("Deciding how to handle exception: {}", e.getLocalizedMessage());
 
-        // Let's check if we have a ClickHouseException to reference the error code
+        // Let's check if we have a ServerException to reference the error code
         // https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/ErrorCodes.cpp
         Exception rootCause = Utils.getRootCause(e, true);
-        if (rootCause instanceof ClickHouseException) {
-            ClickHouseException clickHouseException = (ClickHouseException) rootCause;
-            LOG.warn("ClickHouseException code: {}", clickHouseException.getErrorCode());
-            switch (clickHouseException.getErrorCode()) {
+        if (rootCause instanceof ServerException) {
+            ServerException clickHouseServerException = (ServerException) rootCause;
+            LOG.warn("ClickHouse Server Exception Code: {}", clickHouseServerException.getCode());
+            switch (clickHouseServerException.getCode()) {
                 case 3: // UNEXPECTED_END_OF_FILE
                 case 107: // FILE_DOESNT_EXIST
                 case 159: // TIMEOUT_EXCEEDED
@@ -70,7 +68,7 @@ public static void handleException(Throwable e) {
                 case 999: // KEEPER_EXCEPTION
                     throw new RetriableException(e);
                 default:
-                    LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseException.getErrorCode());
+                    LOG.error("Error code [{}] wasn't in the acceptable list.", clickHouseServerException.getCode());
                     break;
             }
         }
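
For orientation (not part of the commit): a minimal sketch of how this error-code mapping is meant to be consumed. Utils.handleException rethrows retriable server errors as RetriableException and returns normally otherwise, so a caller can branch on that. The retryBatch and dropBatch helpers below are hypothetical placeholders, not connector APIs.

import org.apache.flink.connector.clickhouse.exception.RetriableException;

import com.clickhouse.utils.Utils;

// Hypothetical caller-side sketch; retryBatch() and dropBatch() are placeholders.
public class HandleExceptionSketch {

    static void onWriteFailure(Throwable error) {
        try {
            // Rethrows RetriableException for codes such as 159 (TIMEOUT_EXCEEDED)
            // or 999 (KEEPER_EXCEPTION); falls through for everything else.
            Utils.handleException(error);
            dropBatch(error);      // non-retriable: give up on this batch
        } catch (RetriableException retriable) {
            retryBatch();          // retriable: hand the entries back for another attempt
        }
    }

    static void retryBatch() { /* placeholder */ }

    static void dropBatch(Throwable error) { /* placeholder */ }
}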

flink-connector-clickhouse-base/src/main/java/org/apache/flink/connector/clickhouse/sink/ClickHouseAsyncWriter.java

Lines changed: 7 additions & 2 deletions
@@ -22,7 +22,6 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicInteger;
 
 public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, ClickHousePayload> {
     private static final Logger LOG = LoggerFactory.getLogger(ClickHouseAsyncWriter.class);
@@ -33,6 +32,8 @@ public class ClickHouseAsyncWriter<InputT> extends AsyncSinkWriter<InputT, Click
     private final Counter numBytesSendCounter;
     private final Counter numRecordsSendCounter;
     private final Counter numRequestSubmittedCounter;
+    private final Counter numOfDroppedBatchesCounter;
+    private final Counter numOfDroppedRecordsCounter;
 
     public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> elementConverter,
                                  WriterInitContext context,
@@ -62,6 +63,8 @@ public ClickHouseAsyncWriter(ElementConverter<InputT, ClickHousePayload> element
         this.numBytesSendCounter = metricGroup.getNumBytesSendCounter();
         this.numRecordsSendCounter = metricGroup.getNumRecordsSendCounter();
         this.numRequestSubmittedCounter = metricGroup.counter("numRequestSubmitted");
+        this.numOfDroppedBatchesCounter = metricGroup.counter("numOfDroppedBatches");
+        this.numOfDroppedRecordsCounter = metricGroup.counter("numOfDroppedRecords");
     }
 
     @Override
@@ -141,7 +144,9 @@ private void handleFailedRequest(
             // TODO: send data again
             resultHandler.retryForEntries(requestEntries);
         }
-        LOG.info("completeExceptionally");
+        LOG.info("Dropping request entries because this failure cannot be retried. Error: {}; entries dropped: {}", error.getLocalizedMessage(), requestEntries.size());
+        numOfDroppedBatchesCounter.inc();
+        numOfDroppedRecordsCounter.inc(requestEntries.size());
         resultHandler.completeExceptionally((Exception) error);
     }
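
A small, self-contained sketch (not from the commit) of the accounting these two counters add, using Flink's SimpleCounter so it runs without a cluster: one non-retriable failure bumps the batch counter by one and the record counter by the size of the failed batch.

import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;

// Standalone illustration of the drop accounting; the batch size is hypothetical.
public class DropCounterSketch {
    public static void main(String[] args) {
        Counter numOfDroppedBatches = new SimpleCounter();
        Counter numOfDroppedRecords = new SimpleCounter();

        int entriesInFailedBatch = 5000; // e.g. MAX_BATCH_SIZE in the tests below

        // Mirrors the non-retriable branch: one batch dropped, all of its entries counted.
        numOfDroppedBatches.inc();
        numOfDroppedRecords.inc(entriesInFailedBatch);

        System.out.println("dropped batches = " + numOfDroppedBatches.getCount()); // 1
        System.out.println("dropped records = " + numOfDroppedRecords.getCount()); // 5000
    }
}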

flink-connector-clickhouse-base/src/test/java/org/apache/flink/connector/clickhouse/sink/ClickHouseSinkTests.java

Lines changed: 64 additions & 4 deletions
@@ -2,6 +2,7 @@
 
 import com.clickhouse.client.api.metadata.TableSchema;
 import com.clickhouse.data.ClickHouseFormat;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.connector.base.sink.writer.ElementConverter;
@@ -31,6 +32,7 @@
 public class ClickHouseSinkTests extends FlinkClusterTests {
 
     static final int EXPECTED_ROWS = 10000;
+    static final int EXPECTED_ROWS_ON_FAILURE = 0;
     static final int MAX_BATCH_SIZE = 5000;
     static final int MAX_IN_FLIGHT_REQUESTS = 2;
     static final int MAX_BUFFERED_REQUESTS = 20000;
@@ -40,7 +42,7 @@ public class ClickHouseSinkTests extends FlinkClusterTests {
 
     static final int STREAM_PARALLELISM = 5;
 
-    private int executeJob(StreamExecutionEnvironment env, String tableName) throws Exception {
+    private int executeAsyncJob(StreamExecutionEnvironment env, String tableName) throws Exception {
         JobClient jobClient = env.executeAsync("Read GZipped CSV with FileSource");
         int rows = 0;
         int iterations = 0;
@@ -109,7 +111,7 @@ void CSVDataTest() throws Exception {
             "GzipCsvSource"
         );
         lines.sinkTo(csvSink);
-        int rows = executeJob(env, tableName);
+        int rows = executeAsyncJob(env, tableName);
         Assertions.assertEquals(EXPECTED_ROWS, rows);
     }
 
@@ -178,7 +180,7 @@ public CovidPOJO map(String value) throws Exception {
         });
         // send to a sink
         covidPOJOs.sinkTo(covidPOJOSink);
-        int rows = executeJob(env, tableName);
+        int rows = executeAsyncJob(env, tableName);
         Assertions.assertEquals(EXPECTED_ROWS, rows);
     }
 
@@ -239,7 +241,65 @@ void SimplePOJODataTest() throws Exception {
         DataStream<SimplePOJO> simplePOJOs = env.fromData(simplePOJOList.toArray(new SimplePOJO[0]));
         // send to a sink
         simplePOJOs.sinkTo(simplePOJOSink);
-        int rows = executeJob(env, tableName);
+        int rows = executeAsyncJob(env, tableName);
         Assertions.assertEquals(EXPECTED_ROWS, rows);
     }
+
+    @Test
+    void CSVDataOnFailureDropDataTest() throws Exception {
+        String tableName = "csv_failure_covid";
+        String dropTable = String.format("DROP TABLE IF EXISTS `%s`.`%s`", getDatabase(), tableName);
+        ClickHouseServerForTests.executeSql(dropTable);
+        // create table
+        String tableSql = "CREATE TABLE `" + getDatabase() + "`.`" + tableName + "` (" +
+                "date Date," +
+                "location_key LowCardinality(String)," +
+                "new_confirmed Int32," +
+                "new_deceased Int32," +
+                "new_recovered Int32," +
+                "new_tested Int32," +
+                "cumulative_confirmed Int32," +
+                "cumulative_deceased Int32," +
+                "cumulative_recovered Int32," +
+                "cumulative_tested Int32" +
+                ") " +
+                "ENGINE = MergeTree " +
+                "ORDER BY (location_key, date); ";
+        ClickHouseServerForTests.executeSql(tableSql);
+
+        final StreamExecutionEnvironment env = EmbeddedFlinkClusterForTests.getMiniCluster().getTestStreamEnvironment();
+        env.setParallelism(STREAM_PARALLELISM);
+
+
+        ClickHouseClientConfig clickHouseClientConfig = new ClickHouseClientConfig(getServerURL(), getUsername(), getPassword(), getDatabase(), tableName);
+        ElementConverter<String, ClickHousePayload> convertorString = new ClickHouseConvertor<>(String.class);
+        // create sink
+        ClickHouseAsyncSink<String> csvSink = new ClickHouseAsyncSink<>(
+                convertorString,
+                MAX_BATCH_SIZE,
+                MAX_IN_FLIGHT_REQUESTS,
+                MAX_BUFFERED_REQUESTS,
+                MAX_BATCH_SIZE_IN_BYTES,
+                MAX_TIME_IN_BUFFER_MS,
+                MAX_RECORD_SIZE_IN_BYTES,
+                clickHouseClientConfig
+        );
+        csvSink.setClickHouseFormat(ClickHouseFormat.TSV);
+
+        Path filePath = new Path("./src/test/resources/epidemiology_top_10000.csv.gz");
+
+        FileSource<String> source = FileSource
+                .forRecordStreamFormat(new TextLineInputFormat(), filePath)
+                .build();
+        // read csv data from file
+        DataStreamSource<String> lines = env.fromSource(
+                source,
+                WatermarkStrategy.noWatermarks(),
+                "GzipCsvSource"
+        );
+        lines.sinkTo(csvSink);
+        // TODO: make the test smarter by checking the counter of numOfDroppedRecords equals EXPECTED_ROWS
+        int rows = executeAsyncJob(env, tableName);
+        Assertions.assertEquals(EXPECTED_ROWS_ON_FAILURE, rows);
+    }
 }
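
One reading of why this test expects zero rows (an inference, not stated in the commit): the sink format is set to ClickHouseFormat.TSV while the source file is comma-separated, so the server should reject every batch as a non-retriable parse error and the writer drops all entries. A tiny standalone illustration of that mismatch, with a hypothetical sample row:

// A CSV row contains no tab characters, so parsed as TSV it collapses into a
// single field that cannot be a valid Date for the table's first column.
public class FormatMismatchSketch {
    public static void main(String[] args) {
        String csvLine = "2020-03-01,US_NY,100,2,0,500,1000,20,0,5000"; // hypothetical row
        String[] tsvFields = csvLine.split("\t", -1);
        System.out.println("fields seen by a TSV parser: " + tsvFields.length); // 1
    }
}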
