Skip to content

Commit c8edb77

Browse files
authored
Merge pull request #2867 from PelicanPlatform/copilot/fix-director-tests-ttl-cache
Fix director tests to fetch from TTL cache and respect downtime
2 parents 41963f5 + d48eb9b commit c8edb77

File tree

3 files changed

+92
-43
lines changed

3 files changed

+92
-43
lines changed

director/cache_ads.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -384,15 +384,15 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]
384384
Status: HealthStatusInit,
385385
}
386386
errgrp.Go(func() error {
387-
LaunchPeriodicDirectorTest(cancelCtx, sAd)
387+
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
388388
return nil
389389
})
390390
log.Debugf("New director test suite issued for %s %s. Errgroup was evicted", string(ad.Type), ad.URL.String())
391391
} else {
392392
// Existing errorgroup still working
393393
cancelCtx, cancel := context.WithCancel(existingUtil.ErrGrpContext)
394394
started := existingUtil.ErrGrp.TryGo(func() error {
395-
LaunchPeriodicDirectorTest(cancelCtx, sAd)
395+
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
396396
return nil
397397
})
398398
if !started {
@@ -422,7 +422,7 @@ func recordAd(ctx context.Context, sAd server_structs.ServerAd, namespaceAds *[]
422422
Status: HealthStatusInit,
423423
}
424424
errgrp.Go(func() error {
425-
LaunchPeriodicDirectorTest(cancelCtx, sAd)
425+
LaunchPeriodicDirectorTest(cancelCtx, ad.URL.String())
426426
return nil
427427
})
428428
}
@@ -546,6 +546,16 @@ func applyServerDowntimes(serverName string, downtimes []server_structs.Downtime
546546
}
547547
}
548548

549+
// isServerInDowntime checks if a server is in the filteredServers map with an active filter.
550+
// A server is considered in downtime if it exists in filteredServers with any filter type except tempAllowed.
551+
func isServerInDowntime(serverName string) bool {
552+
filteredServersMutex.RLock()
553+
defer filteredServersMutex.RUnlock()
554+
555+
existingFilterType, isServerFiltered := filteredServers[serverName]
556+
return isServerFiltered && existingFilterType != tempAllowed
557+
}
558+
549559
// applyActiveDowntimeFilter checks federationDowntimes for any active downtime for the given server
550560
// and applies the tempFiltered filter immediately if found. This ensures that when a server wakes up
551561
// mid-downtime, it is blocked right away without waiting for the next registry poll.

director/monitor.go

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"net/url"
2929
"time"
3030

31+
"github.com/jellydator/ttlcache/v3"
3132
"github.com/pkg/errors"
3233
"github.com/prometheus/client_golang/prometheus"
3334
log "github.com/sirupsen/logrus"
@@ -124,9 +125,21 @@ func reportStatusToServer(ctx context.Context, serverWebUrl string, status strin
124125
return nil
125126
}
126127

