Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions fed_test_utils/fed.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ func NewFedTest(t *testing.T, originConfig string) (ft *FedTest) {
require.NoError(t, param.Set(param.Cache_EnableEvictionMonitoring.GetName(), false))
require.NoError(t, param.Set(param.Cache_StorageLocation.GetName(), filepath.Join(tmpPath, "xcache-data")))
require.NoError(t, param.Set(param.Cache_DbLocation.GetName(), filepath.Join(t.TempDir(), "cache.sqlite")))
// In tests, skip the drain-wait period before XRootD restarts so tests
// don't time out waiting for PIDs to change.
require.NoError(t, param.Set(param.Xrootd_ShutdownTimeout.GetName(), 0))
Comment on lines +178 to +180
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This sounds plausible to me, but will this interfere with tests outside of launchers and xrootd?

require.NoError(t, param.Set(param.Server_EnableUI.GetName(), false))
require.NoError(t, param.Set(param.Server_WebPort.GetName(), 0))
require.NoError(t, param.Set(param.Server_DbLocation.GetName(), filepath.Join(t.TempDir(), "server.sqlite")))
Expand Down
10 changes: 9 additions & 1 deletion launchers/cache_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,15 @@ func CacheServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group, m
cacheServer.SetPids(pids)

// Store restart information after PIDs are known
xrootd.StoreRestartInfo(launchers, pids, egrp, portStartCallback, true, useCMSD, privileged)
preRestartHook := func(hookCtx context.Context) {
handleGracefulShutdown(hookCtx, modules, []server_structs.XRootDServer{cacheServer})
}
postRestartHook := func(hookCtx context.Context) {
if advErr := launcher_utils.Advertise(hookCtx, []server_structs.XRootDServer{cacheServer}); advErr != nil {
log.Errorf("Failed to re-advertise cache to Director after restart: %v", advErr)
}
}
xrootd.StoreRestartInfo(ctx, launchers, pids, egrp, portStartCallback, true, useCMSD, privileged, preRestartHook, postRestartHook)

// Register callback for xrootd logging configuration changes
// This must be done after LaunchDaemons so the server has PIDs
Expand Down
10 changes: 9 additions & 1 deletion launchers/origin_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,15 @@ func OriginServe(ctx context.Context, engine *gin.Engine, egrp *errgroup.Group,
originServer.SetPids(pids)

// Store restart information after PIDs are known
xrootd.StoreRestartInfo(launchers, pids, egrp, portStartCallback, false, useCMSD, privileged)
preRestartHook := func(hookCtx context.Context) {
handleGracefulShutdown(hookCtx, modules, []server_structs.XRootDServer{originServer})
}
postRestartHook := func(hookCtx context.Context) {
if advErr := launcher_utils.Advertise(hookCtx, []server_structs.XRootDServer{originServer}); advErr != nil {
log.Errorf("Failed to re-advertise origin to Director after restart: %v", advErr)
}
}
xrootd.StoreRestartInfo(ctx, launchers, pids, egrp, portStartCallback, false, useCMSD, privileged, preRestartHook, postRestartHook)

// Register callback for xrootd logging configuration changes
// This must be done after LaunchDaemons so the server has PIDs
Expand Down
54 changes: 38 additions & 16 deletions xrootd/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@ import (
)

type restartInfo struct {
launchers []daemon.Launcher
egrp *errgroup.Group
callback func(int)
isCache bool
useCMSD bool
privileged bool
pids []int
ctx context.Context
launchers []daemon.Launcher
egrp *errgroup.Group
callback func(int)
preRestartHook func(ctx context.Context)
postRestartHook func(ctx context.Context)
isCache bool
useCMSD bool
privileged bool
pids []int
}

var (
Expand All @@ -68,15 +71,18 @@ func ResetRestartState() {

// StoreRestartInfo stores the information needed for restarting XRootD.
// It should be called during initial launch, after the PIDs are known.
func StoreRestartInfo(launchers []daemon.Launcher, pids []int, egrp *errgroup.Group, callback func(int), cache bool, cmsd bool, priv bool) {
func StoreRestartInfo(ctx context.Context, launchers []daemon.Launcher, pids []int, egrp *errgroup.Group, callback func(int), cache bool, cmsd bool, priv bool, preRestartHook func(ctx context.Context), postRestartHook func(ctx context.Context)) {
info := restartInfo{
launchers: launchers,
egrp: egrp,
callback: callback,
isCache: cache,
useCMSD: cmsd,
privileged: priv,
pids: append([]int(nil), pids...),
ctx: ctx,
launchers: launchers,
egrp: egrp,
callback: callback,
preRestartHook: preRestartHook,
postRestartHook: postRestartHook,
isCache: cache,
useCMSD: cmsd,
privileged: priv,
pids: append([]int(nil), pids...),
}

// Replace any existing entry for the same server role; otherwise append.
Expand Down Expand Up @@ -138,6 +144,14 @@ func RestartXrootd(ctx context.Context, oldPids []int) (newPids []int, err error
return nil, errors.New("restart requested but no tracked PIDs are available")
}

// Run any pre-restart hooks (e.g., advertise shutdown to the Director and
// wait for in-flight transfers to drain) before sending signals.
for _, info := range storedInfos {
if info.preRestartHook != nil {
info.preRestartHook(info.ctx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want the long-lived context from info, or the short-lived one that was passed to RestartXrootd?

}
}

// Step 1: Gracefully shutdown existing XRootD processes
log.Debug("Sending SIGTERM to existing XRootD processes")
for _, pid := range oldPids {
Expand Down Expand Up @@ -210,7 +224,7 @@ func RestartXrootd(ctx context.Context, oldPids []int) (newPids []int, err error
}

log.Info("Launching new XRootD daemons")
pids, launchErr := LaunchDaemons(ctx, newLaunchers, info.egrp, info.callback)
pids, launchErr := LaunchDaemons(info.ctx, newLaunchers, info.egrp, info.callback)
if launchErr != nil {
return nil, errors.Wrap(launchErr, "Failed to launch XRootD daemons")
}
Expand All @@ -231,6 +245,14 @@ func RestartXrootd(ctx context.Context, oldPids []int) (newPids []int, err error

metrics.SetComponentHealthStatus(metrics.OriginCache_XRootD, metrics.StatusOK, "XRootD restart complete")

// Run any post-restart hooks (e.g., re-advertise the server to the Director so
// clients can resume routing requests to this server immediately).
for _, info := range updatedInfos {
if info.postRestartHook != nil {
info.postRestartHook(info.ctx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want the long-lived context from info, or the short-lived one that was passed to RestartXrootd?

}
}

log.Infof("XRootD restart complete with new PIDs: %v", newPids)
return newPids, nil
}
Expand Down
10 changes: 5 additions & 5 deletions xrootd/restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func TestStoreRestartInfo(t *testing.T) {
egrp := &errgroup.Group{}
callback := func(port int) {}

StoreRestartInfo(launchers, nil, egrp, callback, true, false, true)
StoreRestartInfo(launchers, nil, egrp, callback, false, true, false)
StoreRestartInfo(context.Background(), launchers, nil, egrp, callback, true, false, true, nil, nil)
StoreRestartInfo(context.Background(), launchers, nil, egrp, callback, false, true, false, nil, nil)

require.Len(t, restartInfos, 2)

Expand Down Expand Up @@ -76,10 +76,10 @@ func TestStoreRestartInfoReplacesByRole(t *testing.T) {
var launchers []daemon.Launcher
egrp := &errgroup.Group{}

StoreRestartInfo(launchers, nil, egrp, func(int) {}, true, false, false)
StoreRestartInfo(context.Background(), launchers, nil, egrp, func(int) {}, true, false, false, nil, nil)
require.Len(t, restartInfos, 1)

StoreRestartInfo(launchers, nil, egrp, func(int) {}, true, true, true)
StoreRestartInfo(context.Background(), launchers, nil, egrp, func(int) {}, true, true, true, nil, nil)

require.Len(t, restartInfos, 1)
assert.True(t, restartInfos[0].useCMSD)
Expand All @@ -96,7 +96,7 @@ func TestRestartXrootd_NoProcesses(t *testing.T) {
var launchers []daemon.Launcher
egrp := &errgroup.Group{}
callback := func(int) {}
StoreRestartInfo(launchers, []int{999999, 999998}, egrp, callback, false, false, false)
StoreRestartInfo(context.Background(), launchers, []int{999999, 999998}, egrp, callback, false, false, false, nil, nil)

// Try to restart with empty PID list - should fail since there's no xrootd config
_, err := RestartXrootd(ctx, []int{})
Expand Down
2 changes: 1 addition & 1 deletion xrootd/restart_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

// StoreRestartInfo stores the information needed for restarting XRootD.
// This is the Windows stub: restart is not implemented on Windows.
func StoreRestartInfo(launchers []daemon.Launcher, pids []int, egrp *errgroup.Group, callback func(int), cache bool, cmsd bool, priv bool) {
func StoreRestartInfo(ctx context.Context, launchers []daemon.Launcher, pids []int, egrp *errgroup.Group, callback func(int), cache bool, cmsd bool, priv bool, preRestartHook func(ctx context.Context), postRestartHook func(ctx context.Context)) {
// No-op on Windows
}

Expand Down
Loading