Skip to content

Commit f5cc0a8

Browse files
Merge pull request #1578 from ClickHouse/fix_http_400
Fix Native API HTTP bugs
2 parents 559159e + bf82c41 commit f5cc0a8

File tree

10 files changed

+250
-83
lines changed

10 files changed

+250
-83
lines changed

clickhouse.go

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ func (ch *clickhouse) Query(ctx context.Context, query string, args ...any) (row
150150
if err != nil {
151151
return nil, err
152152
}
153-
conn.debugf("[acquired] connection [%d]", conn.connID())
153+
conn.debugf("[query] \"%s\"", query)
154154
return conn.query(ctx, ch.release, query, args...)
155155
}
156156

@@ -161,7 +161,8 @@ func (ch *clickhouse) QueryRow(ctx context.Context, query string, args ...any) d
161161
err: err,
162162
}
163163
}
164-
conn.debugf("[acquired] connection [%d]", conn.connID())
164+
165+
conn.debugf("[query row] \"%s\"", query)
165166
return conn.queryRow(ctx, ch.release, query, args...)
166167
}
167168

@@ -170,8 +171,7 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error
170171
if err != nil {
171172
return err
172173
}
173-
conn.debugf("[acquired] connection [%d]", conn.connID())
174-
174+
conn.debugf("[exec] \"%s\"", query)
175175
if err := conn.exec(ctx, query, args...); err != nil {
176176
ch.release(conn, err)
177177
return err
@@ -185,6 +185,7 @@ func (ch *clickhouse) PrepareBatch(ctx context.Context, query string, opts ...dr
185185
if err != nil {
186186
return nil, err
187187
}
188+
conn.debugf("[prepare batch] \"%s\"", query)
188189
batch, err := conn.prepareBatch(ctx, ch.release, ch.acquire, query, getPrepareBatchOptions(opts...))
189190
if err != nil {
190191
return nil, err
@@ -207,6 +208,7 @@ func (ch *clickhouse) AsyncInsert(ctx context.Context, query string, wait bool,
207208
if err != nil {
208209
return err
209210
}
211+
conn.debugf("[async insert] \"%s\"", query)
210212
if err := conn.asyncInsert(ctx, query, wait, args...); err != nil {
211213
ch.release(conn, err)
212214
return err
@@ -220,6 +222,7 @@ func (ch *clickhouse) Ping(ctx context.Context) (err error) {
220222
if err != nil {
221223
return err
222224
}
225+
conn.debugf("[ping]")
223226
if err := conn.ping(ctx); err != nil {
224227
ch.release(conn, err)
225228
return err
@@ -324,6 +327,7 @@ func (ch *clickhouse) acquire(ctx context.Context) (conn nativeTransport, err er
324327
}
325328
}
326329
conn.setReleased(false)
330+
conn.debugf("[acquired from pool]")
327331
return conn, nil
328332
default:
329333
}
@@ -334,6 +338,7 @@ func (ch *clickhouse) acquire(ctx context.Context) (conn nativeTransport, err er
334338
}
335339
return nil, err
336340
}
341+
conn.debugf("[acquired new]")
337342
return conn, nil
338343
}
339344

@@ -377,31 +382,47 @@ func (ch *clickhouse) release(conn nativeTransport, err error) {
377382
return
378383
}
379384
conn.setReleased(true)
380-
conn.debugf("[released] connection [%d]", conn.connID())
385+
386+
if err != nil {
387+
conn.debugf("[released with error]")
388+
} else {
389+
conn.debugf("[released]")
390+
}
381391

382392
select {
383393
case <-ch.open:
384394
default:
385395
}
386-
if err != nil || time.Since(conn.connectedAtTime()) >= ch.opt.ConnMaxLifetime {
396+
397+
if err != nil {
398+
conn.debugf("[close: error] %s", err.Error())
399+
conn.close()
400+
return
401+
} else if time.Since(conn.connectedAtTime()) >= ch.opt.ConnMaxLifetime {
402+
conn.debugf("[close: lifetime expired]")
387403
conn.close()
388404
return
389405
}
406+
390407
if ch.opt.FreeBufOnConnRelease {
408+
conn.debugf("[free buffer]")
391409
conn.freeBuffer()
392410
}
411+
393412
select {
394413
case ch.idle <- conn:
395414
default:
415+
conn.debugf("[close: idle pool full %d/%d]", len(ch.idle), cap(ch.idle))
396416
conn.close()
397417
}
398418
}
399419

400420
func (ch *clickhouse) Close() error {
401421
for {
402422
select {
403-
case c := <-ch.idle:
404-
c.close()
423+
case conn := <-ch.idle:
424+
conn.debugf("[close: closing pool]")
425+
conn.close()
405426
default:
406427
// In rare cases, close may be called multiple times, don't block
407428
//TODO: add proper close flag to indicate this pool is unusable after Close

conn.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,12 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
6464
if opt.Debugf != nil {
6565
debugf = func(format string, v ...any) {
6666
opt.Debugf(
67-
"[clickhouse][conn=%d][%s] "+format,
68-
append([]interface{}{num, conn.RemoteAddr()}, v...)...,
67+
"[clickhouse][%s][id=%d] "+format,
68+
append([]interface{}{conn.RemoteAddr(), num}, v...)...,
6969
)
7070
}
7171
} else {
72-
debugf = log.New(os.Stdout, fmt.Sprintf("[clickhouse][conn=%d][%s]", num, conn.RemoteAddr()), 0).Printf
72+
debugf = log.New(os.Stdout, fmt.Sprintf("[clickhouse][%s][id=%d]", conn.RemoteAddr(), num), 0).Printf
7373
}
7474
}
7575

conn_http.go

Lines changed: 27 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
"net/http"
3434
"net/url"
3535
"os"
36-
"strconv"
3736
"strings"
3837
"sync"
3938
"time"
@@ -165,12 +164,12 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
165164
if opt.Debugf != nil {
166165
debugf = func(format string, v ...any) {
167166
opt.Debugf(
168-
"[clickhouse-http][conn=%d][%s] "+format,
169-
append([]interface{}{num, addr}, v...)...,
167+
"[clickhouse-http][%s][id=%d] "+format,
168+
append([]interface{}{addr, num}, v...)...,
170169
)
171170
}
172171
} else {
173-
debugf = log.New(os.Stdout, fmt.Sprintf("[clickhouse-http][conn=%d][%s]", num, addr), 0).Printf
172+
debugf = log.New(os.Stdout, fmt.Sprintf("[clickhouse-http][%s][id=%d]", addr, num), 0).Printf
174173
}
175174
}
176175

@@ -216,6 +215,8 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
216215
}
217216

218217
query.Set("default_format", "Native")
218+
// TODO: we support newer revisions but for some reason this completely breaks Native format
219+
//query.Set("client_protocol_version", strconv.Itoa(ClientTCPProtocolVersion))
219220
u.RawQuery = query.Encode()
220221

221222
httpProxy := http.ProxyFromEnvironment
@@ -241,13 +242,18 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
241242
}
242243
}
243244

244-
// Temporary conn for determining timezone + version
245-
conn := &httpConnect{
246-
opt: opt,
245+
conn := httpConnect{
246+
id: num,
247+
connectedAt: time.Now(),
248+
released: false,
249+
debugfFunc: debugf,
250+
opt: opt,
247251
client: &http.Client{
248252
Transport: t,
249253
},
250-
url: u,
254+
url: u,
255+
// TODO: learn more about why revision is broken
256+
//revision: ClientTCPProtocolVersion,
251257
buffer: new(chproto.Buffer),
252258
compression: opt.Compression.Method,
253259
blockCompressor: compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method)),
@@ -259,30 +265,9 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
259265
if err != nil {
260266
return nil, fmt.Errorf("failed to query server hello: %w", err)
261267
}
262-
// Close temp connection, important for freeing session_id if set
263-
_ = conn.close()
264-
265-
// Set client revision so we can decode updated versions of Native format
266-
query.Set("client_protocol_version", strconv.Itoa(int(handshake.Revision)))
267-
u.RawQuery = query.Encode()
268+
conn.handshake = handshake
268269

269-
return &httpConnect{
270-
id: num,
271-
connectedAt: time.Now(),
272-
released: false,
273-
debugfFunc: debugf,
274-
opt: opt,
275-
client: &http.Client{
276-
Transport: t,
277-
},
278-
url: u,
279-
buffer: new(chproto.Buffer),
280-
compression: opt.Compression.Method,
281-
blockCompressor: compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method)),
282-
compressionPool: compressionPool,
283-
blockBufferSize: opt.BlockBufferSize,
284-
handshake: handshake,
285-
}, nil
270+
return &conn, nil
286271
}
287272