127-
// Run a periodic test file transfer against an origin to ensure
128-
// it's talking to the director
129-
func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.ServerAd) {
128+
// LaunchPeriodicDirectorTest runs periodic test file transfers against an origin or cache to ensure
129+
// it's responding to director test requests. The test fetches the current server ad
130+
// from the TTL cache on each cycle and stops when the ad is no longer present.
131+
func LaunchPeriodicDirectorTest(ctx context.Context, serverUrlStr string) {
132+
// Option to disable touch on hit when fetching from cache to avoid extending TTL
133+
disableTouchOpt := ttlcache.WithDisableTouchOnHit[string, *server_structs.Advertisement]()
134+
135+
// Fetch the initial server ad to set up metrics
136+
initialAdItem := serverAds.Get(serverUrlStr, disableTouchOpt)
137+
if initialAdItem == nil {
138+
log.Errorf("Failed to start director test suite: server ad not found in cache for URL %s. Test will not be started.", serverUrlStr)
139+
return
140+
}
141+
initialAd := initialAdItem.Value()
142+
serverAd := initialAd.ServerAd
130143
serverName := serverAd.Name
131144
serverUrl := serverAd.URL.String()
132145
serverWebUrl := serverAd.WebURL.String()
@@ -158,15 +171,30 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
158171
// runDirectorTestCycle executes a single director test cycle and reports the result back to the server.
159172
// Extracted as a helper to allow running the first test immediately upon registration, avoiding the
160173
// race condition where the origin/cache 30s timeout fires before the first ticker-driven test.
161-
runDirectorTestCycle := func() {
162-
log.Debug(fmt.Sprintf("Starting a director test cycle for %s server %s at %s", serverAd.Type, serverName, serverUrl))
174+
// Returns true if the test was run, false if it was skipped (e.g., server not in cache or in downtime).
175+
runDirectorTestCycle := func() bool {
176+
// Fetch the current server ad from the TTL cache
177+
adItem := serverAds.Get(serverUrlStr, disableTouchOpt)
178+
if adItem == nil {
179+
log.Infof("The Director doesn't have any advertisements for server with URL %s. Stopping director tests.", serverUrlStr)
180+
return false
181+
}
182+
currentServerAd := adItem.Value().ServerAd
183+
184+
// Check if the server is in downtime by checking the filteredServers map
185+
if isServerInDowntime(currentServerAd.Name) {
186+
log.Debugf("Skipping director test cycle for %s server %s: server is in downtime", currentServerAd.Type, currentServerAd.Name)
187+
return true // Return true to continue the loop, but don't run the test
188+
}
189+
190+
log.Debug(fmt.Sprintf("Starting a director test cycle for %s server %s at %s", currentServerAd.Type, currentServerAd.Name, currentServerAd.URL.String()))
163191
testSucceeded := true
164192
var testErr error
165-
if serverAd.Type == server_structs.OriginType.String() {
193+
if currentServerAd.Type == server_structs.OriginType.String() {
166194
fileTests := server_utils.TestFileTransferImpl{}
167-
testSucceeded, testErr = fileTests.RunTests(ctx, serverUrl, serverUrl, "", server_utils.DirectorTest)
168-
} else if serverAd.Type == server_structs.CacheType.String() {
169-
testErr = runCacheTest(ctx, serverAd.URL)
195+
testSucceeded, testErr = fileTests.RunTests(ctx, currentServerAd.URL.String(), currentServerAd.URL.String(), "", server_utils.DirectorTest)
196+
} else if currentServerAd.Type == server_structs.CacheType.String() {
197+
testErr = runCacheTest(ctx, currentServerAd.URL)
170198
}
171199

172200
// Compose the result of this Director-test to report to the server
@@ -176,25 +204,25 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
176204
reportStatus = "ok"
177205
reportMessage = "Director test cycle succeeded at " + time.Now().Format(time.RFC3339)
178206
healthStatus = HealthStatusOK
179-
log.Debugf("Director file transfer test cycle succeeded at %s for %s server with URL at %s", time.Now().Format(time.RFC3339), serverAd.Type, serverUrl)
207+
log.Debugf("Director file transfer test cycle succeeded at %s for %s server with URL at %s", time.Now().Format(time.RFC3339), currentServerAd.Type, currentServerAd.URL.String())
180208
} else {
181209
reportStatus = "error"
182-
reportMessage = "Director file transfer test cycle failed for server: " + serverUrl
210+
reportMessage = "Director file transfer test cycle failed for server: " + currentServerAd.URL.String()
183211
if testErr != nil {
184212
reportMessage += " " + testErr.Error()
185213
}
186214
healthStatus = HealthStatusError
187-
log.Warningln("Director file transfer test cycle failed for ", serverAd.Type, " server: ", serverUrl, " ", testErr)
215+
log.Warningln("Director file transfer test cycle failed for ", currentServerAd.Type, " server: ", currentServerAd.URL.String(), " ", testErr)
188216
}
189217

190218
// Update healthTestUtils once per cycle
191219
func() {
192220
healthTestUtilsMutex.Lock()
193221
defer healthTestUtilsMutex.Unlock()
194-
if existingUtil, ok := healthTestUtils[serverAd.URL.String()]; ok {
222+
if existingUtil, ok := healthTestUtils[currentServerAd.URL.String()]; ok {
195223
existingUtil.Status = healthStatus
196224
} else {
197-
log.Debugln("HealthTestUtil missing for ", serverAd.Type, " server: ", serverUrl, " Failed to update internal status")
225+
log.Debugln("HealthTestUtil missing for ", currentServerAd.Type, " server: ", currentServerAd.URL.String(), " Failed to update internal status")
198226
}
199227
}()
200228

@@ -205,25 +233,27 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
205233
}
206234

207235
// Report the result of this Director-test back to origin/server (single call)
208-
reportErr := reportStatusToServer(ctx, serverWebUrl, reportStatus, reportMessage, serverAd.Type, false)
236+
reportErr := reportStatusToServer(ctx, currentServerAd.WebURL.String(), reportStatus, reportMessage, currentServerAd.Type, false)
209237

210238
// Determine report status metric and log if reporting failed
211239
reportStatusMetric := metrics.MetricSucceeded
212240
if reportErr != nil {
213241
reportStatusMetric = metrics.MetricFailed
214-
log.Warningf("Failed to report director test result to %s server at %s: %v", serverAd.Type, serverAd.WebURL.String(), reportErr)
242+
log.Warningf("Failed to report director test result to %s server at %s: %v", currentServerAd.Type, currentServerAd.WebURL.String(), reportErr)
215243
}
216244

217245
// Record metrics once per cycle
218246
metrics.PelicanDirectorFileTransferTestsRuns.With(
219247
prometheus.Labels{
220-
"server_name": serverName,
221-
"server_web_url": serverWebUrl,
222-
"server_type": string(serverAd.Type),
248+
"server_name": currentServerAd.Name,
249+
"server_web_url": currentServerAd.WebURL.String(),
250+
"server_type": string(currentServerAd.Type),
223251
"status": string(testStatusMetric),
224252
"report_status": string(reportStatusMetric),
225253
},
226254
).Inc()
255+
256+
return true // Test was run successfully
227257
}
228258

229259
// Run the first test immediately to avoid race with origin/cache 30s timeout.
@@ -235,7 +265,7 @@ func LaunchPeriodicDirectorTest(ctx context.Context, serverAd server_structs.Ser
235265
for {
236266
select {
237267
case <-ctx.Done():
238-
log.Debug(fmt.Sprintf("End director test suite for %s server %s at %s", serverAd.Type, serverName, serverUrl))
268+
log.Debug(fmt.Sprintf("Stopped the Director test suite for %s server %s at %s", serverAd.Type, serverName, serverUrl))
239269

240270
metrics.PelicanDirectorActiveFileTransferTestSuite.With(
241271
prometheus.Labels{

0 commit comments

Comments
 (0)