Merged

22 commits
7031f8a
replace GetAllReplicasActiveQuery to exclude read only replicas compl…
BentsiLeviav Dec 15, 2025
ef2b761
Merge branch 'main' into fix-hydra-checking
BentsiLeviav Dec 24, 2025
82827f5
add caller to logs
BentsiLeviav Dec 24, 2025
91fb1ee
add logs to all operations
BentsiLeviav Dec 24, 2025
8d038cb
add stats and logs to client execution
BentsiLeviav Dec 24, 2025
a39aaad
use the new alter/mutation_sync=3 and move WaitAllNodesAvailable to b…
BentsiLeviav Dec 24, 2025
73af1c1
update TestGetAllReplicasActiveQuery
BentsiLeviav Dec 24, 2025
69ed25d
add context information to logs
BentsiLeviav Dec 24, 2025
93b40eb
Update destination/db/clickhouse.go
BentsiLeviav Dec 24, 2025
3b82297
Update destination/db/clickhouse.go
BentsiLeviav Dec 24, 2025
aa1d776
Update destination/db/clickhouse.go
BentsiLeviav Dec 24, 2025
f5d435f
remove warning prefix
BentsiLeviav Dec 28, 2025
e63d88e
update info logs to notice (debug)
BentsiLeviav Dec 28, 2025
c5afa02
Merge remote-tracking branch 'origin/fix-hydra-checking' into fix-hyd…
BentsiLeviav Dec 28, 2025
039df3d
add query id to logs, statement and query execution
BentsiLeviav Dec 28, 2025
75e8473
Update destination/db/clickhouse.go
BentsiLeviav Jan 6, 2026
bdefc6f
Update destination/db/clickhouse.go
BentsiLeviav Jan 6, 2026
2b323af
Update destination/db/clickhouse.go
BentsiLeviav Jan 6, 2026
2fa4a42
Update destination/db/clickhouse.go
BentsiLeviav Jan 6, 2026
b8cfae9
Fix average skew
BentsiLeviav Jan 6, 2026
af7467d
cr fixes
BentsiLeviav Jan 6, 2026
6a537ae
account only for successful queries
BentsiLeviav Jan 6, 2026
8 changes: 6 additions & 2 deletions destination/common/log/logger.go
@@ -22,10 +22,14 @@ func Init() error {
	default:
		return fmt.Errorf("invalid log level: %s, allowed values: notice, info, warning, severe", *flags.LogLevel)
	}

+	zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMicro
	if *flags.LogPretty {
-		log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr})
+		log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}).With().Caller().Logger()
+	} else {
+		log.Logger = log.With().Caller().Logger()
	}
-	zerolog.TimeFieldFormat = zerolog.TimeFormatUnixMicro

	return nil
}

146 changes: 122 additions & 24 deletions destination/db/clickhouse.go
@@ -22,20 +22,61 @@ import (
	pb "fivetran.com/fivetran_sdk/proto"
	"github.com/ClickHouse/clickhouse-go/v2"
	"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
+	"github.com/google/uuid"
	"golang.org/x/sync/errgroup"
)

+const (
+	maxQueryLengthForLogging = 200
+)
+
type ClickHouseConnection struct {
	driver.Conn
-	username string
-	isLocal  bool
+	username      string
+	isLocal       bool
+	connectTime   time.Time
+	lastUsed      time.Time
+	queryCount    int64
+	errorCount    int64
+	totalDuration time.Duration
}

+func (conn *ClickHouseConnection) logConnectionStats() {
+	avgDuration := time.Duration(0)
+	if conn.queryCount > 0 {
+		avgDuration = conn.totalDuration / time.Duration(conn.queryCount)
+	}
+
+	log.Info(fmt.Sprintf("Connection stats - Queries: %d, Errors: %d, Avg Duration: %v",
+		conn.queryCount,
+		conn.errorCount,
+		avgDuration))
+}
+
+func (conn *ClickHouseConnection) recordQuery(duration time.Duration, success bool) {
Contributor:
In totalDuration we also account for queries that encountered an error, so when examining conn.totalDuration / time.Duration(conn.queryCount+1), the average can be skewed by queries that failed. I think we should separate the two; a combined average is problematic. Take an example: q1: 100, q2: 101, q3: 105, q4: 2 (with an error) gives AVG = (100+101+105+2)/4 = 77, which is very misleading. So I would go for percentiles or a histogram, one for successful and one for failed queries.

Contributor Author:
I think that level of sophistication is beyond the scope of this PR. The current stats are meant to be basic connection health indicators rather than comprehensive performance metrics. Implementing proper percentile tracking or histograms would require:

  • Additional memory overhead for storing duration buckets/samples
  • More complex aggregation logic
  • Decisions about retention windows and bucket sizes
  • Potentially a metrics library to handle this properly

This feels like it should be its own feature request with proper design discussion. Would you mind opening a ticket for enhanced connection metrics?

Contributor:
Can you open an issue for the correct calculations?
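For illustration, a minimal sketch of the separation suggested above — per-outcome counters plus a coarse duration histogram — could look like the following; the type, field names, and bucket boundaries are hypothetical and not part of this PR:

package stats

import "time"

// queryStats keeps successful and failed queries apart, so a fast failure
// cannot drag down the average duration of successful queries.
type queryStats struct {
	successCount    int64
	errorCount      int64
	successDuration time.Duration
	errorDuration   time.Duration
	buckets         [4]int64 // successful-query durations: <10ms, <100ms, <1s, >=1s
}

func (s *queryStats) record(d time.Duration, success bool) {
	if !success {
		s.errorCount++
		s.errorDuration += d
		return
	}
	s.successCount++
	s.successDuration += d
	switch {
	case d < 10*time.Millisecond:
		s.buckets[0]++
	case d < 100*time.Millisecond:
		s.buckets[1]++
	case d < time.Second:
		s.buckets[2]++
	default:
		s.buckets[3]++
	}
}

On the example above, the success-only average becomes (100+101+105)/3 = 102 instead of 77, and the failure is still visible in errorCount and errorDuration.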

+	if !success {
+		conn.errorCount++
+	} else {
+		conn.queryCount++
+		conn.totalDuration += duration
+	}
+	// Log stats every 100 queries
+	if conn.queryCount%100 == 0 {
+		conn.logConnectionStats()
+	}
+}

func GetClickHouseConnection(ctx context.Context, configuration map[string]string) (*ClickHouseConnection, error) {
connConfig, err := config.Parse(configuration)
if err != nil {
return nil, fmt.Errorf("error while parsing configuration: %w", err)
}

log.Info(fmt.Sprintf("Initializing ClickHouse connection to %s:%s",
configuration[config.HostKey], configuration[config.PortKey]))
Contributor:
Is it possible to add the username we connect with?

Contributor Author:
What for?

Contributor:
To understand which user was configured, in case of permissions problems or mistakes.

Contributor Author:
Before customers set up a ClickHouse connection, two tests are run by the connector:

  • ConnectionTest to check connectivity with the provided user
  • GrantsTest to check proper permissions are set

It is important to mention that all these logs are not visible to the end users and can be retrieved only by requesting them from the Fivetran team.
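If the username were included, the log line might read as in this hypothetical variant (connConfig.Username is already in scope at this point, as the struct literal at the end of the function shows):

// Hypothetical variant, not part of the merged change:
log.Info(fmt.Sprintf("Initializing ClickHouse connection to %s:%s as user %s",
	configuration[config.HostKey], configuration[config.PortKey], connConfig.Username))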


settings := clickhouse.Settings{
// support ISO DateTime formats from CSV
// https://clickhouse.com/docs/en/operations/settings/formats#date_time_input_format
@@ -45,9 +86,11 @@ func GetClickHouseConnection(ctx context.Context, configuration map[string]string) (*ClickHouseConnection, error) {
if !connConfig.Local {
tlsConfig = &tls.Config{InsecureSkipVerify: false}
// https://clickhouse.com/docs/en/operations/settings/settings#alter-sync
settings["alter_sync"] = 2
// https://github.com/ClickHouse/clickhouse-private/pull/12617
settings["alter_sync"] = 3
// https://clickhouse.com/docs/en/operations/settings/settings#mutations_sync
settings["mutations_sync"] = 2
// https://github.com/ClickHouse/clickhouse-private/pull/12617
settings["mutations_sync"] = 3
// https://clickhouse.com/docs/en/operations/settings/settings#select_sequential_consistency
settings["select_sequential_consistency"] = 1
}
@@ -88,6 +131,7 @@ func GetClickHouseConnection(ctx context.Context, configuration map[string]string) (*ClickHouseConnection, error) {
log.Error(err)
return nil, err
}
log.Info("ClickHouse connection established successfully")
return &ClickHouseConnection{Conn: conn, username: connConfig.Username, isLocal: connConfig.Local}, nil
}

@@ -97,14 +141,35 @@ func (conn *ClickHouseConnection) ExecStatement(
op connectionOpType,
benchmark bool,
) error {
+	// Generate a unique query ID
+	queryID := uuid.New().String()
+
+	ctx = clickhouse.Context(ctx, clickhouse.WithQueryID(queryID))
+
+	// Add it as a comment for visibility in the query text
+	statementWithComment := fmt.Sprintf("-- query_id: %s, operation: %s\n%s", queryID, op, statement)
+
+	startTime := time.Now()
+	logQuery := statement
+	if len(logQuery) > maxQueryLengthForLogging {
+		logQuery = statement[:maxQueryLengthForLogging] + "..."
+	}
+
+	log.Info(fmt.Sprintf("Executing %s [query_id=%s]: %s", op, queryID, logQuery))
	err := retry.OnNetError(func() error {
-		return conn.Exec(ctx, statement)
+		return conn.Exec(ctx, statementWithComment)
	}, ctx, string(op), benchmark)
+
+	// Calculate the duration once for consistent reporting
+	duration := time.Since(startTime)
+	conn.recordQuery(duration, err == nil)
+
	if err != nil {
-		err = fmt.Errorf("error while executing %s: %w", statement, err)
+		err = fmt.Errorf("error while executing %s [query_id=%s]: %w", statement, queryID, err)
		log.Error(err)
		return err
	}
+	log.Info(fmt.Sprintf("Successfully executed %s [query_id=%s] in %v", op, queryID, duration))
	return nil
}

@@ -114,14 +179,35 @@ func (conn *ClickHouseConnection) ExecQuery(
op connectionOpType,
benchmark bool,
) (driver.Rows, error) {
+	// Generate a unique query ID
+	queryID := uuid.New().String()
+
+	ctx = clickhouse.Context(ctx, clickhouse.WithQueryID(queryID))
+
+	// Add the query ID as a SQL comment at the beginning of the query
+	queryWithID := fmt.Sprintf("-- query_id: %s\n%s", queryID, query)
+
+	startTime := time.Now()
+	logQuery := query
+	if len(logQuery) > maxQueryLengthForLogging {
+		logQuery = query[:maxQueryLengthForLogging] + "..."
+	}
+
+	log.Info(fmt.Sprintf("Executing query %s [query_id=%s]: %s", op, queryID, logQuery))
	rows, err := retry.OnNetErrorWithData(func() (driver.Rows, error) {
-		return conn.Query(ctx, query)
+		return conn.Query(ctx, queryWithID)
	}, ctx, string(op), benchmark)
+
+	// Calculate the duration once for consistent reporting
+	duration := time.Since(startTime)
+	conn.recordQuery(duration, err == nil)
+
	if err != nil {
-		err = fmt.Errorf("error while executing %s: %w", query, err)
+		err = fmt.Errorf("error while executing %s [query_id=%s]: %w", query, queryID, err)
		log.Error(err)
		return nil, err
	}
+	log.Info(fmt.Sprintf("Query %s [query_id=%s] completed in %v", op, queryID, duration))
	return rows, nil
}
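Because WithQueryID propagates the generated UUID to the server, a [query_id=...] value from the connector logs can be correlated with ClickHouse's own records. A hypothetical lookup sketch — system.query_log and its columns are standard ClickHouse, the surrounding variable names are illustrative:

// queryID is the UUID copied from a connector log line.
rows, err := conn.Query(ctx,
	"SELECT type, event_time, query_duration_ms, exception "+
		"FROM system.query_log WHERE query_id = ? ORDER BY event_time DESC",
	queryID)

The leading "-- query_id: ..." comment additionally makes the ID visible in the query text itself, for example in system.processes.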

@@ -322,9 +408,10 @@ func (conn *ClickHouseConnection) AlterTable(
if err != nil {
return false, err
}
+	// even though we set alter/mutations_sync=3, we check that all nodes are available and log a warning if they are not
	err = conn.WaitAllNodesAvailable(ctx, schemaName, tableName)
	if err != nil {
-		return false, err
+		log.Warn(fmt.Sprintf("It seems that not all nodes are available: %v. We strongly recommend checking cluster health and availability to avoid inconsistency between replicas", err))
Contributor:
Previously, we returned with an error. Is this still the case, since we removed return false, err?

Contributor Author:
This was described in the PR description: we moved GetAllReplicasActiveQuery to warn on bad replica status, and not fail.
}
if hasChangedPK {
unixMilli := time.Now().UnixMilli()
@@ -419,9 +506,10 @@ func (conn *ClickHouseConnection) TruncateTable(
} else {
op = softTruncateTable
}
+	// even though we set alter/mutations_sync=3, we check that all nodes are available and log a warning if they are not
	err = conn.WaitAllNodesAvailable(ctx, schemaName, tableName)
	if err != nil {
-		return err
+		log.Warn(fmt.Sprintf("It seems that not all nodes are available: %v. We strongly recommend checking cluster health and availability to avoid inconsistency between replicas", err))
}
err = conn.ExecStatement(ctx, statement, op, true)
if err != nil {
@@ -459,7 +547,11 @@ func (conn *ClickHouseConnection) InsertBatch(
return retry.OnNetError(func() error {
batch, err := conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s", qualifiedTableName))
if err != nil {
err = fmt.Errorf("error while preparing batch for %s: %w", qualifiedTableName, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
err = fmt.Errorf("Error while preparing batch for %s: %w (context state: %v)", qualifiedTableName, err, ctx.Err())
} else {
err = fmt.Errorf("Error while preparing batch for %s: %w", qualifiedTableName, err)
}
log.Error(err)
return err
}
Expand All @@ -469,14 +561,22 @@ func (conn *ClickHouseConnection) InsertBatch(
}
err = batch.Append(row...)
if err != nil {
err = fmt.Errorf("error appending row to a batch for %s: %w", qualifiedTableName, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
err = fmt.Errorf("error appending row to a batch for %s: %w (context state: %v)", qualifiedTableName, err, ctx.Err())
} else {
err = fmt.Errorf("error appending row to a batch for %s: %w", qualifiedTableName, err)
}
log.Error(err)
return err
}
}
err = batch.Send()
if err != nil {
err = fmt.Errorf("error while sending batch for %s: %w", qualifiedTableName, err)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
err = fmt.Errorf("error while sending batch for %s: %w (context state: %v)", qualifiedTableName, err, ctx.Err())
} else {
err = fmt.Errorf("error while sending batch for %s: %w", qualifiedTableName, err)
}
log.Error(err)
return err
}
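As an aside, the three identical errors.Is branches in this function could be collapsed into a single helper; a possible deduplication sketch (the helper name is hypothetical, not part of the PR):

// wrapWithCtxState annotates err with msg and, when the failure stems from
// cancellation or a deadline, with the current context state.
func wrapWithCtxState(ctx context.Context, err error, msg string) error {
	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
		return fmt.Errorf("%s: %w (context state: %v)", msg, err, ctx.Err())
	}
	return fmt.Errorf("%s: %w", msg, err)
}

Each branch would then reduce to err = wrapWithCtxState(ctx, err, fmt.Sprintf("error while sending batch for %s", qualifiedTableName)).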
@@ -775,10 +875,10 @@ func (conn *ClickHouseConnection) UpdateForEarliestStartHistory(
return err
}

-	// Wait for all nodes to be available before running mutations
+	// even though we set alter/mutations_sync=3, we check that all nodes are available and log a warning if they are not
	err = conn.WaitAllNodesAvailable(ctx, schemaName, table.Name)
	if err != nil {
-		return err
+		log.Warn(fmt.Sprintf("It seems that not all nodes are available: %v. We strongly recommend checking cluster health and availability to avoid inconsistency between replicas", err))
}

groups, err := GroupSlices(uint(len(csv)), *flags.WriteBatchSize, 1)
@@ -831,13 +931,12 @@ func findColumnInCSV(csvColumns *types.CSVColumns, columnName string) (uint, pb.
}

// WaitAllNodesAvailable
-// using the query generated by sql.GetAllReplicasActiveQuery, and retrying it until all the replicas are active,
-// we can make sure that the ALTER TABLE statements are executed at a right time, when all the nodes are available,
-// so we don't get an exception due to alter_sync=2 and mutations_sync=2 settings usage.
-//
-// A sample exception that could occur without this check:
-// code: 341, message: Mutation is not finished because some replicas are inactive right now
+// Before alter/mutations_sync=3 were introduced, we used to check and wait for all replicas to be active,
+// using the query generated by sql.GetAllReplicasActiveQuery and retrying it until all the replicas were active,
+// in order to avoid errors like:
+//
+// code: 341, message: Mutation is not finished because some replicas are inactive right now
+//
+// We keep this check for monitoring reasons.

func (conn *ClickHouseConnection) WaitAllNodesAvailable(
ctx context.Context,
schemaName string,
@@ -871,7 +970,7 @@ func (conn *ClickHouseConnection) WaitAllNodesAvailable(
}
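For context, the error referenced in these comments carries ClickHouse server error code 341. A minimal sketch, assuming clickhouse-go v2's Exception type, of how a check like isIncompleteMutationErr (used below) might detect it — the connector's actual implementation may differ:

func isIncompleteMutationErr(err error) bool {
	var ex *clickhouse.Exception
	// 341: "Mutation is not finished because some replicas are inactive right now"
	return errors.As(err, &ex) && ex.Code == 341
}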

// WaitAllMutationsCompleted
-// if a call to WaitAllNodesAvailable prior to ALTER TABLE statement execution was not enough,
+// if mutations_sync=3 and alter_sync=3 were not enough,
// and one of the nodes went down exactly at the time of the ALTER TABLE statement execution,
// we will still get the error code 341, which indicates that the mutations will still be completed asynchronously;
// wait until all the nodes are available again, and all mutations are completed before sending the response.
@@ -885,11 +984,10 @@ func (conn *ClickHouseConnection) WaitAllMutationsCompleted(
if conn.isLocal || !isIncompleteMutationErr(mutationError) {
return mutationError
}

-	// a mutation won't be finished on all replicas until all of them are back
+	// even though we set alter/mutations_sync=3, we check that all nodes are available and log a warning if they are not
	err := conn.WaitAllNodesAvailable(ctx, schemaName, tableName)
	if err != nil {
-		return fmt.Errorf("error while waiting for all nodes to be available: %w; initial cause: %w", err, mutationError)
+		log.Warn(fmt.Sprintf("It seems that not all nodes are available: %v. We strongly recommend checking cluster health and availability to avoid inconsistency between replicas", err))
}

query, err := sql.GetAllMutationsCompletedQuery(schemaName, tableName)
16 changes: 15 additions & 1 deletion destination/db/sql/sql.go
@@ -593,6 +593,7 @@ func GetUpdateHistoryActiveStatement(

// GetAllReplicasActiveQuery
// generates a query to check if there are no inactive replicas.
+// Excludes Hydra Read Only instances, which have disable_insertion_and_mutation = '1'
func GetAllReplicasActiveQuery(
schemaName string,
tableName string,
@@ -604,7 +605,20 @@ func GetAllReplicasActiveQuery(
return "", fmt.Errorf("schema name for table %s is empty", tableName)
}
return fmt.Sprintf(
"SELECT toBool(mapExists((k, v) -> (v = 0), replica_is_active) = 0) AS all_replicas_active FROM system.replicas WHERE database = '%s' AND table = '%s' AND is_readonly != 1 LIMIT 1",
`SELECT toBool(mapExists((k, v) -> (v = 0 AND k IN (
SELECT replica_host FROM (
SELECT hostName() as replica_host, value
FROM clusterAllReplicas(default, system, server_settings)
WHERE name = 'disable_insertion_and_mutation' AND value = '0'
UNION ALL
SELECT hostName() as replica_host, value
FROM clusterAllReplicas(all_groups.default, system, server_settings)
WHERE name = 'disable_insertion_and_mutation' AND value = '0'
)
)), replica_is_active) = 0) AS all_replicas_active
FROM system.replicas
WHERE database = '%s' AND table = '%s'
LIMIT 1`,
schemaName, tableName,
), nil
}
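A hypothetical caller-side sketch of consuming the generated check, mirroring the retry loop described for WaitAllNodesAvailable (control flow and variable names are illustrative only):

query, err := sql.GetAllReplicasActiveQuery(schemaName, tableName)
if err != nil {
	return err
}
var allReplicasActive bool
if err := conn.QueryRow(ctx, query).Scan(&allReplicasActive); err != nil {
	return err
}
if !allReplicasActive {
	log.Warn("some replicas are still inactive; retrying the check")
}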
17 changes: 15 additions & 2 deletions destination/db/sql/sql_test.go
@@ -404,8 +404,21 @@ func TestGetHardDeleteStatement(t *testing.T) {
func TestGetAllReplicasActiveQuery(t *testing.T) {
query, err := GetAllReplicasActiveQuery("foo", "bar")
assert.NoError(t, err)
assert.Equal(t, "SELECT toBool(mapExists((k, v) -> (v = 0), replica_is_active) = 0) AS all_replicas_active FROM system.replicas WHERE database = 'foo' AND table = 'bar' AND is_readonly != 1 LIMIT 1", query)

expected := `SELECT toBool(mapExists((k, v) -> (v = 0 AND k IN (
SELECT replica_host FROM (
SELECT hostName() as replica_host, value
FROM clusterAllReplicas(default, system, server_settings)
WHERE name = 'disable_insertion_and_mutation' AND value = '0'
UNION ALL
SELECT hostName() as replica_host, value
FROM clusterAllReplicas(all_groups.default, system, server_settings)
WHERE name = 'disable_insertion_and_mutation' AND value = '0'
)
)), replica_is_active) = 0) AS all_replicas_active
FROM system.replicas
WHERE database = 'foo' AND table = 'bar'
LIMIT 1`
assert.Equal(t, expected, query)
_, err = GetAllReplicasActiveQuery("", "bar")
assert.ErrorContains(t, err, "schema name for table bar is empty")
