-
Notifications
You must be signed in to change notification settings - Fork 2
Address hydra issue + extend logging #54
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR enhances the ClickHouse destination connector's reliability and observability by addressing replica synchronization issues and adding comprehensive logging. The main purpose is to prevent "Mutation is not finished because some replicas are inactive right now" errors by using mutation_sync=3 and alter_sync=3 settings, which ensure operations execute only on available replicas. Additionally, the PR adds detailed logging throughout all operations and implements connection statistics tracking.
Key changes:
- Changed synchronization settings from 2 to 3 for mutations and alter operations to avoid inactive replica errors
- Added extensive logging at operation entry/exit points and error conditions, with context cancellation tracking
- Implemented connection statistics tracking that logs metrics every 100 queries
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| destination/service/server.go | Added comprehensive logging to all RPC handlers including operation start/completion, error conditions, and file processing progress |
| destination/db/clickhouse.go | Changed alter_sync and mutations_sync to 3, demoted replica availability failures from errors to warnings, added connection stats tracking, and enhanced error messages with context state |
| destination/db/sql/sql.go | Updated GetAllReplicasActiveQuery to exclude Hydra read-only instances by checking disable_insertion_and_mutation setting |
| destination/db/sql/sql_test.go | Updated test to match new query structure for replica availability checking |
| destination/common/log/logger.go | Added caller information to log output for better traceability |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
Co-authored-by: Copilot <[email protected]>
destination/service/server.go
Outdated
| } | ||
|
|
||
| func (s *Server) WriteHistoryBatch(ctx context.Context, in *pb.WriteHistoryBatchRequest) (*pb.WriteBatchResponse, error) { | ||
| log.Info(fmt.Sprintf("[WriteHistoryBatch] Starting for %s.%s with earliest_start_files=%d, replace_files=%d, update_files=%d, delete_files=%d", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- does Fivetran provide any recommendations about log levels? this one might be more appropriate for
debug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
does Fivetran provide any recommendations about log levels? this one might be more appropriate for debug
Yes, can be found here https://fivetran.com/docs/connector-sdk/technical-reference/connector-sdk-code/connector-sdk-logs
I updated all the Fivetran-related functions, like
WriteHistoryBatchWriteBatchprocessReplaceFilesprocessEarliestStartFilesForHistoryBatch
to be aNoticelevel (debug mode) and keep all ClickHouse-related operations likeDescribeTableCreateTableAlterTableTruncate
to be Info level
|
|
||
| func (s *Server) Test(ctx context.Context, in *pb.TestRequest) (*pb.TestResponse, error) { | ||
|
|
||
| log.Info(fmt.Sprintf("[Test_%s] Starting test", in.Name)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we log a stop reason anywhere?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes:
- In case of error, we log the error with indication of which test was run
- In case of success, we log this line before returning the success response
log.Info(fmt.Sprintf("[Test_%s] Test completed successfully", in.Name))…ra-and-extend-logging # Conflicts: # destination/db/clickhouse.go
mzitnik
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see a lot of error messages that are not capitalized. Is it some convention?
|
|
||
| func (conn *ClickHouseConnection) recordQuery(duration time.Duration, success bool) { | ||
| conn.queryCount++ | ||
| conn.totalDuration += duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In totalDuration, we also account for queries that encountered an error when examining conn.totalDuration/time.Duration(conn.queryCount+1))), the average can be skewed by queries that got an error. I think we should separate it totalDuration/totalDurationError average can be problematic
lets take an example q1: 100 q2: 101 q3: 105 q4: 2 (with an error) = AVG = (100+101+105+2)/4 = 77
This is very problematic
So would go for Percentiles/Histogram one for success/failure queries
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that level of sophistication is beyond the scope of this PR. The current stats are meant to be basic connection health indicators rather than comprehensive performance metrics. Implementing proper percentile tracking or histograms would require:
- Additional memory overhead for storing duration buckets/samples
- More complex aggregation logic
- Decisions about retention windows and bucket sizes
- Potentially a metrics library to handle this properly
This feels like it should be its own feature request with proper design discussion. Would you mind opening a ticket for enhanced connection metrics?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open an issue for the correct calcluations
| } | ||
|
|
||
| log.Info(fmt.Sprintf("Initializing ClickHouse connection to %s:%s", | ||
| configuration[config.HostKey], configuration[config.PortKey])) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's possible to add the username with which we connect with
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What for?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to understand what user was configured in case of some premissions probleam or mistakes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before customers set up a ClickHouse connection, 2 tests are run by the connector:
- ConnectionTest to check connectivity with the provided user
- GrantsTest to check proper permissions are set
It is important to mention that all these logs are not visible to the end users and can be retrieved only by requesting them from the Fivetran team.
destination/db/clickhouse.go
Outdated
| log.Error(err) | ||
| return err | ||
| } | ||
| log.Info(fmt.Sprintf("Successfully executed %s [query_id=%s] in %v", op, queryID, time.Since(startTime))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using time.Since on lines 160 and 154 can be problematic (if there is a context switch). Since time.Since mainly doing return Now().Sub(t) getting every method call a new Now() I would calculate the since once separately use it in 160 and 154
See for reference: https://github.com/golang/go/blob/master/src/time/time.go#L1227
destination/db/clickhouse.go
Outdated
| return conn.Query(ctx, queryWithID) | ||
| }, ctx, string(op), benchmark) | ||
|
|
||
| conn.recordQuery(time.Since(startTime), err == nil) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same here
| err = conn.WaitAllNodesAvailable(ctx, schemaName, tableName) | ||
| if err != nil { | ||
| return false, err | ||
| log.Warn(fmt.Sprintf("It seems like not all nodes are available: %v. We strongly recommend to check the cluster health and availability to avoid inconsistency between replicas", err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, we returned with an error. Is this still the case since we removed return false, err
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was described in the PR description:
we moved GetAllReplicasActiveQuery to warn on bad replica status, and not fail.
Co-authored-by: Mark Zitnik <[email protected]>
Co-authored-by: Mark Zitnik <[email protected]>
Co-authored-by: Mark Zitnik <[email protected]>
Co-authored-by: Mark Zitnik <[email protected]>
mzitnik
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still left a few places with not capitalized text for error logging
|
|
||
| func (conn *ClickHouseConnection) recordQuery(duration time.Duration, success bool) { | ||
| conn.queryCount++ | ||
| conn.totalDuration += duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you open an issue for the correct calcluations
Summary
This PR adds the following:
Use
mutation_sync=3andalter_aync=3to make sure we execute alter/mutation operations only on available replicas, to avoid gettingcode: 341, message: Mutation is not finished because some replicas are inactive right nowerror from ClickHouse. Due to that, we movedGetAllReplicasActiveQueryto warn on bad replica status, and not fail. close Support hydra architecture #36Add logs to many operations, and specifically add logs around the context being cancelled, close Extend context cancellation logging #51, and partially takes care of Extend logging for troubleshooting #50
Add stats to all client operations, which will be logged out every 100 queries.