Skip to content

Commit 9464006

Browse files
Merge pull request #1577 from ClickHouse/open_http
Support HTTP connnections in Native Go interface
2 parents 8bf4552 + ac33f72 commit 9464006

File tree

162 files changed

+9316
-8197
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

162 files changed

+9316
-8197
lines changed

benchmark/json_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestMain(m *testing.M) {
1818
}
1919

2020
func GetNativeConnection(settings clickhouse.Settings, tlsConfig *tls.Config, compression *clickhouse.Compression) (driver.Conn, error) {
21-
return clickhouse_tests.GetConnection(testSet, settings, tlsConfig, compression)
21+
return clickhouse_tests.GetConnectionTCP(testSet, settings, tlsConfig, compression)
2222
}
2323

2424
func prepareJSONTest(ctx context.Context, b *testing.B) driver.Conn {

clickhouse.go

Lines changed: 57 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727

2828
_ "time/tzdata"
2929

30-
chproto "github.com/ClickHouse/ch-go/proto"
3130
"github.com/ClickHouse/clickhouse-go/v2/contributors"
3231
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
3332
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@@ -83,19 +82,43 @@ func Open(opt *Options) (driver.Conn, error) {
8382
opt = &Options{}
8483
}
8584
o := opt.setDefaults()
85+
8686
conn := &clickhouse{
8787
opt: o,
88-
idle: make(chan *connect, o.MaxIdleConns),
88+
idle: make(chan nativeTransport, o.MaxIdleConns),
8989
open: make(chan struct{}, o.MaxOpenConns),
9090
exit: make(chan struct{}),
9191
}
9292
go conn.startAutoCloseIdleConnections()
9393
return conn, nil
9494
}
9595

