Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 28 additions & 15 deletions .github/workflows/linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ on:
- master
- stable

permissions:
contents: read

jobs:
status_test:
name: Status plugin (Go ${{ matrix.go }}, PHP ${{ matrix.php }}, OS ${{matrix.os}})
Expand All @@ -34,6 +37,8 @@ jobs:

- name: Check out code
uses: actions/checkout@v6
with:
persist-credentials: false

- name: Get Composer Cache Directory
id: composer-cache
Expand Down Expand Up @@ -66,17 +71,22 @@ jobs:
- name: Install Go dependencies
run: go mod download

- name: Run unit tests with coverage
run: |
mkdir -p coverage-ci
go test -v -race -cover -coverpkg=./... -coverprofile=./coverage-ci/unit.out -covermode=atomic ./...

- name: Run status e2e tests with coverage
run: |
mkdir -p coverage-ci
cd tests
mkdir ./coverage-ci
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=./... -coverprofile=./coverage-ci/status.out -covermode=atomic ./...
go test -timeout 20m -v -race -cover -tags=debug -failfast -coverpkg=github.com/roadrunner-server/status/v6/... -coverprofile=../coverage-ci/e2e.out -covermode=atomic ./...

- name: Archive code coverage results
uses: actions/upload-artifact@v7
with:
name: coverage
path: ./tests/coverage-ci/
path: ./coverage-ci/

