Skip to content

Commit 85d06d8

Browse files
committed
fixed parsing error in response
1 parent 9045eda commit 85d06d8

File tree

3 files changed

+119
-34
lines changed

3 files changed

+119
-34
lines changed

client-v2/src/main/java/com/clickhouse/client/api/internal/HttpAPIClientHelper.java

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,20 @@
5353

5454
import javax.net.ssl.SSLContext;
5555
import javax.net.ssl.SSLException;
56+
import java.io.BufferedReader;
5657
import java.io.ByteArrayOutputStream;
5758
import java.io.IOException;
59+
import java.io.InputStream;
60+
import java.io.InputStreamReader;
5861
import java.io.OutputStream;
5962
import java.net.ConnectException;
6063
import java.net.InetSocketAddress;
6164
import java.net.NoRouteToHostException;
6265
import java.net.URI;
6366
import java.net.URISyntaxException;
6467
import java.net.UnknownHostException;
68+
import java.nio.ByteBuffer;
69+
import java.nio.charset.StandardCharsets;
6570
import java.security.NoSuchAlgorithmException;
6671
import java.util.Base64;
6772
import java.util.EnumSet;
@@ -272,18 +277,47 @@ public CloseableHttpClient createHttpClient() {
272277
return clientBuilder.build();
273278
}
274279

280+
private static final String ERROR_CODE_PREFIX_PATTERN = "Code: %d. DB::Exception:";
275281
/**
276282
* Reads status line and if error tries to parse response body to get server error message.
277283
*
278284
* @param httpResponse - HTTP response
279285
* @return
280286
*/
281287
public Exception readError(ClassicHttpResponse httpResponse) {
282-
try (ByteArrayOutputStream out = new ByteArrayOutputStream(ERROR_BODY_BUFFER_SIZE)) {
283-
httpResponse.getEntity().writeTo(out);
284-
String message = out.toString();
285-
int serverCode = getHeaderInt(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), 0);
286-
return new ServerException(serverCode, message);
288+
int serverCode = getHeaderInt(httpResponse.getFirstHeader(ClickHouseHttpProto.HEADER_EXCEPTION_CODE), 0);
289+
try (InputStream body = httpResponse.getEntity().getContent()) {
290+
291+
byte [] buffer = new byte[ERROR_BODY_BUFFER_SIZE];
292+
byte [] lookUpStr = String.format(ERROR_CODE_PREFIX_PATTERN, serverCode).getBytes(StandardCharsets.UTF_8);
293+
while (true) {
294+
int rBytes = body.read(buffer);
295+
if (rBytes == -1) {
296+
break;
297+
} else {
298+
for (int i = 0; i < rBytes; i++) {
299+
if (buffer[i] == lookUpStr[0]) {
300+
boolean found = true;
301+
for (int j = 1; j < Math.min(rBytes - i, lookUpStr.length); j++) {
302+
if (buffer[i + j] != lookUpStr[j]) {
303+
found = false;
304+
break;
305+
}
306+
}
307+
if (found) {
308+
int start = i;
309+
while (i < rBytes && buffer[i] != '\n') {
310+
i++;
311+
}
312+
313+
return new ServerException(serverCode, new String(buffer, start, i -start, StandardCharsets.UTF_8));
314+
}
315+
}
316+
}
317+
}
318+
}
319+
320+
return new ServerException(serverCode, ERROR_CODE_PREFIX_PATTERN.formatted(serverCode) + " <Unreadable error message>");
287321
} catch (IOException e) {
288322
throw new ClientException("Failed to read response body", e);
289323
}
@@ -307,12 +341,13 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
307341
.build();
308342
req.setConfig(httpReqConfig);
309343
// setting entity. wrapping if compression is enabled
310-
req.setEntity(wrapEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback)));
344+
req.setEntity(wrapEntity(new EntityTemplate(-1, CONTENT_TYPE, null, writeCallback), false));
311345

312346
HttpClientContext context = HttpClientContext.create();
313347

