-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathclickhouse.go
More file actions
188 lines (156 loc) · 5.27 KB
/
clickhouse.go
File metadata and controls
188 lines (156 loc) · 5.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
package database
import (
"context"
"crypto/tls"
"fmt"
"net/url"
"strings"
"time"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/ethpandaops/cbt-api/internal/config"
"github.com/sirupsen/logrus"
)
// URL scheme prefixes (including the "://" separator) recognised at the start
// of a ClickHouse DSN.
const (
	// SchemeClickHouse prefixes DSNs that use the native ClickHouse protocol.
	SchemeClickHouse = "clickhouse://"
	// SchemeHTTPS prefixes DSNs that use the HTTP interface over TLS.
	SchemeHTTPS = "https://"
	// SchemeHTTP prefixes DSNs that use the plain HTTP interface.
	SchemeHTTP = "http://"
)

// Port suffixes (including the leading colon) used to auto-detect the native
// protocol when a DSN carries no scheme.
const (
	// PortClickHouseNative is the native-protocol port suffix.
	PortClickHouseNative = ":9000"
	// PortClickHouseNativeTLS is the native-protocol port suffix for TLS connections.
	PortClickHouseNativeTLS = ":9440"
)
// DatabaseClient defines the interface for database operations.
// This interface allows for instrumentation wrappers (e.g., tracing) without modifying generated code.
// *Client in this file is the driver-backed implementation.
type DatabaseClient interface {
// Query executes a SQL query and returns the resulting rows.
Query(ctx context.Context, query string, args ...any) (driver.Rows, error)
// QueryRow executes a query that is expected to return at most one row.
QueryRow(ctx context.Context, query string, args ...any) driver.Row
// Select executes a query and scans the results into dest.
Select(ctx context.Context, dest any, query string, args ...any) error
// Exec executes a query without returning any rows.
Exec(ctx context.Context, query string, args ...any) error
// Close closes the database connection.
Close() error
}
// Client wraps the official ClickHouse Go driver (native interface).
// Its methods delegate directly to the underlying driver connection.
type Client struct {
// conn is the driver connection every Client method delegates to.
conn driver.Conn
// log is the logger stored by NewClient, tagged with module=clickhouse.
log logrus.FieldLogger
// config is the configuration that NewClient was called with.
config *config.ClickHouseConfig
}
// Compile-time check that *Client satisfies the DatabaseClient interface:
// the build fails on this line if a method is missing or its signature drifts.
var _ DatabaseClient = (*Client)(nil)
// NewClient creates a new ClickHouse client using the official Go driver.
//
// It parses cfg.DSN, builds the driver options from cfg and the parsed DSN,
// opens a connection and verifies it with a ping bounded to 10 seconds. If the
// ping fails, the connection is closed again before the error is returned.
//
// An error (rather than a nil-pointer panic) is returned when cfg or logger is
// nil, and an empty DSN is rejected up front instead of surfacing later as an
// opaque connection failure.
func NewClient(cfg *config.ClickHouseConfig, logger logrus.FieldLogger) (*Client, error) {
	// Upper bound for the initial connectivity check.
	const pingTimeout = 10 * time.Second

	if cfg == nil {
		return nil, fmt.Errorf("clickhouse config is nil")
	}

	if logger == nil {
		return nil, fmt.Errorf("clickhouse logger is nil")
	}

	if strings.TrimSpace(cfg.DSN) == "" {
		return nil, fmt.Errorf("clickhouse DSN is empty")
	}

	log := logger.WithFields(logrus.Fields{
		"module": "clickhouse",
	})

	log.Debug("Initialising ClickHouse client")

	// Parse and process DSN
	parsedDSN, err := parseDSN(cfg.DSN)
	if err != nil {
		return nil, fmt.Errorf("failed to parse DSN: %w", err)
	}

	// Create ClickHouse options
	options := createClickHouseOptions(cfg, parsedDSN)

	// Open connection using native driver
	conn, err := clickhouse.Open(options)
	if err != nil {
		return nil, fmt.Errorf("failed to create ClickHouse connection: %w", err)
	}

	// Test connection
	ctx, cancel := context.WithTimeout(context.Background(), pingTimeout)
	defer cancel()

	if err := conn.Ping(ctx); err != nil {
		// Best-effort cleanup: the ping failure is the error worth reporting,
		// so a secondary Close error is deliberately dropped.
		_ = conn.Close()

		return nil, fmt.Errorf("failed to ping ClickHouse: %w", err)
	}

	// Only the host is logged; parsedDSN may carry credentials in its user-info.
	log.WithFields(logrus.Fields{
		"database": cfg.Database,
		"host":     parsedDSN.Host,
	}).Info("ClickHouse client initialised")

	return &Client{
		conn:   conn,
		log:    log,
		config: cfg,
	}, nil
}
// Query runs query with args on the underlying driver connection and returns
// the driver's rows together with any error it reported.
func (c *Client) Query(ctx context.Context, query string, args ...any) (driver.Rows, error) {
	rows, err := c.conn.Query(ctx, query, args...)

	return rows, err
}
// QueryRow runs a query that is expected to yield at most one row and returns
// the driver's row handle unchanged.
func (c *Client) QueryRow(ctx context.Context, query string, args ...any) driver.Row {
	row := c.conn.QueryRow(ctx, query, args...)

	return row
}
// Select runs query with args and lets the driver scan the result set into
// dest, returning whatever error the driver reports.
func (c *Client) Select(ctx context.Context, dest any, query string, args ...any) error {
	if err := c.conn.Select(ctx, dest, query, args...); err != nil {
		return err
	}

	return nil
}
// Exec runs a statement that produces no rows and returns whatever error the
// driver reports.
func (c *Client) Exec(ctx context.Context, query string, args ...any) error {
	if err := c.conn.Exec(ctx, query, args...); err != nil {
		return err
	}

	return nil
}
// Close logs the shutdown and then closes the underlying driver connection,
// returning the driver's close error, if any.
func (c *Client) Close() error {
	c.log.Info("Closing ClickHouse connection")

	if err := c.conn.Close(); err != nil {
		return err
	}

	return nil
}
// parseDSN normalises a ClickHouse DSN into a URL with an explicit scheme.
//
// A DSN that already starts with clickhouse://, https:// or http:// is parsed
// unchanged. A scheme-less DSN (for example "user:pass@host:9000") is given a
// scheme based on the port of its host component: the native ports 9000 and
// 9440 select the clickhouse scheme, anything else (including no port at all)
// selects https.
//
// The port is read from the parsed host rather than searched for in the raw
// string, so credentials, paths, query strings or longer port numbers that
// merely contain ":9000"/":9440" (e.g. "user:9000pw@host:8443" or
// "host:90001") do not select the native protocol by accident.
//
// It returns an error if the DSN is not a valid URL once a scheme is applied.
func parseDSN(dsn string) (*url.URL, error) {
	hasScheme := strings.HasPrefix(dsn, SchemeClickHouse) ||
		strings.HasPrefix(dsn, SchemeHTTPS) ||
		strings.HasPrefix(dsn, SchemeHTTP)

	candidate := dsn
	if !hasScheme {
		// Parse as HTTPS (the default) first; the scheme is switched to the
		// native one below if the host's port calls for it.
		candidate = SchemeHTTPS + dsn
	}

	parsedURL, err := url.Parse(candidate)
	if err != nil {
		return nil, fmt.Errorf("failed to parse DSN: %w", err)
	}

	if !hasScheme {
		// url.Parse treats every scheme alike, so swapping Scheme afterwards is
		// equivalent to having parsed the DSN with the clickhouse:// prefix.
		switch ":" + parsedURL.Port() {
		case PortClickHouseNative, PortClickHouseNativeTLS:
			parsedURL.Scheme = strings.TrimSuffix(SchemeClickHouse, "://")
		}
	}

	return parsedURL, nil
}
// createClickHouseOptions assembles the driver options from the service
// configuration and the already-parsed DSN.
//
// The address and credentials come from parsedURL; the database name,
// max_execution_time setting, timeouts and TLS verification flag come from cfg.
// A scheme starting with "clickhouse" selects the native protocol, anything
// else uses HTTP. TLS is enabled for https schemes and for port 9440.
func createClickHouseOptions(cfg *config.ClickHouseConfig, parsedURL *url.URL) *clickhouse.Options {
	scheme := parsedURL.Scheme

	// Credentials: user-info from the DSN, database name from the config.
	credentials := clickhouse.Auth{
		Database: cfg.Database,
		Username: parsedURL.User.Username(),
	}

	if secret, present := parsedURL.User.Password(); present {
		credentials.Password = secret
	}

	// Wire protocol: native for clickhouse* schemes, HTTP otherwise.
	wire := clickhouse.Native
	if !strings.HasPrefix(scheme, "clickhouse") {
		wire = clickhouse.HTTP
	}

	// TLS is decided up front so the options can be built in a single literal;
	// a nil config leaves TLS disabled.
	var tlsConfig *tls.Config

	if strings.HasPrefix(scheme, "https") || parsedURL.Port() == "9440" {
		tlsConfig = &tls.Config{
			InsecureSkipVerify: cfg.InsecureSkipVerify, //nolint:gosec // config for dev environments
		}
	}

	return &clickhouse.Options{
		Addr:     []string{parsedURL.Host},
		Auth:     credentials,
		Protocol: wire,
		TLS:      tlsConfig,
		Settings: clickhouse.Settings{
			"max_execution_time": cfg.MaxExecutionTime,
		},
		Compression: &clickhouse.Compression{
			Method: clickhouse.CompressionLZ4,
		},
		DialTimeout: cfg.DialTimeout,
		ReadTimeout: cfg.ReadTimeout,
	}
}