codecov:
name: Upload codecov
Expand All @@ -87,24 +97,27 @@ jobs:
steps:
- name: Check out code
uses: actions/checkout@v6
with:
persist-credentials: false
- name: Download code coverage results
uses: actions/download-artifact@v8
with:
name: coverage
path: coverage
- run: |
echo 'mode: atomic' > summary.txt
tail -q -n +2 coverage/*.out >> summary.txt
awk '
NR == 1 { print; next }
/^github\.com\/roadrunner-server\/status\/v6\// {
sub(/^github\.com\/roadrunner-server\/status\/v6\//, "", $0)
print
}
' summary.txt > summary.filtered.txt
mv summary.filtered.txt summary.txt
- name: Prepare coverage reports
run: |
for f in coverage/*.out; do
out="$(basename "${f%.out}").txt"
echo 'mode: atomic' > "$out"
awk '
/^github\.com\/roadrunner-server\/status\/v6\// {
sub(/^github\.com\/roadrunner-server\/status\/v6\//, "", $0)
print
}
' "$f" >> "$out"
done
- name: upload to codecov
uses: codecov/codecov-action@v6 # Docs: <https://github.com/codecov/codecov-action>
with:
files: summary.txt
files: unit.txt,e2e.txt
fail_ci_if_error: false
7 changes: 4 additions & 3 deletions doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
// - /jobs – returns the state of job pipelines from a plugin that
// implements the [JobsChecker] interface.
//
// During graceful shutdown the endpoints respond with 503 Service Unavailable
// (or a configurable status code) so that external load balancers can drain
// traffic before the process exits.
// During graceful shutdown /ready and /jobs respond with the configured
// unavailable status code (503 by default) so external load balancers can drain
// traffic, while /health stays 200 (liveness) so the orchestrator does not kill
// the still-draining process.
//
// An RPC service is also registered, providing Status and Ready methods for
// programmatic access from RoadRunner workers or CLI tools.
Expand Down
7 changes: 4 additions & 3 deletions handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ func (m *mockJobsChecker) Name() string { return "jobs" }

// --- Helpers ---

func newShutdownPtr(val bool) *atomic.Pointer[bool] {
var p atomic.Pointer[bool]
p.Store(&val)
func newShutdownPtr(val bool) *atomic.Bool {
var p atomic.Bool
p.Store(val)
return &p
}

Expand Down Expand Up @@ -82,6 +82,7 @@ func TestHealthHandler(t *testing.T) {
req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/health", nil)
h.ServeHTTP(rec, req)

// Liveness stays 200 during graceful shutdown (unlike /ready and /jobs).
assert.Equal(t, http.StatusOK, rec.Code)
assert.Contains(t, rec.Body.String(), "service is shutting down")
})
Expand Down
103 changes: 54 additions & 49 deletions health.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ type Health struct {
log *slog.Logger
unavailableStatusCode int
statusRegistry map[string]Checker
shutdownInitiated *atomic.Pointer[bool]
shutdownInitiated *atomic.Bool
}

func NewHealthHandler(sr map[string]Checker, shutdownInitiated *atomic.Pointer[bool], log *slog.Logger, usc int) *Health {
func NewHealthHandler(sr map[string]Checker, shutdownInitiated *atomic.Bool, log *slog.Logger, usc int) *Health {
return &Health{
statusRegistry: sr,
unavailableStatusCode: usc,
Expand All @@ -24,13 +24,16 @@ func NewHealthHandler(sr map[string]Checker, shutdownInitiated *atomic.Pointer[b
}

func (rd *Health) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if rd.shutdownInitiated != nil && *rd.shutdownInitiated.Load() {
if rd.shutdownInitiated != nil && rd.shutdownInitiated.Load() {
// Liveness stays 200 during graceful shutdown so the orchestrator does not
// kill the draining process; readiness (/ready) and /jobs return the
// configured unavailable code instead. Do NOT collapse onto unavailableStatusCode.
http.Error(w, "service is shutting down", http.StatusOK)
return
}

// report will be used either for all plugins or for the Plugins in the query
report := make([]*Report, 0, 2)
report := make([]*Report, 0, len(rd.statusRegistry))

plg := r.URL.Query()[pluginsQuery]
// if no Plugins provided, check them all
Expand Down Expand Up @@ -108,56 +111,58 @@ func (rd *Health) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}

// iterate over all provided Plugins
for i := range plg {
if svc, ok := rd.statusRegistry[plg[i]]; ok {
if svc == nil {
continue
}
for _, name := range plg {
svc, ok := rd.statusRegistry[name]
if !ok {
rd.log.Info("plugin does not support health checks", "plugin", name)
continue
}

st, err := rd.statusRegistry[plg[i]].Status()
if err != nil {
report = append(report, &Report{
PluginName: plg[i],
ErrorMessage: err.Error(),
StatusCode: http.StatusInternalServerError,
})
if svc == nil {
continue
}

continue
}
st, err := svc.Status()
if err != nil {
report = append(report, &Report{
PluginName: name,
ErrorMessage: err.Error(),
StatusCode: http.StatusInternalServerError,
})

if st == nil {
report = append(report, &Report{
PluginName: plg[i],
ErrorMessage: "plugin is not available",
StatusCode: rd.unavailableStatusCode,
})
continue
}

continue
}
if st == nil {
report = append(report, &Report{
PluginName: name,
ErrorMessage: "plugin is not available",
StatusCode: rd.unavailableStatusCode,
})

switch {
case st.Code >= 500:
// on >=500, write header, because it'll be written on Write (200)
w.WriteHeader(rd.unavailableStatusCode)
report = append(report, &Report{
PluginName: plg[i],
ErrorMessage: "internal server error, see logs",
StatusCode: rd.unavailableStatusCode,
})
case st.Code >= 100 && st.Code <= 400:
report = append(report, &Report{
PluginName: plg[i],
StatusCode: st.Code,
})
default:
report = append(report, &Report{
PluginName: plg[i],
ErrorMessage: "unexpected status code",
StatusCode: st.Code,
})
}
} else {
rd.log.Info("plugin does not support health checks", "plugin", plg[i])
continue
}

switch {
case st.Code >= 500:
// on >=500, write header, because it'll be written on Write (200)
w.WriteHeader(rd.unavailableStatusCode)
report = append(report, &Report{
PluginName: name,
ErrorMessage: "internal server error, see logs",
StatusCode: rd.unavailableStatusCode,
})
case st.Code >= 100 && st.Code <= 400:
report = append(report, &Report{
PluginName: name,
StatusCode: st.Code,
})
default:
report = append(report, &Report{
PluginName: name,
ErrorMessage: "unexpected status code",
StatusCode: st.Code,
})
}
}

Expand Down
33 changes: 16 additions & 17 deletions jobs.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package status

import (
"context"
"encoding/json"
"log/slog"
"net/http"
Expand All @@ -12,10 +11,10 @@ type Jobs struct {
statusJobsRegistry JobsChecker
unavailableStatusCode int
log *slog.Logger
shutdownInitiated *atomic.Pointer[bool]
shutdownInitiated *atomic.Bool
}

func NewJobsHandler(jc JobsChecker, shutdownInitiated *atomic.Pointer[bool], log *slog.Logger, usc int) *Jobs {
func NewJobsHandler(jc JobsChecker, shutdownInitiated *atomic.Bool, log *slog.Logger, usc int) *Jobs {
return &Jobs{
statusJobsRegistry: jc,
unavailableStatusCode: usc,
Expand All @@ -24,9 +23,9 @@ func NewJobsHandler(jc JobsChecker, shutdownInitiated *atomic.Pointer[bool], log
}
}

func (jb *Jobs) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
if jb.shutdownInitiated != nil && *jb.shutdownInitiated.Load() {
http.Error(w, "service is shutting down", http.StatusServiceUnavailable)
func (jb *Jobs) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if jb.shutdownInitiated != nil && jb.shutdownInitiated.Load() {
http.Error(w, "service is shutting down", jb.unavailableStatusCode)
return
}

Expand All @@ -35,7 +34,7 @@ func (jb *Jobs) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
return
}

jobStates, err := jb.statusJobsRegistry.JobsState(context.Background())
jobStates, err := jb.statusJobsRegistry.JobsState(r.Context())
if err != nil {
jb.log.Error("jobs state", "error", err)
http.Error(w, "jobs plugin not found", jb.unavailableStatusCode)
Expand All @@ -45,17 +44,17 @@ func (jb *Jobs) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
report := make([]*JobsReport, 0, len(jobStates))

// write info about underlying drivers
for i := range jobStates {
for _, js := range jobStates {
report = append(report, &JobsReport{
Pipeline: jobStates[i].Pipeline,
Priority: jobStates[i].Priority,
Ready: jobStates[i].Ready,
Queue: jobStates[i].Queue,
Active: jobStates[i].Active,
Delayed: jobStates[i].Delayed,
Reserved: jobStates[i].Reserved,
Driver: jobStates[i].Driver,
ErrorMessage: jobStates[i].ErrorMessage,
Pipeline: js.Pipeline,
Priority: js.Priority,
Ready: js.Ready,
Queue: js.Queue,
Active: js.Active,
Delayed: js.Delayed,
Reserved: js.Reserved,
Driver: js.Driver,
ErrorMessage: js.ErrorMessage,
})
}

Expand Down
11 changes: 6 additions & 5 deletions plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ type Plugin struct {
readyRegistry map[string]Readiness
// jobs plugin checker
statusJobsRegistry JobsChecker
// shared pointer
shutdownInitiated atomic.Pointer[bool]
// true once Stop is called; checked by all HTTP handlers
shutdownInitiated atomic.Bool
server *http.Server
log *slog.Logger
cfg *Config
Expand All @@ -86,7 +86,6 @@ func (c *Plugin) Init(cfg Configurer, log Logger) error {

c.readyRegistry = make(map[string]Readiness)
c.statusRegistry = make(map[string]Checker)
c.shutdownInitiated.Store(new(false))

c.log = log.NamedLogger(PluginName)

Expand Down Expand Up @@ -131,8 +130,10 @@ func (c *Plugin) Stop(_ context.Context) error {
c.mu.Lock()
defer c.mu.Unlock()

// set shutdown to true, thus all endpoints will return 503
c.shutdownInitiated.Store(new(true))
// set shutdown to true: /ready and /jobs then return the configured unavailable
// status code, while /health (liveness) stays 200 so the orchestrator does not
// kill the draining process
c.shutdownInitiated.Store(true)

return nil
}
Expand Down
39 changes: 39 additions & 0 deletions plugin_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package status

import (
stderr "errors"
"log/slog"
"testing"

"github.com/roadrunner-server/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

// initConfigurer is a minimal Configurer for exercising Plugin.Init's early
// error returns without standing up a full container.
type initConfigurer struct {
has bool
unmarshalErr error
}

func (c *initConfigurer) Has(string) bool { return c.has }
func (c *initConfigurer) UnmarshalKey(string, any) error { return c.unmarshalErr }

type initLogger struct{}

func (initLogger) NamedLogger(string) *slog.Logger { return slog.New(slog.DiscardHandler) }

func TestPluginInit(t *testing.T) {
t.Run("disabled when config section is missing", func(t *testing.T) {
err := (&Plugin{}).Init(&initConfigurer{has: false}, initLogger{})
require.Error(t, err)
assert.True(t, errors.Is(errors.Disabled, err))
})

t.Run("disabled on unmarshal error", func(t *testing.T) {
err := (&Plugin{}).Init(&initConfigurer{has: true, unmarshalErr: stderr.New("bad config")}, initLogger{})
require.Error(t, err)
assert.True(t, errors.Is(errors.Disabled, err))
})
}
Loading
Loading