314348
try {
315349
ClassicHttpResponse httpResponse = httpClient.executeOpen(null, req, context);
350+
httpResponse.setEntity(wrapEntity(httpResponse.getEntity(), true));
316351
if (httpResponse.getCode() == HttpStatus.SC_PROXY_AUTHENTICATION_REQUIRED) {
317352
throw new ClientMisconfigurationException("Proxy authentication required. Please check your proxy settings.");
318353
} else if (httpResponse.getCode() >= HttpStatus.SC_BAD_REQUEST &&
@@ -329,8 +364,6 @@ public ClassicHttpResponse executeRequest(ClickHouseNode server, Map<String, Obj
329364
httpResponse.close();
330365
return httpResponse;
331366
}
332-
333-
httpResponse.setEntity(wrapEntity(httpResponse.getEntity()));
334367
return httpResponse;
335368

336369
} catch (UnknownHostException e) {
@@ -413,13 +446,13 @@ private void addQueryParams(URIBuilder req, Map<String, String> chConfig, Map<St
413446
}
414447
}
415448

416-
private HttpEntity wrapEntity(HttpEntity httpEntity) {
449+
private HttpEntity wrapEntity(HttpEntity httpEntity, boolean isResponse) {
417450
boolean serverCompression = chConfiguration.getOrDefault(ClickHouseClientOption.COMPRESS.getKey(), "false").equalsIgnoreCase("true");
418451
boolean clientCompression = chConfiguration.getOrDefault(ClickHouseClientOption.DECOMPRESS.getKey(), "false").equalsIgnoreCase("true");
419452
boolean useHttpCompression = chConfiguration.getOrDefault("client.use_http_compression", "false").equalsIgnoreCase("true");
420453
if (serverCompression || clientCompression) {
421454
return new LZ4Entity(httpEntity, useHttpCompression, serverCompression, clientCompression,
422-
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"));
455+
MapUtils.getInt(chConfiguration, "compression.lz4.uncompressed_buffer_size"), isResponse);
423456
} else {
424457
return httpEntity;
425458
}

client-v2/src/main/java/com/clickhouse/client/api/internal/LZ4Entity.java

Lines changed: 39 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -16,55 +16,70 @@
1616
class LZ4Entity implements HttpEntity {
1717

1818
private HttpEntity httpEntity;
19-
19+
2020
private final boolean useHttpCompression;
21-
private final boolean serverCompression;
22-
private final boolean clientCompression;
23-
21+
2422
private final int bufferSize;
2523

24+
private final boolean isResponse;
25+
26+
private boolean serverCompression;
27+
28+
private boolean clientCompression;
29+
2630
LZ4Entity(HttpEntity httpEntity, boolean useHttpCompression, boolean serverCompression, boolean clientCompression,
27-
int bufferSize) {
31+
int bufferSize, boolean isResponse) {
2832
this.httpEntity = httpEntity;
2933
this.useHttpCompression = useHttpCompression;
34+
this.bufferSize = bufferSize;
3035
this.serverCompression = serverCompression;
3136
this.clientCompression = clientCompression;
32-
this.bufferSize = bufferSize;
37+
this.isResponse = isResponse;
3338
}
34-
39+
3540
@Override
3641
public boolean isRepeatable() {
3742
return httpEntity.isRepeatable();
3843
}
3944

4045
@Override
4146
public InputStream getContent() throws IOException, UnsupportedOperationException {
42-
if (serverCompression && useHttpCompression) {
43-
InputStream content = httpEntity.getContent();
44-
try {
45-
return new FramedLZ4CompressorInputStream(content);
46-
} catch (IOException e) {
47-
// This is the easiest way to handle empty content because
48-
// - streams at this point wrapped with something else and we can't check content length
49-
// - exception is thrown with no details
50-
// So we just return original content and if there is a real data in it we will get error later
51-
return content;
52-
}
47+
if (!isResponse && serverCompression) {
48+
throw new UnsupportedOperationException("Unsupported: getting compressed content of request");
5349
} else if (serverCompression) {
54-
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor(),
55-
bufferSize);
50+
if (useHttpCompression) {
51+
InputStream content = httpEntity.getContent();
52+
try {
53+
return new FramedLZ4CompressorInputStream(content);
54+
} catch (IOException e) {
55+
// This is the easiest way to handle empty content because
56+
// - streams at this point wrapped with something else and we can't check content length
57+
// - exception is thrown with no details
58+
// So we just return original content and if there is a real data in it we will get error later
59+
return content;
60+
}
61+
} else {
62+
return new ClickHouseLZ4InputStream(httpEntity.getContent(), LZ4Factory.fastestInstance().fastDecompressor(),
63+
bufferSize);
64+
}
5665
} else {
5766
return httpEntity.getContent();
5867
}
5968
}
6069

6170
@Override
6271
public void writeTo(OutputStream outStream) throws IOException {
63-
if (clientCompression && useHttpCompression) {
64-
httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream));
72+
if (isResponse && serverCompression) {
73+
// called by us to get compressed response
74+
throw new UnsupportedOperationException("Unsupported: writing compressed response to elsewhere");
6575
} else if (clientCompression) {
66-
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, LZ4Factory.fastestInstance().fastCompressor(),
67-
bufferSize));
76+
// called by client to send data
77+
if (useHttpCompression) {
78+
httpEntity.writeTo(new FramedLZ4CompressorOutputStream(outStream));
79+
} else {
80+
httpEntity.writeTo(new ClickHouseLZ4OutputStream(outStream, LZ4Factory.fastestInstance().fastCompressor(),
81+
bufferSize));
82+
}
6883
} else {
6984
httpEntity.writeTo(outStream);
7085
}

client-v2/src/test/java/com/clickhouse/client/HttpTransportTests.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@
55
import com.clickhouse.client.api.ClientFaultCause;
66
import com.clickhouse.client.api.ConnectionInitiationException;
77
import com.clickhouse.client.api.ConnectionReuseStrategy;
8+
import com.clickhouse.client.api.ServerException;
9+
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
810
import com.clickhouse.client.api.enums.Protocol;
911
import com.clickhouse.client.api.enums.ProxyType;
1012
import com.clickhouse.client.api.insert.InsertResponse;
1113
import com.clickhouse.client.api.query.GenericRecord;
1214
import com.clickhouse.client.api.query.QueryResponse;
15+
import com.clickhouse.client.api.query.QuerySettings;
1316
import com.clickhouse.client.config.ClickHouseClientOption;
1417
import com.clickhouse.data.ClickHouseFormat;
1518
import com.github.tomakehurst.wiremock.WireMockServer;
@@ -304,4 +307,38 @@ public static Object[][] noResponseFailureProvider() {
304307
{selectBody, 0, queryFunction, true}
305308
};
306309
}
310+
311+
@Test(groups = { "integration" }, dataProvider = "testServerErrorHandlingDataProvider")
312+
public void testServerErrorHandling(ClickHouseFormat format) {
313+
ClickHouseNode server = getServer(ClickHouseProtocol.HTTP);
314+
try (Client client = new Client.Builder()
315+
.addEndpoint(server.getBaseUri())
316+
.setUsername("default")
317+
.setPassword("")
318+
.useNewImplementation(true)
319+
// TODO: fix in old client
320+
// .useNewImplementation(System.getProperty("client.tests.useNewImplementation", "false").equals("true"))
321+
.build()) {
322+
323+
QuerySettings querySettings = new QuerySettings().setFormat(format);
324+
try (QueryResponse response =
325+
client.query("SELECT invalid;statement", querySettings).get(1, TimeUnit.SECONDS)) {
326+
Assert.fail("Expected exception");
327+
} catch (ClientException e) {
328+
e.printStackTrace();
329+
ServerException serverException = (ServerException) e.getCause();
330+
Assert.assertEquals(serverException.getCode(), 62);
331+
Assert.assertTrue(serverException.getMessage().startsWith("Code: 62. DB::Exception: Syntax error (Multi-statements are not allowed): failed at position 15 (end of query)"),
332+
"Unexpected error message: " + serverException.getMessage());
333+
}
334+
} catch (Exception e) {
335+
e.printStackTrace();
336+
Assert.fail(e.getMessage(), e);
337+
}
338+
}
339+
340+
@DataProvider(name = "testServerErrorHandlingDataProvider")
341+
public static Object[] testServerErrorHandlingDataProvider() {
342+
return new Object[] { ClickHouseFormat.JSON, ClickHouseFormat.TabSeparated, ClickHouseFormat.RowBinary };
343+
}
307344
}

0 commit comments

Comments
 (0)