3 changes: 3 additions & 0 deletions cmd/log-courier/main.go
@@ -48,6 +48,9 @@ func buildProcessorConfig(logger *slog.Logger, metrics *logcourier.Metrics) logc
S3SecretAccessKey: logcourier.ConfigSpec.GetString("s3.secret-access-key"),
Logger: logger,
Metrics: metrics,
+ ClickHouseSettings: map[string]interface{}{
+ 	"insert_distributed_sync": 1,
+ },
}
}

Review comment on the ClickHouseSettings block: Is this used?
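For context on the reviewer's question: whether the map is used depends on how the processor hands it to the driver, but with clickhouse-go v2 a settings map like this normally takes effect via Options.Settings when the connection is opened. A minimal sketch under that assumption (the address and wiring are invented for illustration, not taken from this repo):

```go
package main

import (
	"log"

	"github.com/ClickHouse/clickhouse-go/v2"
)

func main() {
	// Settings passed here ride along as session settings, so the server
	// applies them to every query on this connection, including INSERTs
	// that fan out through Distributed tables.
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{"localhost:9000"}, // assumed address
		Settings: clickhouse.Settings{
			// Make INSERTs into Distributed tables wait until the data is
			// written on the remote shards, instead of returning after
			// queueing it for asynchronous delivery.
			"insert_distributed_sync": 1,
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()
}
```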

4 changes: 2 additions & 2 deletions pkg/clickhouse/client_test.go
@@ -122,7 +122,7 @@ var _ = Describe("ClickHouse Client", func() {

It("should insert into federated table", func() {
testLog := testutil.TestLogRecord{
- StartTime: time.Now(),
+ StartTime: time.Now().UnixMilli(),
BucketName: "test-bucket",
ReqID: "req-123",
Action: "GetObject",
@@ -149,7 +149,7 @@ var _ = Describe("ClickHouse Client", func() {
// Insert multiple logs
for i := 0; i < 5; i++ {
testLog := testutil.TestLogRecord{
- StartTime: now.Add(time.Duration(i) * time.Second),
+ StartTime: now.Add(time.Duration(i) * time.Second).UnixMilli(),
BucketName: "test-bucket-multi",
ReqID: fmt.Sprintf("req-%d", i),
Action: "GetObject",
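For reference, the .UnixMilli() edits above imply that testutil.TestLogRecord now carries StartTime as epoch milliseconds instead of time.Time. A sketch of the presumed shape, reconstructed only from the call sites visible in this diff and not copied from the repo (the review thread below mentions uint64; int64 is what UnixMilli returns):

```go
// Presumed shape of testutil.TestLogRecord after this PR; only the fields
// exercised by these tests are shown, and the exact integer type for
// StartTime is an assumption.
type TestLogRecord struct {
	StartTime      int64 // ms since Unix epoch, e.g. time.Now().UnixMilli()
	BucketName     string
	ReqID          string
	Action         string
	LoggingEnabled bool
	RaftSessionID  uint16
}
```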
130 changes: 65 additions & 65 deletions pkg/logcourier/batchfinder_test.go
@@ -53,7 +53,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.InsertTestLog(ctx, testutil.TestLogRecord{
LoggingEnabled: true,
BucketName: "test-bucket",
- StartTime: time.Now(),
+ StartTime: time.Now().UnixMilli(),
ReqID: fmt.Sprintf("req-%d", i),
Action: "GetObject",
})
@@ -83,12 +83,12 @@ var _ = Describe("BatchFinder", func() {
err := helper.Client().Exec(ctx, query,
oldTime.Add(time.Duration(i)*time.Second), // insertedAt
"test-bucket", // bucketName
- oldTime.Add(time.Duration(i)*time.Second), // startTime
- fmt.Sprintf("req-%03d", i),                // req_id
- "GetObject",                               // operation
- true,                                      // loggingEnabled
- uint16(0),                                 // raftSessionID
- "/test-bucket/key",                        // requestURI
+ oldTime.Add(time.Duration(i)*time.Second).UnixMilli(), // startTime
+ fmt.Sprintf("req-%03d", i),                            // req_id
+ "GetObject",                                           // operation
+ true,                                                  // loggingEnabled
+ uint16(0),                                             // raftSessionID
+ "/test-bucket/key",                                    // requestURI
)
Expect(err).NotTo(HaveOccurred())
}
@@ -99,7 +99,7 @@ var _ = Describe("BatchFinder", func() {
"INSERT INTO %s.%s (bucketName, raftSessionID, lastProcessedInsertedAt, lastProcessedStartTime, lastProcessedReqId) VALUES (?, ?, ?, ?, ?)",
helper.DatabaseName, clickhouse.TableOffsetsFederated)
err := helper.Client().Exec(ctx, offsetQuery,
"test-bucket", uint16(0), lastLogTime, lastLogTime, "req-007")
"test-bucket", uint16(0), lastLogTime, lastLogTime.UnixMilli(), "req-007")
Expect(err).NotTo(HaveOccurred())

batches, err := finder.FindBatches(ctx)
@@ -123,34 +123,34 @@ var _ = Describe("BatchFinder", func() {
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
err := helper.Client().Exec(ctx, query,
"", // bucketOwner
"test-bucket", // bucketName
time.Now(), // startTime
"", // clientIP
"", // requester
"req-old", // req_id
"GetObject", // operation
"", // objectKey
"/test-bucket/key", // requestURI
uint16(0), // httpCode
"", // errorCode
uint64(0), // bytesSent
uint64(0), // objectSize
float32(0), // totalTime
float32(0), // turnAroundTime
"", // referer
"", // userAgent
"", // versionId
"", // signatureVersion
"", // cipherSuite
"", // authenticationType
"", // hostHeader
"", // tlsVersion
"", // aclRequired
oldTime, // insertedAt (old)
"", // loggingTargetBucket
"", // loggingTargetPrefix
uint16(0), // raftSessionID
"", // bucketOwner
"test-bucket", // bucketName
time.Now().UnixMilli(), // startTime
"", // clientIP
"", // requester
"req-old", // req_id
"GetObject", // operation
"", // objectKey
"/test-bucket/key", // requestURI
uint16(0), // httpCode
"", // errorCode
uint64(0), // bytesSent
uint64(0), // objectSize
float32(0), // totalTime
float32(0), // turnAroundTime
"", // referer
"", // userAgent
"", // versionId
"", // signatureVersion
"", // cipherSuite
"", // authenticationType
"", // hostHeader
"", // tlsVersion
"", // aclRequired
oldTime, // insertedAt (old)
"", // loggingTargetBucket
"", // loggingTargetPrefix
uint16(0), // raftSessionID
)
Expect(err).NotTo(HaveOccurred())

@@ -172,7 +172,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.Client().Exec(ctx, query,
oldTime, // insertedAt (old)
"test-bucket", // bucketName
- oldTime,             // startTime
+ oldTime.UnixMilli(), // startTime
fmt.Sprintf("req-%03d", i), // req_id
"GetObject", // operation
true, // loggingEnabled
@@ -194,7 +194,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.InsertTestLog(ctx, testutil.TestLogRecord{
LoggingEnabled: true,
BucketName: "test-bucket",
- StartTime: time.Now(),
+ StartTime: time.Now().UnixMilli(),
ReqID: fmt.Sprintf("req-%d", i),
Action: "GetObject",
})
@@ -210,7 +210,7 @@ var _ = Describe("BatchFinder", func() {
Describe("Composite Offset Filtering", func() {
It("should filter when offset differs in insertedAt", func() {
baseTime := time.Now().Add(-2 * time.Hour)
- startTime := baseTime
+ startTimeVal := baseTime
reqID := "req-1"

// Insert 6 logs with insertedAt from T to T+5s, same startTime and reqID
@@ -222,13 +222,13 @@ var _ = Describe("BatchFinder", func() {
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
err := helper.Client().Exec(ctx, query,
baseTime.Add(time.Duration(i)*time.Second), // insertedAt varies
"test-bucket", // bucketName
startTime, // startTime (same for all)
reqID, // req_id (same for all)
"GetObject", // operation
true, // loggingEnabled
uint16(0), // raftSessionID
"/test-bucket/key", // requestURI
"test-bucket", // bucketName
startTimeVal.UnixMilli(), // startTime (same for all)
reqID, // req_id (same for all)
"GetObject", // operation
true, // loggingEnabled
uint16(0), // raftSessionID
"/test-bucket/key", // requestURI
)
Expect(err).NotTo(HaveOccurred())
}
@@ -239,7 +239,7 @@ var _ = Describe("BatchFinder", func() {
"INSERT INTO %s.%s (bucketName, raftSessionID, lastProcessedInsertedAt, lastProcessedStartTime, lastProcessedReqId) VALUES (?, ?, ?, ?, ?)",
helper.DatabaseName, clickhouse.TableOffsetsFederated)
err := helper.Client().Exec(ctx, offsetQuery,
"test-bucket", uint16(0), offsetTime, startTime, reqID)
"test-bucket", uint16(0), offsetTime, startTimeVal.UnixMilli(), reqID)
Expect(err).NotTo(HaveOccurred())

batches, err := finder.FindBatches(ctx)
@@ -256,7 +256,7 @@ var _ = Describe("BatchFinder", func() {

// Insert 6 logs with same insertedAt, varying startTime
for i := 0; i < 6; i++ {
- startTime := baseTime.Add(time.Duration(i) * time.Second)
+ startTimeVal := baseTime.Add(time.Duration(i) * time.Second)
query := fmt.Sprintf(`
INSERT INTO %s.%s
(insertedAt, bucketName, startTime, req_id, operation, loggingEnabled, raftSessionID, requestURI)
@@ -265,7 +265,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.Client().Exec(ctx, query,
insertedAt, // insertedAt (same for all)
"test-bucket", // bucketName
- startTime,                // startTime varies
+ startTimeVal.UnixMilli(), // startTime varies
fmt.Sprintf("req-%03d", i), // req_id
"GetObject", // operation
true, // loggingEnabled
@@ -281,7 +281,7 @@ var _ = Describe("BatchFinder", func() {
"INSERT INTO %s.%s (bucketName, raftSessionID, lastProcessedInsertedAt, lastProcessedStartTime, lastProcessedReqId) VALUES (?, ?, ?, ?, ?)",
helper.DatabaseName, clickhouse.TableOffsetsFederated)
err := helper.Client().Exec(ctx, offsetQuery,
"test-bucket", uint16(0), insertedAt, offsetTimestamp, "req-003")
"test-bucket", uint16(0), insertedAt, offsetTimestamp.UnixMilli(), "req-003")
Expect(err).NotTo(HaveOccurred())

batches, err := finder.FindBatches(ctx)
@@ -306,7 +306,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.Client().Exec(ctx, query,
insertedAt, // insertedAt
"test-bucket", // bucketName
- insertedAt,             // startTime
+ insertedAt.UnixMilli(), // startTime
fmt.Sprintf("req-%03d", i), // req_id
"GetObject", // operation
true, // loggingEnabled
@@ -323,17 +323,17 @@ var _ = Describe("BatchFinder", func() {

t0 := baseTime
err := helper.Client().Exec(ctx, offsetQuery,
"test-bucket", uint16(0), t0, t0, "req-000")
"test-bucket", uint16(0), t0, t0.UnixMilli(), "req-000")
Expect(err).NotTo(HaveOccurred())

t2 := baseTime.Add(2 * time.Second)
err = helper.Client().Exec(ctx, offsetQuery,
"test-bucket", uint16(0), t2, t2, "req-002")
"test-bucket", uint16(0), t2, t2.UnixMilli(), "req-002")
Expect(err).NotTo(HaveOccurred())

t1 := baseTime.Add(1 * time.Second)
err = helper.Client().Exec(ctx, offsetQuery,
"test-bucket", uint16(0), t1, t1, "req-001")
"test-bucket", uint16(0), t1, t1.UnixMilli(), "req-001")
Expect(err).NotTo(HaveOccurred())

batches, err := finder.FindBatches(ctx)
@@ -353,7 +353,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.InsertTestLog(ctx, testutil.TestLogRecord{
LoggingEnabled: true,
BucketName: "bucket-1",
- StartTime: time.Now(),
+ StartTime: time.Now().UnixMilli(),
ReqID: fmt.Sprintf("req-1-%d", i),
Action: "GetObject",
})
@@ -364,7 +364,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.InsertTestLog(ctx, testutil.TestLogRecord{
LoggingEnabled: true,
BucketName: "bucket-2",
- StartTime: time.Now(),
+ StartTime: time.Now().UnixMilli(),
ReqID: fmt.Sprintf("req-2-%d", i),
Action: "PutObject",
})
@@ -393,7 +393,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.Client().Exec(ctx, query,
oldTime, // insertedAt (older)
"bucket-A", // bucketName
- oldTime,             // startTime
+ oldTime.UnixMilli(), // startTime
fmt.Sprintf("req-a-%03d", i), // req_id
"GetObject", // operation
true, // loggingEnabled
@@ -413,7 +413,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.Client().Exec(ctx, query,
newerTime, // insertedAt (newer)
"bucket-B", // bucketName
- newerTime,             // startTime
+ newerTime.UnixMilli(), // startTime
fmt.Sprintf("req-b-%03d", i), // req_id
"GetObject", // operation
true, // loggingEnabled
@@ -441,7 +441,7 @@ var _ = Describe("BatchFinder", func() {
LoggingEnabled: true,
BucketName: "test-bucket",
RaftSessionID: 1,
- StartTime: time.Now(),
+ StartTime: time.Now().UnixMilli(),
ReqID: fmt.Sprintf("req-1-%d", i),
Action: "GetObject",
})
@@ -454,7 +454,7 @@ var _ = Describe("BatchFinder", func() {
LoggingEnabled: true,
BucketName: "test-bucket",
RaftSessionID: 2,
- StartTime: time.Now(),
+ StartTime: time.Now().UnixMilli(),
ReqID: fmt.Sprintf("req-2-%d", i),
Action: "GetObject",
})
@@ -490,15 +490,15 @@ var _ = Describe("BatchFinder", func() {
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
`, helper.DatabaseName, clickhouse.TableAccessLogsFederated)
err := helper.Client().Exec(ctx, query,
logAInsertedAt, "test-bucket", logAStartTime, "req-A",
logAInsertedAt, "test-bucket", logAStartTime.UnixMilli(), "req-A",
"GetObject", true, uint16(0), "/test-bucket/key-a")
Expect(err).NotTo(HaveOccurred())

// Insert Log B: operation at :01, arrives at :05
logBStartTime := baseTime.Add(1 * time.Second)
logBInsertedAt := baseTime.Add(5 * time.Second)
err = helper.Client().Exec(ctx, query,
logBInsertedAt, "test-bucket", logBStartTime, "req-B",
logBInsertedAt, "test-bucket", logBStartTime.UnixMilli(), "req-B",
"GetObject", true, uint16(0), "/test-bucket/key-b")
Expect(err).NotTo(HaveOccurred())

@@ -538,7 +538,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.InsertTestLog(ctx, testutil.TestLogRecord{
LoggingEnabled: true,
BucketName: "bucket-recent",
- StartTime: recentTime,
+ StartTime: recentTime.UnixMilli(),
ReqID: fmt.Sprintf("req-recent-%d", i),
Action: "GetObject",
})
Expand All @@ -557,7 +557,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.Client().Exec(ctx, query,
oldTime,
"bucket-old",
- oldTime,
+ oldTime.UnixMilli(),
fmt.Sprintf("req-old-%d", i),
"GetObject",
true,
@@ -591,7 +591,7 @@ var _ = Describe("BatchFinder", func() {
err := helper.Client().Exec(ctx, query,
insertTime,
bucketName,
- insertTime,
+ insertTime.UnixMilli(),
fmt.Sprintf("req-%d-%d", bucketNum, i),
"GetObject",
true,
4 changes: 2 additions & 2 deletions pkg/logcourier/logbatch.go
@@ -6,10 +6,10 @@ import (

// LogBatch represents a batch of logs ready for processing
type LogBatch struct {
- LastProcessedOffset Offset // Offset to filter from (empty if no previous processing)
Bucket string
- RaftSessionID uint16
+ LastProcessedOffset Offset // Offset to filter from (empty if no previous processing)
LogCount uint64
+ RaftSessionID uint16
}

// String returns a string representation for logging
13 changes: 8 additions & 5 deletions pkg/logcourier/logfetch.go
@@ -28,6 +28,9 @@ func NewLogFetcher(client *clickhouse.Client, database string, maxLogsPerBatch i
// LogBuilder will re-sort by startTime, req_id.
// Uses composite filter to fetch only logs after LastProcessedOffset.
func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecord, error) {
+ // StartTime is stored as milliseconds since epoch
+ startTimeMillis := batch.LastProcessedOffset.StartTime

Review comment: Was it required to change the type of StartTime to uint64? Why not convert time.Time to ms here instead of changing the type? If you don't change the type, the rest of the code can remain the same.

Collaborator (author):
I tried this:

--- i/pkg/logcourier/logfetch.go
+++ w/pkg/logcourier/logfetch.go
@@ -63,8 +63,8 @@ func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecor
                  AND raftSessionID = ?
                  AND (
                      insertedAt > ?
-                     OR (insertedAt = ? AND startTime > ?)
-                     OR (insertedAt = ? AND startTime = ? AND req_id > ?)
+                     OR (insertedAt = ? AND startTime > fromUnixTimestamp64Milli(?))
+                     OR (insertedAt = ? AND startTime = fromUnixTimestamp64Milli(?) AND req_id > ?)
                  )
                ORDER BY insertedAt ASC, startTime ASC, req_id ASC
                LIMIT ?
@@ -74,8 +74,8 @@ func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecor
                batch.Bucket,
                batch.RaftSessionID,
                batch.LastProcessedOffset.InsertedAt,
-               batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime,
-               batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime, batch.LastProcessedOffset.ReqID,
+               batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime.UnixMilli(),
+               batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime.UnixMilli(), batch.LastProcessedOffset.ReqID,
                lf.maxLogsPerBatch,
        )

but it fails with:

log-courier  | {"time":"2026-01-22T20:10:30.867419073Z","level":"WARN","msg":"transient error, will retry upload","bucketName":"e2e-source-1769112621578789528","attempt":2,"error":"failed to fetch logs: error iterating log records: code: 41, message: Received from 127.0.0.1:9002. DB::Exception: Cannot parse DateTime: In scope SELECT … FROM logs.access_logs AS __table1 WHERE (__table1.bucketName = 'e2e-source-1769112621578789528') AND (__table1.raftSessionID = 2) AND ((__table1.insertedAt > _CAST(0, 'DateTime')) OR ((__table1.insertedAt = _CAST(0, 'DateTime')) AND (__table1.startTime > _CAST('0', 'DateTime64(3)'))) OR ((__table1.insertedAt = _CAST(0, 'DateTime')) AND (__table1.startTime = _CAST('0', 'DateTime64(3)')) AND (__table1.req_id > ''))) ORDER BY __table1.insertedAt ASC, __table1.startTime ASC, __table1.req_id ASC LIMIT _CAST(100000, 'UInt64'). Stack trace: [trimmed: the trace runs from DB::readDateTimeTextFallback through FunctionCast into the query analyzer, i.e. the CAST of the bound value to DateTime64(3) fails during query analysis]: While executing Remote"}

I'd also prefer not to change the datatype, but it is the only approach that worked.

@leif-scality (Jan 23, 2026): I was thinking startTimeMillis := batch.LastProcessedOffset.StartTime.UnixMilli(), why change the query?

Doing the conversion at the struct level also forces you to convert back to time.Time:

timestamp := time.UnixMilli(startTimeMillis)

t := time.UnixMilli(millis).UTC()

lag := time.Since(time.UnixMilli(records[0].StartTime))

time.Time already contains the milliseconds, so there is no need to use int64.
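A quick illustration of that point, assuming nothing beyond the standard library: converting through UnixMilli and back loses nothing at or above millisecond precision, which is all a DateTime64(3) column can hold anyway.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// A timestamp with exactly millisecond precision, like DateTime64(3).
	orig := time.Date(2026, time.January, 22, 20, 10, 30, 867_000_000, time.UTC)

	millis := orig.UnixMilli()           // 1769112630867
	back := time.UnixMilli(millis).UTC() // round-trip through int64 ms

	fmt.Println(millis)           // 1769112630867
	fmt.Println(orig.Equal(back)) // true: no sub-millisecond data was set
}
```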

Collaborator (author): Ah, I see the confusion.
Using batch.LastProcessedOffset.StartTime.UnixMilli() will return the number of ms since the Unix epoch, which does not work with the query as it is (it expects DateTime64(3)).
time.UnixMilli() will indeed give ms precision.

Is converting to int64 a blocker for you? I can test the proposed change, and I agree that it is a simpler change.
But this change fixes the issue, and we have already merged the change in cloudserver (it will soon be merged in Federation as well).
Is it a no-go to proceed with the change proposed here?

Reply: If you don't have the time you can merge it.

But it would be better to have only a three-line change instead of changing hundreds of lines and losing the time.Time type.

Collaborator (author): We discussed with @leif-scality and indeed found a simpler fix. See: #95


query := fmt.Sprintf(`
SELECT
bucketOwner,
@@ -63,19 +66,19 @@ func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecor
AND raftSessionID = ?
AND (
insertedAt > ?
- OR (insertedAt = ? AND startTime > ?)
- OR (insertedAt = ? AND startTime = ? AND req_id > ?)
+ OR (insertedAt = ? AND startTime > %d)
+ OR (insertedAt = ? AND startTime = %d AND req_id > ?)
)
ORDER BY insertedAt ASC, startTime ASC, req_id ASC
LIMIT ?
- `, lf.database, clickhouse.TableAccessLogsFederated)
+ `, lf.database, clickhouse.TableAccessLogsFederated, startTimeMillis, startTimeMillis)

rows, err := lf.client.Query(ctx, query,
batch.Bucket,
batch.RaftSessionID,
batch.LastProcessedOffset.InsertedAt,
- batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime,
- batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.StartTime, batch.LastProcessedOffset.ReqID,
+ batch.LastProcessedOffset.InsertedAt,
+ batch.LastProcessedOffset.InsertedAt, batch.LastProcessedOffset.ReqID,
lf.maxLogsPerBatch,
)
if err != nil {
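As a footnote to the thread: the WHERE clause being debated is the usual expansion of a lexicographic keyset cursor, i.e. a row qualifies exactly when (insertedAt, startTime, req_id) sorts strictly after the last processed offset. A self-contained sketch of that comparison in plain Go (names and values are illustrative, not the repo's):

```go
package main

import "fmt"

// cursor mirrors the composite offset (insertedAt, startTime, req_id),
// with both times flattened to integers for the illustration.
type cursor struct {
	insertedAt int64 // epoch seconds
	startTime  int64 // epoch milliseconds
	reqID      string
}

// after reports whether row a sorts strictly after cursor c under
// ORDER BY insertedAt, startTime, req_id, i.e. the same condition the
// SQL OR-chain encodes:
//
//	insertedAt > ?
//	OR (insertedAt = ? AND startTime > ?)
//	OR (insertedAt = ? AND startTime = ? AND req_id > ?)
func after(a, c cursor) bool {
	if a.insertedAt != c.insertedAt {
		return a.insertedAt > c.insertedAt
	}
	if a.startTime != c.startTime {
		return a.startTime > c.startTime
	}
	return a.reqID > c.reqID
}

func main() {
	last := cursor{insertedAt: 100, startTime: 100_000, reqID: "req-003"}
	rows := []cursor{
		{100, 100_000, "req-003"}, // equal to the cursor: already processed
		{100, 100_000, "req-004"}, // same times, later req_id
		{100, 100_001, "req-000"}, // same insertedAt, later startTime
		{101, 0, "req-000"},       // later insertedAt wins outright
	}
	for _, r := range rows {
		fmt.Printf("%+v -> %v\n", r, after(r, last)) // false, true, true, true
	}
}
```

Whether startTime travels as a bound DateTime64 parameter, an interpolated millisecond literal, or an int64 field only changes how the middle comparison is expressed; the cursor logic itself stays the same.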