Skip to content

Commit db46c1b

Browse files
authored
Ensure that when a task is evicted, its deps also evicts it from its callers (#644)
This PR adds a fix to `EvictWithCleanup` to ensure that when a task is evicted, its dependencies also deletes the task from its callers, otherwise tasks could be holding onto stale references in its callers list to tasks that have been evicted otherwise. This PR also adds a debug configuration for `EvictWithCleanup` to force GC and log keys for keys that should have been evicted when built in debug mode. `task.parents` has also been renamed to `task.callers` for clarity.
1 parent 7bcf0a6 commit db46c1b

File tree

2 files changed

+56
-9
lines changed

2 files changed

+56
-9
lines changed

experimental/incremental/executor.go

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,13 @@ import (
2121
"slices"
2222
"sync"
2323
"sync/atomic"
24+
"time"
25+
"weak"
2426

2527
"golang.org/x/sync/semaphore"
2628

2729
"github.com/bufbuild/protocompile/experimental/report"
30+
"github.com/bufbuild/protocompile/internal"
2831
)
2932

3033
// Executor is a caching executor for incremental queries.
@@ -42,6 +45,10 @@ type Executor struct {
4245

4346
sema *semaphore.Weighted
4447

48+
// The [time.Duration] to wait before running the GC when debug mode is on. See docs for
49+
// [WithDebugEvict].
50+
evictGCDeadline time.Duration
51+
4552
counter atomic.Uint64 // Used for generating sequence IDs for Result.Unchanged.
4653
}
4754

@@ -73,6 +80,17 @@ func WithReportOptions(options report.Options) ExecutorOption {
7380
return func(e *Executor) { e.reportOptions = options }
7481
}
7582

83+
// WithDebugEvict takes a [time.Duration] and configures debug mode for evictions in the
84+
// executor.
85+
//
86+
// If set and the compiler is built with the debug tag, when [Executor.EvictWithCleanup]
87+
// is called, all evicted keys will be tracked. Then after eviction, a goroutine will be
88+
// kicked off to sleep for the configured duration, force a GC run, and then print out all
89+
// pointers that should be evicted but have not been GC'd.
90+
func WithDebugEvict(wait time.Duration) ExecutorOption {
91+
return func(e *Executor) { e.evictGCDeadline = wait }
92+
}
93+
7694
// Keys returns a snapshot of the keys of which queries are present (and
7795
// memoized) in an Executor.
7896
//
@@ -208,17 +226,46 @@ func (e *Executor) EvictWithCleanup(keys []any, cleanup func()) {
208226

209227
e.dirty.Lock()
210228
defer e.dirty.Unlock()
229+
230+
var evicted []weak.Pointer[task]
231+
logEvictionDebug := internal.Debug && e.evictGCDeadline > 0
211232
for n := len(tasks); n > 0; n = len(tasks) {
212233
next := tasks[n-1]
213234
tasks = tasks[:n-1]
214235

215-
next.parents.Range(func(k, _ any) bool {
236+
for k := range next.callers.Range {
216237
tasks = append(tasks, k.(*task)) //nolint:errcheck
217-
return true
218-
})
238+
}
219239

220240
// Remove the task from the map. Syncronized by the dirty lock.
221-
e.tasks.Delete(next.query.Key())
241+
t, _ := e.tasks.LoadAndDelete(next.query.Key())
242+
if logEvictionDebug {
243+
evicted = append(evicted, weak.Make(t.(*task))) //nolint:errcheck
244+
}
245+
246+
// Remove the task from the callers of its deps.
247+
for k := range next.deps.Range {
248+
k.(*task).callers.Delete(next) //nolint:errcheck
249+
}
250+
}
251+
252+
if evicted != nil {
253+
go func() {
254+
time.Sleep(e.evictGCDeadline)
255+
runtime.GC()
256+
evicted = slices.DeleteFunc(evicted, func(e weak.Pointer[task]) bool {
257+
return e.Value() == nil
258+
})
259+
for _, e := range evicted {
260+
internal.DebugLog(
261+
[]any{"exec %p", e},
262+
"EvictWithCleanup",
263+
"failed to GC evicted task %p: %v",
264+
e.Value(),
265+
e.Value().query.Key(),
266+
)
267+
}
268+
}()
222269
}
223270

224271
if cleanup != nil {

experimental/incremental/task.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -222,12 +222,12 @@ func Resolve[T any](caller *Task, queries ...Query[T]) (results []Result[T], exp
222222
deps[i] = dep
223223

224224
// Update dependency graph.
225-
parent := caller.task
226-
if parent == nil {
225+
callerTask := caller.task
226+
if callerTask == nil {
227227
continue // Root.
228228
}
229-
parent.deps.Store(dep, struct{}{})
230-
dep.parents.Store(parent, struct{}{})
229+
callerTask.deps.Store(dep, struct{}{})
230+
dep.callers.Store(callerTask, struct{}{})
231231
}
232232

233233
// Schedule all but the first query to run asynchronously.
@@ -294,7 +294,7 @@ type task struct {
294294
// Inverse of deps. Contains all tasks that directly depend on this task.
295295
// Written by multiple tasks concurrently.
296296
// TODO: See the comment on Executor.tasks.
297-
parents sync.Map // [*task]struct{}
297+
callers sync.Map // [*task]struct{}
298298

299299
// If this task has not been started yet, this is nil.
300300
// Otherwise, if it is complete, result.done will be closed.

0 commit comments

Comments
 (0)