11package clickhouse
22
33import (
4+ "context"
45 "database/sql"
56 "fmt"
67 "log/slog"
@@ -56,6 +57,8 @@ func (c *Client) BufferLen() int {
5657// BufferWrite writes a list of rows from the buffer to the configured
5758// ClickHouse instance.
5859func (c * Client ) BufferWrite () error {
60+ ctx := context .Background ()
61+
5962 c .bufferMutex .Lock ()
6063 defer c .bufferMutex .Unlock ()
6164
@@ -72,22 +75,22 @@ func (c *Client) BufferWrite() error {
7275 // #nosec G201
7376 sql := fmt .Sprintf ("INSERT INTO %s.logs (timestamp, cluster, namespace, app, pod_name, container_name, host, fields_string, fields_number, log) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) %s" , c .database , settings )
7477
75- tx , err := c .client .Begin ( )
78+ tx , err := c .client .BeginTx ( ctx , nil )
7679 if err != nil {
7780 slog .Error ("Begin transaction failure" , slog .Any ("error" , err ))
7881 return err
7982 }
8083
8184 defer tx .Rollback ()
8285
83- stmt , err := tx .Prepare ( sql )
86+ stmt , err := tx .PrepareContext ( ctx , sql )
8487 if err != nil {
8588 slog .Error ("Prepare statement failure" , slog .Any ("error" , err ))
8689 return err
8790 }
8891
8992 for _ , l := range c .buffer {
90- _ , err = stmt .Exec ( l .Timestamp , l .Cluster , l .Namespace , l .App , l .Pod , l .Container , l .Host , l .FieldsString , l .FieldsNumber , l .Log )
93+ _ , err = stmt .ExecContext ( ctx , l .Timestamp , l .Cluster , l .Namespace , l .App , l .Pod , l .Container , l .Host , l .FieldsString , l .FieldsNumber , l .Log )
9194
9295 if err != nil {
9396 slog .Error ("Statement exec failure" , slog .Any ("error" , err ))
@@ -135,7 +138,7 @@ func NewClient(address, username, password, database, dialTimeout, connMaxLifeti
135138 conn .SetMaxOpenConns (maxOpenConns )
136139 conn .SetConnMaxLifetime (parsedConnMaxLifetime )
137140
138- if err := conn .Ping ( ); err != nil {
141+ if err := conn .PingContext ( context . Background () ); err != nil {
139142 if exception , ok := err .(* clickhouse.Exception ); ok {
140143 slog .Error (fmt .Sprintf ("[%d] %s \n %s\n " , exception .Code , exception .Message , exception .StackTrace ))
141144 } else {
0 commit comments