Skip to content
Merged
4 changes: 2 additions & 2 deletions pkg/clickhouse/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ var _ = Describe("ClickHouse Client", func() {

It("should insert into federated table", func() {
testLog := testutil.TestLogRecord{
Timestamp: time.Now(),
StartTime: time.Now(),
BucketName: "test-bucket",
ReqID: "req-123",
Action: "GetObject",
Expand All @@ -149,7 +149,7 @@ var _ = Describe("ClickHouse Client", func() {
// Insert multiple logs
for i := 0; i < 5; i++ {
testLog := testutil.TestLogRecord{
Timestamp: now.Add(time.Duration(i) * time.Second),
StartTime: now.Add(time.Duration(i) * time.Second),
BucketName: "test-bucket-multi",
ReqID: fmt.Sprintf("req-%d", i),
Action: "GetObject",
Expand Down
24 changes: 12 additions & 12 deletions pkg/logcourier/batchfinder.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,18 @@ func (bf *BatchFinder) FindBatches(ctx context.Context) ([]LogBatch, error) {
bucketName,
raftSessionID,
lastProcessedInsertedAt,
lastProcessedTimestamp,
lastProcessedStartTime,
lastProcessedReqId
FROM (
SELECT
bucketName,
raftSessionID,
lastProcessedInsertedAt,
lastProcessedTimestamp,
lastProcessedStartTime,
lastProcessedReqId,
ROW_NUMBER() OVER (
PARTITION BY bucketName, raftSessionID
ORDER BY lastProcessedInsertedAt DESC, lastProcessedTimestamp DESC, lastProcessedReqId DESC
ORDER BY lastProcessedInsertedAt DESC, lastProcessedStartTime DESC, lastProcessedReqId DESC
) as rn
FROM %s.%s
) offsets_ordered
Expand All @@ -73,11 +73,11 @@ func (bf *BatchFinder) FindBatches(ctx context.Context) ([]LogBatch, error) {
-- Purpose: Count unprocessed logs for each bucket
--
-- Joins access_logs with bucket_offsets to find logs that haven't been processed yet.
-- A log is considered unprocessed if its (insertedAt, timestamp, reqID) is greater
-- A log is considered unprocessed if its (insertedAt, startTime, reqID) is greater
-- than the stored offset using lexicographic comparison:
-- 1. insertedAt > offset.insertedAt, OR
-- 2. insertedAt = offset.insertedAt AND timestamp > offset.timestamp, OR
-- 3. insertedAt = offset.insertedAt AND timestamp = offset.timestamp AND reqID > offset.reqID
-- 2. insertedAt = offset.insertedAt AND startTime > offset.startTime, OR
-- 3. insertedAt = offset.insertedAt AND startTime = offset.startTime AND reqID > offset.reqID
--
-- For buckets with no offset, LEFT JOIN returns NULL values which are handled explicitly in WHERE clause.
new_logs_by_bucket AS (
Expand All @@ -87,7 +87,7 @@ func (bf *BatchFinder) FindBatches(ctx context.Context) ([]LogBatch, error) {
count() AS new_log_count,
min(l.insertedAt) as min_ts,
o.lastProcessedInsertedAt,
o.lastProcessedTimestamp,
o.lastProcessedStartTime,
o.lastProcessedReqId
FROM %s.%s AS l
LEFT JOIN bucket_offsets AS o
Expand All @@ -100,14 +100,14 @@ func (bf *BatchFinder) FindBatches(ctx context.Context) ([]LogBatch, error) {
OR l.insertedAt > o.lastProcessedInsertedAt
OR (
l.insertedAt = o.lastProcessedInsertedAt
AND l.timestamp > o.lastProcessedTimestamp
AND l.startTime > o.lastProcessedStartTime
)
OR (
l.insertedAt = o.lastProcessedInsertedAt
AND l.timestamp = o.lastProcessedTimestamp
AND l.startTime = o.lastProcessedStartTime
AND l.req_id > o.lastProcessedReqId
)
GROUP BY l.bucketName, l.raftSessionID, o.lastProcessedInsertedAt, o.lastProcessedTimestamp, o.lastProcessedReqId
GROUP BY l.bucketName, l.raftSessionID, o.lastProcessedInsertedAt, o.lastProcessedStartTime, o.lastProcessedReqId
)
-- Main query: Select buckets that are ready for processing
--
Expand All @@ -117,7 +117,7 @@ func (bf *BatchFinder) FindBatches(ctx context.Context) ([]LogBatch, error) {
--
-- Results are ordered by min_ts (oldest first) to prioritize buckets with oldest logs.
-- LIMIT ensures we only process maxBuckets per discovery cycle.
SELECT bucketName, raftSessionID, new_log_count, lastProcessedInsertedAt, lastProcessedTimestamp, lastProcessedReqId
SELECT bucketName, raftSessionID, new_log_count, lastProcessedInsertedAt, lastProcessedStartTime, lastProcessedReqId
FROM new_logs_by_bucket
WHERE new_log_count >= ?
OR min_ts <= now() - INTERVAL ? SECOND
Expand All @@ -140,7 +140,7 @@ func (bf *BatchFinder) FindBatches(ctx context.Context) ([]LogBatch, error) {
&batch.RaftSessionID,
&batch.LogCount,
&batch.LastProcessedOffset.InsertedAt,
&batch.LastProcessedOffset.Timestamp,
&batch.LastProcessedOffset.StartTime,
&batch.LastProcessedOffset.ReqID,
)
if err != nil {
Expand Down
Loading
Loading