96+
// nativeTransport represents an implementation (TCP or HTTP) that can be pooled by the main clickhouse struct.
97+
// Implementations are not expected to be thread safe, which is why we provide acquire/release functions.
98+
type nativeTransport interface {
99+
serverVersion() (*ServerVersion, error)
100+
query(ctx context.Context, release nativeTransportRelease, query string, args ...any) (*rows, error)
101+
queryRow(ctx context.Context, release nativeTransportRelease, query string, args ...any) *row
102+
prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error)
103+
exec(ctx context.Context, query string, args ...any) error
104+
asyncInsert(ctx context.Context, query string, wait bool, args ...any) error
105+
ping(context.Context) error
106+
isBad() bool
107+
connID() int
108+
connectedAtTime() time.Time
109+
isReleased() bool
110+
setReleased(released bool)
111+
debugf(format string, v ...any)
112+
// freeBuffer is called if Options.FreeBufOnConnRelease is set
113+
freeBuffer()
114+
close() error
115+
}
116+
type nativeTransportAcquire func(context.Context) (nativeTransport, error)
117+
type nativeTransportRelease func(nativeTransport, error)
118+
96119
type clickhouse struct {
97120
opt *Options
98-
idle chan *connect
121+
idle chan nativeTransport
99122
open chan struct{}
100123
exit chan struct{}
101124
connID int64
@@ -118,27 +141,27 @@ func (ch *clickhouse) ServerVersion() (*driver.ServerVersion, error) {
118141
if err != nil {
119142
return nil, err
120143
}
121-
ch.release(conn, nil)
122-
return &conn.server, nil
144+
defer ch.release(conn, nil)
145+
return conn.serverVersion()
123146
}
124147

125148
func (ch *clickhouse) Query(ctx context.Context, query string, args ...any) (rows driver.Rows, err error) {
126149
conn, err := ch.acquire(ctx)
127150
if err != nil {
128151
return nil, err
129152
}
130-
conn.debugf("[acquired] connection [%d]", conn.id)
153+
conn.debugf("[acquired] connection [%d]", conn.connID())
131154
return conn.query(ctx, ch.release, query, args...)
132155
}
133156

134-
func (ch *clickhouse) QueryRow(ctx context.Context, query string, args ...any) (rows driver.Row) {
157+
func (ch *clickhouse) QueryRow(ctx context.Context, query string, args ...any) driver.Row {
135158
conn, err := ch.acquire(ctx)
136159
if err != nil {
137160
return &row{
138161
err: err,
139162
}
140163
}
141-
conn.debugf("[acquired] connection [%d]", conn.id)
164+
conn.debugf("[acquired] connection [%d]", conn.connID())
142165
return conn.queryRow(ctx, ch.release, query, args...)
143166
}
144167

@@ -147,7 +170,7 @@ func (ch *clickhouse) Exec(ctx context.Context, query string, args ...any) error
147170
if err != nil {
148171
return err
149172
}
150-
conn.debugf("[acquired] connection [%d]", conn.id)
173+
conn.debugf("[acquired] connection [%d]", conn.connID())
151174

152175
if err := conn.exec(ctx, query, args...); err != nil {
153176
ch.release(conn, err)
@@ -162,7 +185,7 @@ func (ch *clickhouse) PrepareBatch(ctx context.Context, query string, opts ...dr
162185
if err != nil {
163186
return nil, err
164187
}
165-
batch, err := conn.prepareBatch(ctx, query, getPrepareBatchOptions(opts...), ch.release, ch.acquire)
188+
batch, err := conn.prepareBatch(ctx, ch.release, ch.acquire, query, getPrepareBatchOptions(opts...))
166189
if err != nil {
167190
return nil, err
168191
}
@@ -214,11 +237,18 @@ func (ch *clickhouse) Stats() driver.Stats {
214237
}
215238
}
216239

217-
func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) {
240+
func (ch *clickhouse) dial(ctx context.Context) (conn nativeTransport, err error) {
218241
connID := int(atomic.AddInt64(&ch.connID, 1))
219242

220243
dialFunc := func(ctx context.Context, addr string, opt *Options) (DialResult, error) {
221-
conn, err := dial(ctx, addr, connID, opt)
244+
var conn nativeTransport
245+
var err error
246+
switch opt.Protocol {
247+
case HTTP:
248+
conn, err = dialHttp(ctx, addr, connID, opt)
249+
default:
250+
conn, err = dial(ctx, addr, connID, opt)
251+
}
222252

223253
return DialResult{conn}, err
224254
}
@@ -260,7 +290,7 @@ func DefaultDialStrategy(ctx context.Context, connID int, opt *Options, dial Dia
260290
return r, err
261291
}
262292

263-
func (ch *clickhouse) acquire(ctx context.Context) (conn *connect, err error) {
293+
func (ch *clickhouse) acquire(ctx context.Context) (conn nativeTransport, err error) {
264294
timer := time.NewTimer(ch.opt.DialTimeout)
265295
defer timer.Stop()
266296
select {
@@ -293,7 +323,7 @@ func (ch *clickhouse) acquire(ctx context.Context) (conn *connect, err error) {
293323
return nil, err
294324
}
295325
}
296-
conn.released = false
326+
conn.setReleased(false)
297327
return conn, nil
298328
default:
299329
}
@@ -326,7 +356,7 @@ func (ch *clickhouse) closeIdleExpired() {
326356
for {
327357
select {
328358
case conn := <-ch.idle:
329-
if conn.connectedAt.Before(cutoff) {
359+
if conn.connectedAtTime().Before(cutoff) {
330360
conn.close()
331361
} else {
332362
select {
@@ -342,24 +372,23 @@ func (ch *clickhouse) closeIdleExpired() {
342372
}
343373
}
344374

345-
func (ch *clickhouse) release(conn *connect, err error) {
346-
if conn.released {
375+
func (ch *clickhouse) release(conn nativeTransport, err error) {
376+
if conn.isReleased() {
347377
return
348378
}
349-
conn.released = true
350-
conn.debugf("[released] connection [%d]", conn.id)
379+
conn.setReleased(true)
380+
conn.debugf("[released] connection [%d]", conn.connID())
351381

352382
select {
353383
case <-ch.open:
354384
default:
355385
}
356-
if err != nil || time.Since(conn.connectedAt) >= ch.opt.ConnMaxLifetime {
386+
if err != nil || time.Since(conn.connectedAtTime()) >= ch.opt.ConnMaxLifetime {
357387
conn.close()
358388
return
359389
}
360390
if ch.opt.FreeBufOnConnRelease {
361-
conn.buffer = new(chproto.Buffer)
362-
conn.compressor.Data = nil
391+
conn.freeBuffer()
363392
}
364393
select {
365394
case ch.idle <- conn:
@@ -374,7 +403,12 @@ func (ch *clickhouse) Close() error {
374403
case c := <-ch.idle:
375404
c.close()
376405
default:
377-
ch.exit <- struct{}{}
406+
// In rare cases, close may be called multiple times, don't block
407+
//TODO: add proper close flag to indicate this pool is unusable after Close
408+
select {
409+
case ch.exit <- struct{}{}:
410+
default:
411+
}
378412
return nil
379413
}
380414
}

clickhouse_options.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ func ParseDSN(dsn string) (*Options, error) {
124124

125125
type Dial func(ctx context.Context, addr string, opt *Options) (DialResult, error)
126126
type DialResult struct {
127-
conn *connect
127+
conn nativeTransport
128128
}
129129

130130
type HTTPProxy func(*http.Request) (*url.URL, error)

clickhouse_std.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ import (
3434
"syscall"
3535

3636
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
37-
ldriver "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
37+
chdriver "github.com/ClickHouse/clickhouse-go/v2/lib/driver"
3838
)
3939

4040
var globalConnID int64
@@ -196,10 +196,10 @@ func OpenDB(opt *Options) *sql.DB {
196196
type stdConnect interface {
197197
isBad() bool
198198
close() error
199-
query(ctx context.Context, release func(*connect, error), query string, args ...any) (*rows, error)
199+
query(ctx context.Context, release nativeTransportRelease, query string, args ...any) (*rows, error)
200200
exec(ctx context.Context, query string, args ...any) error
201201
ping(ctx context.Context) (err error)
202-
prepareBatch(ctx context.Context, query string, options ldriver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (ldriver.Batch, error)
202+
prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, options chdriver.PrepareBatchOptions) (chdriver.Batch, error)
203203
asyncInsert(ctx context.Context, query string, wait bool, args ...any) error
204204
}
205205

@@ -333,7 +333,7 @@ func (std *stdDriver) QueryContext(ctx context.Context, query string, args []dri
333333
return nil, driver.ErrBadConn
334334
}
335335

336-
r, err := std.conn.query(ctx, func(*connect, error) {}, query, rebind(args)...)
336+
r, err := std.conn.query(ctx, func(nativeTransport, error) {}, query, rebind(args)...)
337337
if isConnBrokenError(err) {
338338
std.debugf("QueryContext got a fatal error, resetting connection: %v\n", err)
339339
return nil, driver.ErrBadConn
@@ -358,7 +358,7 @@ func (std *stdDriver) PrepareContext(ctx context.Context, query string) (driver.
358358
return nil, driver.ErrBadConn
359359
}
360360

361-
batch, err := std.conn.prepareBatch(ctx, query, ldriver.PrepareBatchOptions{}, func(*connect, error) {}, func(context.Context) (*connect, error) { return nil, nil })
361+
batch, err := std.conn.prepareBatch(ctx, func(nativeTransport, error) {}, func(context.Context) (nativeTransport, error) { return nil, nil }, query, chdriver.PrepareBatchOptions{})
362362
if err != nil {
363363
if isConnBrokenError(err) {
364364
std.debugf("PrepareContext got a fatal error, resetting connection: %v\n", err)
@@ -387,7 +387,7 @@ func (std *stdDriver) Close() error {
387387
}
388388

389389
type stdBatch struct {
390-
batch ldriver.Batch
390+
batch chdriver.Batch
391391
debugf func(format string, v ...any)
392392
}
393393

conn.go

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ func dial(ctx context.Context, addr string, num int, opt *Options) (*connect, er
9696
id: num,
9797
opt: opt,
9898
conn: conn,
99-
debugf: debugf,
99+
debugfFunc: debugf,
100100
buffer: new(chproto.Buffer),
101101
reader: chproto.NewReader(conn),
102102
revision: ClientTCPProtocolVersion,
@@ -144,7 +144,7 @@ type connect struct {
144144
id int
145145
opt *Options
146146
conn net.Conn
147-
debugf func(format string, v ...any)
147+
debugfFunc func(format string, v ...any)
148148
server ServerVersion
149149
closed bool
150150
buffer *chproto.Buffer
@@ -162,6 +162,22 @@ type connect struct {
162162
closeMutex sync.Mutex
163163
}
164164

165+
func (c *connect) debugf(format string, v ...any) {
166+
c.debugfFunc(format, v...)
167+
}
168+
169+
func (c *connect) connID() int {
170+
return c.id
171+
}
172+
173+
func (c *connect) connectedAtTime() time.Time {
174+
return c.connectedAt
175+
}
176+
177+
func (c *connect) serverVersion() (*ServerVersion, error) {
178+
return &c.server, nil
179+
}
180+
165181
func (c *connect) settings(querySettings Settings) []proto.Setting {
166182
settingToProtoSetting := func(k string, v any) proto.Setting {
167183
isCustom := false
@@ -206,6 +222,14 @@ func (c *connect) isBad() bool {
206222
return false
207223
}
208224

225+
func (c *connect) isReleased() bool {
226+
return c.released
227+
}
228+
229+
func (c *connect) setReleased(released bool) {
230+
c.released = released
231+
}
232+
209233
func (c *connect) isClosed() bool {
210234
c.closeMutex.Lock()
211235
defer c.closeMutex.Unlock()
@@ -372,6 +396,11 @@ func (c *connect) readData(ctx context.Context, packet byte, compressible bool)
372396
return &block, nil
373397
}
374398

399+
func (c *connect) freeBuffer() {
400+
c.buffer = new(chproto.Buffer)
401+
c.compressor.Data = nil
402+
}
403+
375404
func (c *connect) flush() error {
376405
if len(c.buffer.Buf) == 0 {
377406
// Nothing to flush.

conn_batch.go

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import (
3535
var insertMatch = regexp.MustCompile(`(?i)(INSERT\s+INTO\s+[^( ]+(?:\s*\([^()]*(?:\([^()]*\)[^()]*)*\))?)(?:\s*VALUES)?`)
3636
var columnMatch = regexp.MustCompile(`INSERT INTO .+\s\((?P<Columns>.+)\)$`)
3737

38-
func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.PrepareBatchOptions, release func(*connect, error), acquire func(context.Context) (*connect, error)) (driver.Batch, error) {
38+
func (c *connect) prepareBatch(ctx context.Context, release nativeTransportRelease, acquire nativeTransportAcquire, query string, opts driver.PrepareBatchOptions) (driver.Batch, error) {
3939
query, _, queryColumns, verr := extractNormalizedInsertQueryAndColumns(query)
4040
if verr != nil {
4141
return nil, verr
@@ -60,17 +60,29 @@ func (c *connect) prepareBatch(ctx context.Context, query string, opts driver.Pr
6060
}
6161
// resort batch to specified columns
6262
if err = block.SortColumns(queryColumns); err != nil {
63+
release(c, err)
6364
return nil, err
6465
}
6566

67+
connRelease := func(conn *connect, err error) {
68+
release(conn, err)
69+
}
70+
connAcquire := func(ctx context.Context) (*connect, error) {
71+
conn, err := acquire(ctx)
72+
if err != nil {
73+
return nil, err
74+
}
75+
return conn.(*connect), nil
76+
}
77+
6678
b := &batch{
6779
ctx: ctx,
6880
query: query,
6981
conn: c,
7082
block: block,
7183
released: false,
72-
connRelease: release,
73-
connAcquire: acquire,
84+
connRelease: connRelease,
85+
connAcquire: connAcquire,
7486
onProcess: onProcess,
7587
closeOnFlush: opts.CloseOnFlush,
7688
}

0 commit comments

Comments
 (0)