288273
type httpConnect struct {
@@ -291,6 +276,7 @@ type httpConnect struct {
291276
released bool
292277
debugfFunc func(format string, v ...any)
293278
opt *Options
279+
revision uint64
294280
url *url.URL
295281
client *http.Client
296282
buffer *chproto.Buffer
@@ -333,6 +319,7 @@ func (h *httpConnect) isBad() bool {
333319
}
334320

335321
func (h *httpConnect) queryHello(ctx context.Context, release nativeTransportRelease) (proto.ServerHandshake, error) {
322+
h.debugf("[query hello]")
336323
ctx = Context(ctx, ignoreExternalTables())
337324
query := "SELECT displayName(), version(), revision(), timezone()"
338325
rows, err := h.query(ctx, release, query)
@@ -420,8 +407,8 @@ func createCompressionPool(compression *Compression) (Pool[HTTPReaderWriter], er
420407
func (h *httpConnect) writeData(block *proto.Block) error {
421408
// Saving offset of compressible data
422409
start := len(h.buffer.Buf)
423-
if err := block.Encode(h.buffer, 0); err != nil {
424-
return err
410+
if err := block.Encode(h.buffer, h.revision); err != nil {
411+
return fmt.Errorf("block encode: %w", err)
425412
}
426413
if h.compression == CompressionLZ4 || h.compression == CompressionZSTD {
427414
// Performing compression. Supported and requires
@@ -445,8 +432,8 @@ func (h *httpConnect) readData(reader *chproto.Reader, timezone *time.Location)
445432
reader.EnableCompression()
446433
defer reader.DisableCompression()
447434
}
448-
if err := block.Decode(reader, h.handshake.Revision); err != nil {
449-
return nil, err
435+
if err := block.Decode(reader, h.revision); err != nil {
436+
return nil, fmt.Errorf("block decode: %w", err)
450437
}
451438
return &block, nil
452439
}
@@ -564,7 +551,7 @@ func (h *httpConnect) createRequestWithExternalTables(ctx context.Context, query
564551
return nil, err
565552
}
566553
buf.Reset()
567-
err = table.Block().Encode(buf, 0)
554+
err = table.Block().Encode(buf, h.revision)
568555
if err != nil {
569556
return nil, err
570557
}
@@ -597,11 +584,12 @@ func (h *httpConnect) executeRequest(req *http.Request) (*http.Response, error)
597584

598585
if resp.StatusCode != http.StatusOK {
599586
defer discardAndClose(resp.Body)
600-
msg, err := h.readRawResponse(resp)
587+
msgBytes, err := h.readRawResponse(resp)
601588
if err != nil {
602-
return nil, fmt.Errorf("clickhouse [execute]:: %d code: failed to read the response: %w", resp.StatusCode, err)
589+
return nil, fmt.Errorf("[HTTP %d] failed to read response: %w", resp.StatusCode, err)
603590
}
604-
return nil, fmt.Errorf("clickhouse [execute]:: %d code: %s", resp.StatusCode, string(msg))
591+
592+
return nil, fmt.Errorf("[HTTP %d] response body: \"%s\"", resp.StatusCode, string(msgBytes))
605593
}
606594
return resp, nil
607595
}

conn_http_batch.go

Lines changed: 11 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,7 @@ func fetchColumnNamesAndTypesForInsert(h *httpConnect, release nativeTransportRe
7777
} else {
7878
// Use all columns
7979
for _, colName := range allColumns {
80-
colType, ok := columnsToTypes[colName]
81-
if !ok {
82-
return nil, fmt.Errorf("column %s is not present in the table %s", colName, tableName)
83-
}
84-
80+
colType := columnsToTypes[colName]
8581
insertColumns = append(insertColumns, ColumnNameAndType{
8682
Name: colName,
8783
Type: colType,
@@ -121,10 +117,12 @@ func newBlock(h *httpConnect, release nativeTransportRelease, ctx context.Contex
121117
}
122118

123119
func (h *httpConnect) prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error) {
124-
// release is not used for newBlock since the connection is held for the batch.
120+
// release is not used within newBlock since the connection is held for the batch.
125121
query, block, err := newBlock(h, func(nativeTransport, error) {}, ctx, query)
126122
if err != nil {
127-
return nil, fmt.Errorf("failed to init block for HTTP batch: %w", err)
123+
err = fmt.Errorf("failed to init block for HTTP batch: %w", err)
124+
release(h, err)
125+
return nil, err
128126
}
129127

130128
return &httpBatch{
@@ -237,6 +235,7 @@ func (b *httpBatch) IsSent() bool {
237235
func (b *httpBatch) Send() (err error) {
238236
defer func() {
239237
b.sent = true
238+
b.release(err)
240239
}()
241240
if b.sent {
242241
return ErrBatchAlreadySent
@@ -264,33 +263,29 @@ func (b *httpBatch) Send() (err error) {
264263
connWriter := compressionWriter.reset(pipeWriter)
265264

266265
go func() {
267-
var err error = nil
266+
var err error
268267
defer pipeWriter.CloseWithError(err)
269268
defer connWriter.Close()
270269
b.conn.buffer.Reset()
271-
if b.block.Rows() != 0 {
272-
if err = b.conn.writeData(b.block); err != nil {
273-
return
274-
}
275-
}
276-
if err = b.conn.writeData(&proto.Block{}); err != nil {
270+
if err = b.conn.writeData(b.block); err != nil {
277271
return
278272
}
279273
if _, err = connWriter.Write(b.conn.buffer.Buf); err != nil {
280274
return
281275
}
282-
b.release(nil)
283276
}()
284277

285278
options.settings["query"] = b.query
286279
headers["Content-Type"] = "application/octet-stream"
287280

281+
b.conn.debugf("[batch send start] columns=%d rows=%d", len(b.block.Columns), b.block.Rows())
288282
res, err := b.conn.sendStreamQuery(b.ctx, pipeReader, &options, headers)
289283
if err != nil {
290-
return err
284+
return fmt.Errorf("batch sendStreamQuery: %w", err)
291285
}
292286
discardAndClose(res.Body)
293287

288+
b.conn.debugf("[batch send complete]")
294289
b.block.Reset()
295290

296291
return nil

0 commit comments

Comments
 (0)