Skip to content

Commit 5ece2cb

Browse files
committed
use client revision for HTTP, re-use http connection from handshake, logging
1 parent e62888a commit 5ece2cb

File tree

2 files changed

+24
-40
lines changed

2 files changed

+24
-40
lines changed

conn_http.go

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
216216
}
217217

218218
query.Set("default_format", "Native")
219+
query.Set("client_protocol_version", strconv.Itoa(ClientTCPProtocolVersion))
219220
u.RawQuery = query.Encode()
220221

221222
httpProxy := http.ProxyFromEnvironment
@@ -241,13 +242,17 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
241242
}
242243
}
243244

244-
// Temporary conn for determining timezone + version
245-
conn := &httpConnect{
246-
opt: opt,
245+
conn := httpConnect{
246+
id: num,
247+
connectedAt: time.Now(),
248+
released: false,
249+
debugfFunc: debugf,
250+
opt: opt,
247251
client: &http.Client{
248252
Transport: t,
249253
},
250254
url: u,
255+
revision: ClientTCPProtocolVersion,
251256
buffer: new(chproto.Buffer),
252257
compression: opt.Compression.Method,
253258
blockCompressor: compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method)),
@@ -259,30 +264,9 @@ func dialHttp(ctx context.Context, addr string, num int, opt *Options) (*httpCon
259264
if err != nil {
260265
return nil, fmt.Errorf("failed to query server hello: %w", err)
261266
}
262-
// Close temp connection, important for freeing session_id if set
263-
_ = conn.close()
267+
conn.handshake = handshake
264268

265-
// Set client revision so we can decode updated versions of Native format
266-
query.Set("client_protocol_version", strconv.Itoa(int(handshake.Revision)))
267-
u.RawQuery = query.Encode()
268-
269-
return &httpConnect{
270-
id: num,
271-
connectedAt: time.Now(),
272-
released: false,
273-
debugfFunc: debugf,
274-
opt: opt,
275-
client: &http.Client{
276-
Transport: t,
277-
},
278-
url: u,
279-
buffer: new(chproto.Buffer),
280-
compression: opt.Compression.Method,
281-
blockCompressor: compress.NewWriter(compress.Level(opt.Compression.Level), compress.Method(opt.Compression.Method)),
282-
compressionPool: compressionPool,
283-
blockBufferSize: opt.BlockBufferSize,
284-
handshake: handshake,
285-
}, nil
269+
return &conn, nil
286270
}
287271

288272
type httpConnect struct {
@@ -291,6 +275,7 @@ type httpConnect struct {
291275
released bool
292276
debugfFunc func(format string, v ...any)
293277
opt *Options
278+
revision uint64
294279
url *url.URL
295280
client *http.Client
296281
buffer *chproto.Buffer
@@ -421,7 +406,7 @@ func createCompressionPool(compression *Compression) (Pool[HTTPReaderWriter], er
421406
func (h *httpConnect) writeData(block *proto.Block) error {
422407
// Saving offset of compressible data
423408
start := len(h.buffer.Buf)
424-
if err := block.Encode(h.buffer, 0); err != nil {
409+
if err := block.Encode(h.buffer, h.revision); err != nil {
425410
return err
426411
}
427412
if h.compression == CompressionLZ4 || h.compression == CompressionZSTD {
@@ -446,7 +431,7 @@ func (h *httpConnect) readData(reader *chproto.Reader, timezone *time.Location)
446431
reader.EnableCompression()
447432
defer reader.DisableCompression()
448433
}
449-
if err := block.Decode(reader, h.handshake.Revision); err != nil {
434+
if err := block.Decode(reader, h.revision); err != nil {
450435
return nil, fmt.Errorf("block decode: %w", err)
451436
}
452437
return &block, nil
@@ -565,7 +550,7 @@ func (h *httpConnect) createRequestWithExternalTables(ctx context.Context, query
565550
return nil, err
566551
}
567552
buf.Reset()
568-
err = table.Block().Encode(buf, 0)
553+
err = table.Block().Encode(buf, h.revision)
569554
if err != nil {
570555
return nil, err
571556
}

conn_http_batch.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -77,11 +77,7 @@ func fetchColumnNamesAndTypesForInsert(h *httpConnect, release nativeTransportRe
7777
} else {
7878
// Use all columns
7979
for _, colName := range allColumns {
80-
colType, ok := columnsToTypes[colName]
81-
if !ok {
82-
return nil, fmt.Errorf("column %s is not present in the table %s", colName, tableName)
83-
}
84-
80+
colType := columnsToTypes[colName]
8581
insertColumns = append(insertColumns, ColumnNameAndType{
8682
Name: colName,
8783
Type: colType,
@@ -124,8 +120,9 @@ func (h *httpConnect) prepareBatch(ctx context.Context, release nativeTransportR
124120
// release is not used within newBlock since the connection is held for the batch.
125121
query, block, err := newBlock(h, func(nativeTransport, error) {}, ctx, query)
126122
if err != nil {
123+
err = fmt.Errorf("failed to init block for HTTP batch: %w", err)
127124
release(h, err)
128-
return nil, fmt.Errorf("failed to init block for HTTP batch: %w", err)
125+
return nil, err
129126
}
130127

131128
return &httpBatch{
@@ -266,10 +263,12 @@ func (b *httpBatch) Send() (err error) {
266263

267264
go func() {
268265
var err error = nil
266+
defer b.release(err)
269267
defer pipeWriter.CloseWithError(err)
270268
defer connWriter.Close()
271269
b.conn.buffer.Reset()
272-
if b.block.Rows() != 0 {
270+
rowCount := b.block.Rows()
271+
if rowCount != 0 {
273272
if err = b.conn.writeData(b.block); err != nil {
274273
return
275274
}
@@ -280,20 +279,20 @@ func (b *httpBatch) Send() (err error) {
280279
if _, err = connWriter.Write(b.conn.buffer.Buf); err != nil {
281280
return
282281
}
283-
b.release(nil)
282+
283+
b.conn.debugf("[batch send complete] rows=%d", rowCount)
284284
}()
285285

286286
options.settings["query"] = b.query
287287
headers["Content-Type"] = "application/octet-stream"
288288

289+
b.conn.debugf("[batch send started] rows=%d", b.block.Rows())
289290
res, err := b.conn.sendStreamQuery(b.ctx, pipeReader, &options, headers)
290291
if err != nil {
291-
return err
292+
return fmt.Errorf("sendStreamQuery: %w", err)
292293
}
293294
discardAndClose(res.Body)
294295

295-
b.block.Reset()
296-
297296
return nil
298297
}
299298

0 commit comments

Comments
 (0)