Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 119 additions & 1 deletion conn_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (h *httpConnect) writeData(block *proto.Block) error {
return nil
}

func (h *httpConnect) readData(reader *chproto.Reader, timezone *time.Location) (*proto.Block, error) {
func (h *httpConnect) readData(reader *chproto.Reader, timezone *time.Location, captureBuffer *bytes.Buffer) (*proto.Block, error) {
location := h.handshake.Timezone
if timezone != nil {
location = timezone
Expand All @@ -433,12 +433,130 @@ func (h *httpConnect) readData(reader *chproto.Reader, timezone *time.Location)
reader.EnableCompression()
defer reader.DisableCompression()
}

// Try to decode the block
if err := block.Decode(reader, h.revision); err != nil {
// Decode failed - check if captured data contains exception marker
// The decode error typically happens because it tries to read the
// "__exception__" marker as binary data

// Try to read any remaining data
lr := &limitedReader{reader: reader, limit: 100 * 1024}
buf := make([]byte, 4096)
for {
n, readErr := lr.Read(buf)
if n > 0 && captureBuffer != nil {
captureBuffer.Write(buf[:n])
}
if readErr != nil {
break
}
}

// Check if the captured data contains the exception marker
if captureBuffer != nil && bytes.Contains(captureBuffer.Bytes(), []byte("__exception__")) {
// This is an exception block, parse it
return nil, parseExceptionFromBytes(captureBuffer.Bytes())
}

// Not an exception, return the original decode error
return nil, fmt.Errorf("block decode: %w", err)
}
return &block, nil
}

// limitedReader is a helper to read from chproto.Reader up to a limit
type limitedReader struct {
reader *chproto.Reader
limit int64
read int64
}

func (lr *limitedReader) Read(p []byte) (n int, err error) {
if lr.read >= lr.limit {
return 0, io.EOF
}
if int64(len(p)) > lr.limit-lr.read {
p = p[0 : lr.limit-lr.read]
}
n, err = lr.reader.Read(p)
lr.read += int64(n)
return
}

// parseExceptionFromBytes parses ClickHouse exception block
// Format from ClickHouse server (WriteBufferFromHTTPServerResponse.cpp):
//
// \r\n
// __exception__
// \r\n
// <TAG> (16 bytes)
// \r\n
// <error message>
// \n
// <message_length> <TAG>
// \r\n
// __exception__
// \r\n
func parseExceptionFromBytes(data []byte) error {
dataStr := string(data)

// Verify exception marker exists
if !strings.Contains(dataStr, "__exception__") {
return fmt.Errorf("exception marker not found in response")
}

// Find the first __exception__ marker
firstMarker := strings.Index(dataStr, "__exception__")
if firstMarker < 0 {
return fmt.Errorf("exception marker not found")
}

// Skip past first __exception__\r\n
pos := firstMarker + len("__exception__")
// Skip \r\n after first marker
if pos+2 < len(dataStr) && dataStr[pos:pos+2] == "\r\n" {
pos += 2
}

// Skip the exception tag (16 bytes) + \r\n
if pos+16+2 < len(dataStr) {
pos += 16 + 2 // tag + \r\n
}

// Now we're at the start of the error message
// Find the second __exception__ marker
secondMarker := strings.Index(dataStr[pos:], "__exception__")
if secondMarker < 0 {
// If we can't find second marker, just extract what we can
errorMsg := strings.TrimSpace(dataStr[pos:])
if len(errorMsg) > 0 {
return fmt.Errorf("ClickHouse exception: %s", errorMsg)
}
return fmt.Errorf("ClickHouse exception occurred but could not parse details")
}

// Extract error message between tag and second marker
// The error message ends with: \n<message_length> <TAG>\r\n__exception__
errorContent := dataStr[pos : pos+secondMarker]

// Find the last line which contains "<message_length> <TAG>"
lines := strings.Split(strings.TrimRight(errorContent, "\r\n"), "\n")
if len(lines) == 0 {
return fmt.Errorf("ClickHouse exception occurred but could not parse details")
}

// The last line is "<message_length> <TAG>", error message is everything before it
errorMsg := strings.Join(lines[:len(lines)-1], "\n")
errorMsg = strings.TrimSpace(errorMsg)

if len(errorMsg) == 0 {
return fmt.Errorf("ClickHouse exception occurred but message is empty")
}

return fmt.Errorf("ClickHouse exception: %s", errorMsg)
}

func (h *httpConnect) sendStreamQuery(ctx context.Context, r io.Reader, options *QueryOptions, headers map[string]string) (*http.Response, error) {
req, err := h.createRequest(ctx, h.url.String(), r, options, headers)
if err != nil {
Expand Down
121 changes: 121 additions & 0 deletions conn_http_exception_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package clickhouse

import (
"bytes"
"strings"
"testing"
)

func TestParseExceptionFromBytes(t *testing.T) {
tests := []struct {
name string
data []byte
expectError bool
errorMsg string
}{
{
name: "valid exception with complete format",
data: []byte("\r\n__exception__\r\n1234567890123456\r\nDB::Exception: Table default.test_table doesn't exist\n42 1234567890123456\r\n__exception__\r\n"),
expectError: true,
errorMsg: "DB::Exception: Table default.test_table doesn't exist",
},
{
name: "exception with multiline error message",
data: []byte("\r\n__exception__\r\n1234567890123456\r\nDB::Exception: Syntax error\nExpected identifier\n50 1234567890123456\r\n__exception__\r\n"),
expectError: true,
errorMsg: "DB::Exception: Syntax error\nExpected identifier",
},
{
name: "exception without second marker",
data: []byte("\r\n__exception__\r\n1234567890123456\r\nDB::Exception: Connection timeout"),
expectError: true,
errorMsg: "DB::Exception: Connection timeout",
},
{
name: "no exception marker",
data: []byte("some random data without exception marker"),
expectError: true,
errorMsg: "exception marker not found",
},
{
name: "exception marker only",
data: []byte("__exception__\r\n\r\n\r\n__exception__"),
expectError: true,
errorMsg: "ClickHouse exception occurred but message is empty",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := parseExceptionFromBytes(tt.data)

if !tt.expectError {
if err != nil {
t.Errorf("expected no error, got: %v", err)
}
return
}

if err == nil {
t.Error("expected error, got nil")
return
}

if !strings.Contains(err.Error(), tt.errorMsg) {
t.Errorf("expected error to contain '%s', got: %v", tt.errorMsg, err)
}
})
}
}

func TestCapturingReader(t *testing.T) {
tests := []struct {
name string
data string
readSize int
}{
{
name: "capture small data",
data: "test data",
readSize: 4,
},
{
name: "capture large data",
data: strings.Repeat("x", 1000),
readSize: 100,
},
{
name: "capture empty data",
data: "",
readSize: 10,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
buf := bytes.NewBufferString(tt.data)
cr := &capturingReader{reader: buf}

// Read data in chunks
chunk := make([]byte, tt.readSize)
totalRead := 0
for {
n, err := cr.Read(chunk)
totalRead += n
if err != nil {
break
}
}

// Verify that all data was captured
captured := cr.buffer.String()
if captured != tt.data {
t.Errorf("expected captured data to be %q, got %q", tt.data, captured)
}

if totalRead != len(tt.data) {
t.Errorf("expected to read %d bytes, got %d", len(tt.data), totalRead)
}
})
}
}
27 changes: 23 additions & 4 deletions conn_http_query.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package clickhouse

import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -10,6 +12,20 @@ import (
"github.com/ClickHouse/clickhouse-go/v2/lib/proto"
)

// capturingReader wraps a reader and captures all data that passes through it
type capturingReader struct {
reader io.Reader
buffer bytes.Buffer
}

func (r *capturingReader) Read(p []byte) (n int, err error) {
n, err = r.reader.Read(p)
if n > 0 {
r.buffer.Write(p[:n])
}
return n, err
}

// release is ignored, because http used by std with empty release function
func (h *httpConnect) query(ctx context.Context, release nativeTransportRelease, query string, args ...any) (*rows, error) {
h.debugf("[http query] \"%s\"", query)
Expand Down Expand Up @@ -49,7 +65,6 @@ func (h *httpConnect) query(ctx context.Context, release nativeTransportRelease,

rw := h.compressionPool.Get()
// The HTTPReaderWriter.NewReader will create a reader that will decompress it if needed,
// cause adding Accept-Encoding:gzip on your request means response won’t be automatically decompressed
reader, err := rw.NewReader(res)
if err != nil {
err = fmt.Errorf("NewReader: %w", err)
Expand All @@ -58,8 +73,12 @@ func (h *httpConnect) query(ctx context.Context, release nativeTransportRelease,
release(h, err)
return nil, err
}
chReader := chproto.NewReader(reader)
block, err := h.readData(chReader, options.userLocation)

// Wrap reader with capturing reader to detect exceptions
capturingRdr := &capturingReader{reader: reader}
bufferedReader := bufio.NewReader(capturingRdr)
chReader := chproto.NewReader(bufferedReader)
block, err := h.readData(chReader, options.userLocation, &capturingRdr.buffer)
if err != nil && !errors.Is(err, io.EOF) {
err = fmt.Errorf("readData: %w", err)
discardAndClose(res.Body)
Expand All @@ -79,7 +98,7 @@ func (h *httpConnect) query(ctx context.Context, release nativeTransportRelease,
)
go func() {
for {
block, err := h.readData(chReader, options.userLocation)
block, err := h.readData(chReader, options.userLocation, &capturingRdr.buffer)
if err != nil {
// ch-go wraps EOF errors
if !errors.Is(err, io.EOF) {
Expand Down
48 changes: 48 additions & 0 deletions tests/http_exception_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package tests

import (
"context"
"testing"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestHTTPExceptionHandling(t *testing.T) {
conn, err := GetNativeConnection(t, clickhouse.HTTP, nil, nil, nil)
require.NoError(t, err)

ctx := context.Background()

// These settings will make sure mid-stream exception most likely on the server
ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{
"max_threads": 1,
"max_block_size": 1,
"http_write_exception_in_output_format": 0,
"wait_end_of_query": 0,
"http_response_buffer_size": 1,
}))

rows, err := conn.Query(ctx, `SELECT throwIf(number=3, 'there is an exception') FROM system.numbers`)
require.NoError(t, err) // query shouldn't fail with 500 status code.

occured := false
// query should fail while scanning the rows mid-stream
for rows.Next() {
var result uint8
err := rows.Scan(&result)
if err != nil {
// should be an exception caught correctly
assert.Contains(t, err.Error(), "there is an exception", "Expected exception message not caught")
occured = true
}
}

if err := rows.Err(); err != nil {
assert.Contains(t, err.Error(), "there is an exception", "Expected exception message not caught")
occured = true
}

assert.True(t, occured, "execption not caught in the response chunks")
}
1 change: 0 additions & 1 deletion tests/main_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

package tests

import (
Expand Down
Loading
Loading