@@ -3,10 +3,18 @@ package logcourier
33import (
44 "context"
55 "fmt"
6+ "time"
67
78 "github.com/scality/log-courier/pkg/clickhouse"
89)
910
11+ // toDateTime64String converts time.Time to a string format that ClickHouse
12+ // correctly interprets as DateTime64(3). The Go driver truncates milliseconds
13+ // when binding time.Time directly to DateTime64 parameters.
14+ func toDateTime64String (t time.Time ) string {
15+ return fmt .Sprintf ("%.3f" , float64 (t .UnixMilli ())/ 1000.0 )
16+ }
17+
1018// LogFetcher fetches logs from ClickHouse
1119type LogFetcher struct {
1220 client * clickhouse.Client
@@ -70,12 +78,13 @@ func (lf *LogFetcher) FetchLogs(ctx context.Context, batch LogBatch) ([]LogRecor
7078 LIMIT ?
7179 ` , lf .database , clickhouse .TableAccessLogsFederated )
7280
81+ startTimeStr := toDateTime64String (batch .LastProcessedOffset .StartTime )
7382 rows , err := lf .client .Query (ctx , query ,
7483 batch .Bucket ,
7584 batch .RaftSessionID ,
7685 batch .LastProcessedOffset .InsertedAt ,
77- batch .LastProcessedOffset .InsertedAt , batch . LastProcessedOffset . StartTime ,
78- batch .LastProcessedOffset .InsertedAt , batch . LastProcessedOffset . StartTime , batch .LastProcessedOffset .ReqID ,
86+ batch .LastProcessedOffset .InsertedAt , startTimeStr ,
87+ batch .LastProcessedOffset .InsertedAt , startTimeStr , batch .LastProcessedOffset .ReqID ,
7988 lf .maxLogsPerBatch ,
8089 )
8190 if err != nil {
0 commit comments