Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/collection/timingwheel.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ func (tw *TimingWheel) moveTask(task baseEntry) {
}

timer := val.(*positionEntry)
// FIX: Delete timer if delay is too short (executes immediately)
if task.delay < tw.interval {
timer.item.removed = true // 标记删除
tw.timers.Del(task.key) // 删除映射
threading.GoSafe(func() {
tw.execute(timer.item.key, timer.item.value)
})
Expand Down
27 changes: 27 additions & 0 deletions core/collection/timingwheel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,33 @@ func TestTimingWheel_RunTasksRaceCondition(t *testing.T) {
}
}

func TestTimingWheel_MoveTimerImmediateExecution(t *testing.T) {
var executionCount int32
ticker := timex.NewFakeTicker()
tw, _ := NewTimingWheelWithTicker(testStep, 10, func(k, v any) {
atomic.AddInt32(&executionCount, 1)
assert.Equal(t, "test_key", k)
assert.Equal(t, "test_value", v)
}, ticker)
defer tw.Stop()

// Set initial task
tw.SetTimer("test_key", "test_value", testStep*5)

// Move to short delay (less than interval), should execute immediately
tw.MoveTimer("test_key", testStep>>1)

// Wait for a while to ensure no extra execution
time.Sleep(waitTime)

// Verify executed only once
assert.Equal(t, int32(1), atomic.LoadInt32(&executionCount))

// Verify the map has been cleaned up
_, exists := tw.timers.Get("test_key")
assert.False(t, exists, "The map should not retain entries for executed tasks")
}

func BenchmarkTimingWheel(b *testing.B) {
b.ReportAllocs()

Expand Down
10 changes: 9 additions & 1 deletion core/mr/mapreduce.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,14 @@ type onceChan struct {

func (oc *onceChan) write(val any) {
if atomic.CompareAndSwapInt32(&oc.wrote, 0, 1) {
oc.channel <- val
// Use select with default to avoid blocking if channel is full
// This prevents deadlock when panic occurs after reducer has written output
select {
case oc.channel <- val:
// Successfully sent panic info
default:
// Channel is full or has no receiver, drop the panic info
// This is acceptable as we've already recorded the panic in wrote flag
}
}
}
20 changes: 19 additions & 1 deletion core/mr/mapreduce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

var errDummy = errors.New("dummy")


func TestFinish(t *testing.T) {
defer goleak.VerifyNone(t)

Expand Down Expand Up @@ -467,6 +466,25 @@ func TestMapReducePanic(t *testing.T) {
})
}

func TestMapReduceAfterWritePanic(t *testing.T) {
defer goleak.VerifyNone(t)

assert.Panics(t, func() {
_, _ = MapReduce(func(source chan<- int) {
source <- 0
source <- 1
}, func(i int, writer Writer[int], cancel func(error)) {
writer.Write(i)
}, func(pipe <-chan int, writer Writer[int], cancel func(error)) {
for v := range pipe {
writer.Write(v)
}
// Panic after writing output - this should not cause deadlock
panic("reducer panic after write")
})
})
}

func TestMapReducePanicOnce(t *testing.T) {
defer goleak.VerifyNone(t)

Expand Down