Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,19 @@ type partitionState struct {
topic string
partition int32

offset int64
jobBucket time.Time
// nextJobStartOffset is the start offset of the next job that will be
// planned when a new jobBucket is entered.
nextJobStartOffset int64
jobBucket time.Time

// committed is the locally known committed offset for this partition.
committed *advancingOffset
planned *advancingOffset
// planned is the locally known planned offset for this partition. It is the
// highest offset through which jobs have been planned for this partition.
planned *advancingOffset

// pendingJobs are jobs that are waiting to be enqueued. The job creation policy is what allows them to advance to the plannedJobs list.
// pendingJobs are jobs that are waiting to be enqueued. The job creation
// policy is what allows them to advance to the plannedJobs list.
pendingJobs *list.List
// plannedJobs are jobs that are either ready to be assigned, in-progress, or completed.
plannedJobs *list.List
Expand All @@ -259,7 +265,7 @@ func (s *partitionState) updateEndOffset(end int64, ts time.Time, jobSize time.D
newJobBucket := ts.Truncate(jobSize)

if s.jobBucket.IsZero() {
s.offset = end
s.nextJobStartOffset = s.planned.offset()
s.jobBucket = newJobBucket
return nil, nil
}
Expand All @@ -268,23 +274,23 @@ func (s *partitionState) updateEndOffset(end int64, ts time.Time, jobSize time.D
case bucketBefore:
// New bucket is before our current one. This should only happen if our
// Kafka's end offsets aren't monotonically increasing.
return nil, fmt.Errorf("time went backwards: %s < %s (%d, %d)", newJobBucket, s.jobBucket, s.offset, end)
return nil, fmt.Errorf("time went backwards: %s < %s (%d, %d)", newJobBucket, s.jobBucket, s.nextJobStartOffset, end)
case bucketSame:
// Observation is in the currently tracked bucket. No action needed.
case bucketAfter:
// We've entered a new job bucket. Emit a job for the current
// bucket if it has data and start a new one.

var job *schedulerpb.JobSpec
if s.offset < end {
if s.nextJobStartOffset < end {
job = &schedulerpb.JobSpec{
Topic: s.topic,
Partition: s.partition,
StartOffset: s.offset,
StartOffset: s.nextJobStartOffset,
EndOffset: end,
}
}
s.offset = end
s.nextJobStartOffset = end
s.jobBucket = newJobBucket
return job, nil
}
Expand Down
49 changes: 32 additions & 17 deletions pkg/blockbuilder/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1572,14 +1572,15 @@ func TestStartupToRegularModeJobProduction(t *testing.T) {
}

type testCase struct {
name string
initialStart int64
initialResume int64
initialEnd int64
initialTime time.Time
offsets []*offsetTime
futureObservations []endOffsetObservation
expectedFinalEnd int64
name string
partitionAbsentInitially bool
initialStart int64
initialResume int64
initialEnd int64
initialTime time.Time
offsets []*offsetTime
futureObservations []endOffsetObservation
expectedFinalEnd int64
}

tests := [...]testCase{
Expand Down Expand Up @@ -1653,6 +1654,16 @@ func TestStartupToRegularModeJobProduction(t *testing.T) {
},
expectedFinalEnd: 700,
},
{
name: "partition absent at startup",
partitionAbsentInitially: true,
initialTime: time.Date(2025, 3, 1, 9, 0, 0, 0, time.UTC),
futureObservations: []endOffsetObservation{
{offset: 600, timestamp: time.Date(2025, 3, 1, 11, 0, 0, 0, time.UTC)},
{offset: 700, timestamp: time.Date(2025, 3, 1, 12, 0, 0, 0, time.UTC)},
},
expectedFinalEnd: 700,
},
}

for _, tt := range tests {
Expand All @@ -1668,14 +1679,18 @@ func TestStartupToRegularModeJobProduction(t *testing.T) {
distinctTimes: make(map[time.Time]struct{}),
}

consumeOffs := []partitionOffsets{
{
topic: "topic",
partition: 0,
start: tt.initialStart,
resume: tt.initialResume,
end: tt.initialEnd,
},
var consumeOffs []partitionOffsets

if !tt.partitionAbsentInitially {
consumeOffs = []partitionOffsets{
{
topic: "topic",
partition: 0,
start: tt.initialStart,
resume: tt.initialResume,
end: tt.initialEnd,
},
}
}

// Call populateInitialJobs to set up initial state
Expand All @@ -1702,7 +1717,7 @@ func TestStartupToRegularModeJobProduction(t *testing.T) {
return
}

// Verify first job starts at or after resume offset
// Verify first job starts at resume offset
require.Equal(t, tt.initialResume, collectedJobs[0].StartOffset,
"first job should start at resume offset")

Expand Down
Loading