LOGC-46: Fix distributed ClickHouse operations and StartTime handling#90
Conversation
Add insert_distributed_sync setting to ensure offset writes are visible immediately across the cluster, preventing race conditions where batches could be re-processed.
Change the startTime column from DateTime64(3) to Int64 storing milliseconds since epoch. This addresses issues with ClickHouse DateTime64 handling.
fredmnl
left a comment
I'm OK with the change, but I would have preferred to find where the loss of precision happens and fix that. (I'm not convinced it is actually where I made my comment.)
// Always outputs in UTC timezone
func (b *LogObjectBuilder) formatTimestamp(t time.Time) string {
	utc := t.UTC()
	return utc.Format("[02/Jan/2006:15:04:05 +0000]")
Doesn't the rounding happen here, where the nanosecond-precision t time.Time is forced to fit the second-precision format [02/Jan/2006:15:04:05 +0000]?
There is indeed precision loss here, but this method is used to construct the content of the S3 object that we put in the destination bucket (which should be at second precision).
It seems that the loss of precision happens in the query that reads from ClickHouse, but tbh I haven't looked at the Go driver code.
I'm okay with that 👍
// Uses composite filter to fetch only logs after LastProcessedOffset.
func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecord, error) {
	// StartTime is stored as milliseconds since epoch
	startTimeMillis := batch.LastProcessedOffset.StartTime
Was it required to change the type of StartTime to uint64? Why not convert time.Time to ms here instead of changing the type? If you don't change the type, the rest of the code can remain the same.
I tried this:
--- i/pkg/logcourier/logfetch.go
+++ w/pkg/logcourier/logfetch.go
@@ -63,8 +63,8 @@ func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecor
AND raftSessionID = ?
AND (
insertedAt > ?
- OR (insertedAt = ? AND startTime > ?)
- OR (insertedAt = ? AND startTime = ? AND req_id > ?)
+ OR (insertedAt = ? AND startTime > fromUnixTimestamp64Milli(?))
+ OR (insertedAt = ? AND startTime = fromUnixTimestamp64Milli(?) AND req_id > ?)
)
ORDER BY insertedAt ASC, startTime ASC, req_id ASC
LIMIT ?
@@ -74,8 +74,8 @@ func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecor
batch.Bucket,
batch.RaftSessionID,
batch.LastProcessedOffset.InsertedAt,
- batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime,
- batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime, batch.LastProcessedOffset.ReqID,
+ batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime.UnixMilli(),
+ batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime.UnixMilli(), batch.LastProcessedOffset.ReqID,
lf.maxLogsPerBatch,
)
but it fails with:
log-courier | {"time":"2026-01-22T20:10:30.867419073Z","level":"WARN","msg":"transient error, will retry upload","bucketName":"e2e-source-1769112621578789528","attempt":2,"error":"failed to fetch logs: error iterating log records: code: 41, message: Received from 127.0.0.1:9002. DB::Exception: Cannot parse DateTime: In scope SELECT __table1.bucketOwner AS bucketOwner, __table1.bucketName AS bucketName, __table1.startTime AS startTime, __table1.clientIP AS clientIP, __table1.requester AS requester, __table1.req_id AS req_id, __table1.operation AS operation, __table1.objectKey AS objectKey, __table1.requestURI AS requestURI, __table1.httpCode AS httpCode, __table1.errorCode AS errorCode, __table1.bytesSent AS bytesSent, __table1.objectSize AS objectSize, __table1.totalTime AS totalTime, __table1.turnAroundTime AS turnAroundTime, __table1.referer AS referer, __table1.userAgent AS userAgent, __table1.versionId AS versionId, __table1.signatureVersion AS signatureVersion, __table1.cipherSuite AS cipherSuite, __table1.authenticationType AS authenticationType, __table1.hostHeader AS hostHeader, __table1.tlsVersion AS tlsVersion, __table1.aclRequired AS aclRequired, __table1.insertedAt AS insertedAt, __table1.loggingTargetBucket AS loggingTargetBucket, __table1.loggingTargetPrefix AS loggingTargetPrefix, __table1.raftSessionID AS raftSessionID FROM logs.access_logs AS __table1 WHERE (__table1.bucketName = 'e2e-source-1769112621578789528') AND (__table1.raftSessionID = 2) AND ((__table1.insertedAt > _CAST(0, 'DateTime')) OR ((__table1.insertedAt = _CAST(0, 'DateTime')) AND (__table1.startTime > _CAST('0', 'DateTime64(3)'))) OR ((__table1.insertedAt = _CAST(0, 'DateTime')) AND (__table1.startTime = _CAST('0', 'DateTime64(3)')) AND (__table1.req_id > ''))) ORDER BY __table1.insertedAt ASC, __table1.startTime ASC, __table1.req_id ASC LIMIT _CAST(100000, 'UInt64'). Stack trace: … : While executing Remote"}
I'd also prefer not to change the datatype, but this is the only approach that worked.
I was thinking startTimeMillis := batch.LastProcessedOffset.StartTime.UnixMilli(); why change the query?
Doing the conversion at the struct level also forces you to convert back to time.Time:
log-courier/pkg/logcourier/logobject.go
Line 86 in 60b9cf1
log-courier/pkg/logcourier/logobject.go
Line 177 in 60b9cf1
log-courier/pkg/logcourier/processor.go
Line 684 in 60b9cf1
time.Time contains the milliseconds, so there is no need to use int64.
Ah, I see the confusion.
Using batch.LastProcessedOffset.StartTime.UnixMilli() will return the number of ms since the Unix epoch, which does not work with the query as it is (it expects DateTime64(3)).
time.UnixMilli() will indeed give ms precision.
Is converting to int64 a blocker for you? I can test the proposed change, and I agree that it is a simpler change.
But this change fixes the issue, and we have already merged the change in cloudserver (it will soon be merged in Federation as well).
Is it a no-go to proceed with the change proposed here?
If you don't have the time, you can merge it.
But it would be better to have only a three-line change instead of changing hundreds of lines and losing the time.Time type.
We discussed with @leif-scality and indeed we found a simpler fix. See: #95
ReqID     string
Action    string
ObjectKey string
StartTime int64 // Milliseconds since epoch
I would suggest not changing the type and calling UnixMilli when needed.
S3SecretAccessKey: logcourier.ConfigSpec.GetString("s3.secret-access-key"),
Logger: logger,
Metrics: metrics,
ClickHouseSettings: map[string]interface{}{
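For illustration, a settings map of this kind might be built as follows (insert_distributed_sync is a real ClickHouse setting; the helper function here is hypothetical). With insert_distributed_sync = 1, an INSERT into a Distributed table returns only after the data has been written to all shards, which is what makes the offset write immediately visible cluster-wide:

```go
package main

import "fmt"

// clickHouseSettings sketches the per-connection settings this PR adds.
// The value 1 enables synchronous distributed inserts in ClickHouse,
// so an INSERT into a Distributed table only returns once every shard
// has acknowledged the data.
func clickHouseSettings() map[string]interface{} {
	return map[string]interface{}{
		"insert_distributed_sync": 1,
	}
}

func main() {
	fmt.Println(clickHouseSettings()["insert_distributed_sync"]) // 1
}
```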
tcarmet
left a comment
LGTM. Small question: everything seems to be implicitly testing what you changed. Do you think it's worth adding a small new test to cover it more explicitly, with the goal of documenting the behavior rather than verifying it (since it's already verified)? Or does that not make sense?
Only asking because I didn't see a new test case and I'm wondering.
That's a valid point. Indeed, I relied on existing tests to ensure everything works as expected.
The offset currently uses DateTime64(3) for startTime, but comparisons do not work as expected. End-to-end tests showed that log-courier fetches log records before the offset (in the same second).

Change the StartTime field from time.Time / DateTime64(3) to int64 / Int64, storing milliseconds since epoch.

Also, enable synchronous distributed inserts (insert_distributed_sync) to ensure offset writes are immediately visible across the cluster.