Skip to content

Commit 911431c

Browse files
authored
scheduler: add component shutdown timeout of 10m (#4577)
* scheduler: add component shutdown timeout of 10m * Expose a flag to change deadline
1 parent d9a0686 commit 911431c

File tree

6 files changed

+138
-31
lines changed

6 files changed

+138
-31
lines changed

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,16 +48,19 @@ Main (unreleased)
4848

4949
- Fix direction of arrows for pyroscope components in UI graph. (@dehaansa)
5050

51+
- Fix an issue where component shutdown could block indefinitely by adding a warning log message and a deadline of 10 minutes. The deadline can be configured with the `--feature.component-shutdown-deadline` flag if the default is not suitable. (@thampiotr)
52+
5153
v1.11.1
5254
-----------------
5355

56+
### Bugfixes
57+
5458
- Fix potential deadlock in `loki.source.journal` when stopping or reloading the component. (@thampiotr)
5559

5660
- Honor sync timeout when waiting for network availability for prometheus.operator.* components. (@dehaansa)
5761

5862
- Fix `prometheus.exporter.cloudwatch` to not always emit debug logs but respect debug property. (@kalleep)
5963

60-
### Bugfixes
6164

6265
v1.11.0
6366
-----------------

docs/sources/reference/cli/run.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ The following flags are supported:
6565
* `--config.extra-args`: Extra arguments from the original format used by the converter.
6666
* `--stability.level`: The minimum permitted stability level of functionality. Supported values: `experimental`, `public-preview`, and `generally-available` (default `"generally-available"`).
6767
* `--feature.community-components.enabled`: Enable community components (default `false`).
68+
* `--feature.component-shutdown-deadline`: Maximum duration to wait for a component to shut down before giving up and logging an error (default `"10m"`).
6869
* `--windows.priority`: The priority to set for the {{< param "PRODUCT_NAME" >}} process when running on Windows. This is only available on Windows. Supported values: `above_normal`, `below_normal`, `normal`, `high`, `idle`, or `realtime` (default `"normal"`).
6970

7071
{{< admonition type="note" >}}

internal/alloycli/cmd_run.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ func runCommand() *cobra.Command {
6868
clusterRejoinInterval: 60 * time.Second,
6969
disableSupportBundle: false,
7070
windowsPriority: windowspriority.PriorityNormal,
71+
taskShutdownDeadline: 10 * time.Minute,
7172
}
7273

7374
cmd := &cobra.Command{
@@ -166,6 +167,7 @@ depending on the nature of the reload error.
166167
if runtime.GOOS == "windows" {
167168
cmd.Flags().StringVar(&r.windowsPriority, "windows.priority", r.windowsPriority, fmt.Sprintf("Process priority to use when running on windows. This flag is currently in public preview. Supported values: %s", strings.Join(slices.Collect(windowspriority.PriorityValues()), ", ")))
168169
}
170+
cmd.Flags().DurationVar(&r.taskShutdownDeadline, "feature.component-shutdown-deadline", r.taskShutdownDeadline, "Maximum duration to wait for a component to shut down before giving up and logging an error")
169171

170172
addDeprecatedFlags(cmd)
171173
return cmd
@@ -201,6 +203,7 @@ type alloyRun struct {
201203
enableCommunityComps bool
202204
disableSupportBundle bool
203205
windowsPriority string
206+
taskShutdownDeadline time.Duration
204207
}
205208

206209
func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error {
@@ -384,6 +387,7 @@ func (fr *alloyRun) Run(cmd *cobra.Command, configPath string) error {
384387
remoteCfgService,
385388
uiService,
386389
},
390+
TaskShutdownDeadline: fr.taskShutdownDeadline,
387391
})
388392

389393
ready = f.Ready

internal/runtime/alloy.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,9 @@ type Options struct {
114114

115115
// EnableCommunityComps enables the use of community components.
116116
EnableCommunityComps bool
117+
118+
// TaskShutdownDeadline is the maximum duration to wait for a component to shut down before giving up and logging an error.
119+
TaskShutdownDeadline time.Duration
117120
}
118121

119122
// Runtime is the Alloy system.
@@ -137,10 +140,11 @@ type Runtime struct {
137140
// New creates a new, unstarted Alloy controller. Call Run to run the controller.
138141
func New(o Options) *Runtime {
139142
return newController(controllerOptions{
140-
Options: o,
141-
ModuleRegistry: newModuleRegistry(),
142-
IsModule: false, // We are creating a new root controller.
143-
WorkerPool: worker.NewDefaultWorkerPool(),
143+
Options: o,
144+
ModuleRegistry: newModuleRegistry(),
145+
IsModule: false, // We are creating a new root controller.
146+
WorkerPool: worker.NewDefaultWorkerPool(),
147+
TaskShutdownDeadline: o.TaskShutdownDeadline,
144148
})
145149
}
146150

@@ -154,6 +158,8 @@ type controllerOptions struct {
154158
IsModule bool // Whether this controller is for a module.
155159
// A worker pool to evaluate components asynchronously. A default one will be created if this is nil.
156160
WorkerPool worker.Pool
161+
// TaskShutdownDeadline is the maximum duration to wait for a component to shut down before giving up and logging an error.
162+
TaskShutdownDeadline time.Duration
157163
}
158164

159165
// newController creates a new, unstarted Alloy controller with a specific
@@ -186,7 +192,7 @@ func newController(o controllerOptions) *Runtime {
186192
opts: o,
187193

188194
updateQueue: controller.NewQueue(),
189-
sched: controller.NewScheduler(log),
195+
sched: controller.NewScheduler(log, o.TaskShutdownDeadline),
190196

191197
modules: o.ModuleRegistry,
192198

internal/runtime/internal/controller/scheduler.go

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,19 @@ import (
44
"context"
55
"fmt"
66
"sync"
7+
"time"
78

89
"github.com/go-kit/log"
910

1011
"github.com/grafana/alloy/internal/runtime/logging/level"
1112
)
1213

14+
var (
15+
// TaskShutdownWarningTimeout is the duration after which a warning is logged
16+
// when a task is taking too long to shut down.
17+
TaskShutdownWarningTimeout = time.Minute
18+
)
19+
1320
// RunnableNode is any BlockNode which can also be run.
1421
type RunnableNode interface {
1522
BlockNode
@@ -18,10 +25,11 @@ type RunnableNode interface {
1825

1926
// Scheduler runs components.
2027
type Scheduler struct {
21-
ctx context.Context
22-
cancel context.CancelFunc
23-
running sync.WaitGroup
24-
logger log.Logger
28+
ctx context.Context
29+
cancel context.CancelFunc
30+
running sync.WaitGroup
31+
logger log.Logger
32+
taskShutdownDeadline time.Duration
2533

2634
tasksMut sync.Mutex
2735
tasks map[string]*task
@@ -31,12 +39,13 @@ type Scheduler struct {
3139
// components which are running.
3240
//
3341
// Call Close to stop the Scheduler and all running components.
34-
func NewScheduler(logger log.Logger) *Scheduler {
42+
func NewScheduler(logger log.Logger, taskShutdownDeadline time.Duration) *Scheduler {
3543
ctx, cancel := context.WithCancel(context.Background())
3644
return &Scheduler{
37-
ctx: ctx,
38-
cancel: cancel,
39-
logger: logger,
45+
ctx: ctx,
46+
cancel: cancel,
47+
logger: logger,
48+
taskShutdownDeadline: taskShutdownDeadline,
4049

4150
tasks: make(map[string]*task),
4251
}
@@ -52,7 +61,6 @@ func NewScheduler(logger log.Logger) *Scheduler {
5261
// call to Synchronize.
5362
func (s *Scheduler) Synchronize(rr []RunnableNode) error {
5463
s.tasksMut.Lock()
55-
defer s.tasksMut.Unlock()
5664

5765
if s.ctx.Err() != nil {
5866
return fmt.Errorf("Scheduler is closed")
@@ -89,9 +97,9 @@ func (s *Scheduler) Synchronize(rr []RunnableNode) error {
8997
)
9098

9199
opts := taskOptions{
92-
Context: s.ctx,
93-
Runnable: newRunnable,
94-
OnDone: func(err error) {
100+
context: s.ctx,
101+
runnable: newRunnable,
102+
onDone: func(err error) {
95103
defer s.running.Done()
96104

97105
if err != nil {
@@ -104,12 +112,16 @@ func (s *Scheduler) Synchronize(rr []RunnableNode) error {
104112
defer s.tasksMut.Unlock()
105113
delete(s.tasks, nodeID)
106114
},
115+
logger: log.With(s.logger, "taskID", nodeID),
116+
taskShutdownDeadline: s.taskShutdownDeadline,
107117
}
108118

109119
s.running.Add(1)
110120
s.tasks[nodeID] = newTask(opts)
111121
}
112122

123+
// Unlock the tasks mutex so that Stop calls can complete.
124+
s.tasksMut.Unlock()
113125
// Wait for all stopping runnables to exit.
114126
stopping.Wait()
115127
return nil
@@ -125,36 +137,64 @@ func (s *Scheduler) Close() error {
125137

126138
// task is a scheduled runnable.
127139
type task struct {
128-
ctx context.Context
129-
cancel context.CancelFunc
130-
exited chan struct{}
140+
ctx context.Context
141+
cancel context.CancelFunc
142+
exited chan struct{}
143+
opts taskOptions
144+
doneOnce sync.Once
131145
}
132146

133147
type taskOptions struct {
134-
Context context.Context
135-
Runnable RunnableNode
136-
OnDone func(error)
148+
context context.Context
149+
runnable RunnableNode
150+
onDone func(error)
151+
logger log.Logger
152+
taskShutdownDeadline time.Duration
137153
}
138154

139155
// newTask creates and starts a new task.
140156
func newTask(opts taskOptions) *task {
141-
ctx, cancel := context.WithCancel(opts.Context)
157+
ctx, cancel := context.WithCancel(opts.context)
142158

143159
t := &task{
144160
ctx: ctx,
145161
cancel: cancel,
146162
exited: make(chan struct{}),
163+
opts: opts,
147164
}
148165

149166
go func() {
150-
err := opts.Runnable.Run(t.ctx)
167+
err := opts.runnable.Run(t.ctx)
151168
close(t.exited)
152-
opts.OnDone(err)
169+
t.doneOnce.Do(func() {
170+
t.opts.onDone(err)
171+
})
153172
}()
154173
return t
155174
}
156175

157176
func (t *task) Stop() {
158177
t.cancel()
159-
<-t.exited
178+
179+
deadlineDuration := t.opts.taskShutdownDeadline
180+
if deadlineDuration == 0 {
181+
deadlineDuration = time.Hour * 24 * 365 * 100 // infinite timeout ~= 100 years
182+
}
183+
184+
deadlineCtx, deadlineCancel := context.WithTimeout(context.Background(), deadlineDuration)
185+
defer deadlineCancel()
186+
187+
for {
188+
select {
189+
case <-t.exited:
190+
return // Task exited normally.
191+
case <-time.After(TaskShutdownWarningTimeout):
192+
level.Warn(t.opts.logger).Log("msg", "task shutdown is taking longer than expected")
193+
case <-deadlineCtx.Done():
194+
t.doneOnce.Do(func() {
195+
t.opts.onDone(fmt.Errorf("task shutdown deadline exceeded"))
196+
})
197+
return // Task took too long to exit, don't wait.
198+
}
199+
}
160200
}

internal/runtime/internal/controller/scheduler_test.go

Lines changed: 56 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
package controller_test
22

33
import (
4+
"bytes"
45
"context"
56
"os"
67
"sync"
78
"testing"
9+
"time"
810

911
"github.com/go-kit/log"
1012
"github.com/stretchr/testify/require"
@@ -30,7 +32,7 @@ func TestScheduler_Synchronize(t *testing.T) {
3032
return nil
3133
}
3234

33-
sched := controller.NewScheduler(logger)
35+
sched := controller.NewScheduler(logger, 1*time.Minute)
3436
sched.Synchronize([]controller.RunnableNode{
3537
fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
3638
fakeRunnable{ID: "component-b", Component: mockComponent{RunFunc: runFunc}},
@@ -52,7 +54,7 @@ func TestScheduler_Synchronize(t *testing.T) {
5254
return nil
5355
}
5456

55-
sched := controller.NewScheduler(logger)
57+
sched := controller.NewScheduler(logger, 1*time.Minute)
5658

5759
for i := 0; i < 10; i++ {
5860
// If a new runnable is created, runFunc will panic since the WaitGroup
@@ -78,7 +80,7 @@ func TestScheduler_Synchronize(t *testing.T) {
7880
return nil
7981
}
8082

81-
sched := controller.NewScheduler(logger)
83+
sched := controller.NewScheduler(logger, 1*time.Minute)
8284

8385
sched.Synchronize([]controller.RunnableNode{
8486
fakeRunnable{ID: "component-a", Component: mockComponent{RunFunc: runFunc}},
@@ -114,3 +116,54 @@ var _ component.Component = (*mockComponent)(nil)
114116

115117
func (mc mockComponent) Run(ctx context.Context) error { return mc.RunFunc(ctx) }
116118
func (mc mockComponent) Update(newConfig component.Arguments) error { return mc.UpdateFunc(newConfig) }
119+
120+
func TestScheduler_TaskTimeoutLogging(t *testing.T) {
121+
// Temporarily modify timeout values for testing
122+
originalWarningTimeout := controller.TaskShutdownWarningTimeout
123+
controller.TaskShutdownWarningTimeout = 50 * time.Millisecond
124+
defer func() {
125+
controller.TaskShutdownWarningTimeout = originalWarningTimeout
126+
}()
127+
128+
// Create a buffer to capture log output
129+
var logBuffer bytes.Buffer
130+
logger := log.NewLogfmtLogger(&logBuffer)
131+
132+
var started sync.WaitGroup
133+
started.Add(1)
134+
135+
// Create a component that will block and not respond to context cancellation
136+
runFunc := func(ctx context.Context) error {
137+
started.Done()
138+
// Block indefinitely, ignoring context cancellation
139+
// Use a long sleep to simulate a component that doesn't respond to cancellation
140+
time.Sleep(1 * time.Second)
141+
return nil
142+
}
143+
144+
sched := controller.NewScheduler(logger, 150*time.Millisecond)
145+
146+
// Start a component
147+
err := sched.Synchronize([]controller.RunnableNode{
148+
fakeRunnable{ID: "blocking-component", Component: mockComponent{RunFunc: runFunc}},
149+
})
150+
require.NoError(t, err)
151+
started.Wait()
152+
153+
// Remove the component, which should trigger the timeout behavior. This will block until the component exits.
154+
err = sched.Synchronize([]controller.RunnableNode{})
155+
require.NoError(t, err)
156+
157+
logOutput := logBuffer.String()
158+
t.Logf("actual log output:\n%s", logOutput)
159+
160+
// Should contain warning message
161+
require.Contains(t, logOutput, "task shutdown is taking longer than expected")
162+
require.Contains(t, logOutput, "level=warn")
163+
164+
// Should contain error message
165+
require.Contains(t, logOutput, "task shutdown deadline exceeded")
166+
require.Contains(t, logOutput, "level=error")
167+
168+
require.NoError(t, sched.Close())
169+
}

0 commit comments

Comments
 (0)