Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion jobsAdmin/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeRespons
ErrorMsg: fmt.Sprintf("no job with JobId %v exists", req.JobID),
}
}

// If the job manager was not found, then Job was resurrected
// Get the Job manager again for given JobId
jm, _ := JobsAdmin.JobMgr(req.JobID)
Expand Down Expand Up @@ -402,7 +403,6 @@ func ResumeJobOrder(req common.ResumeJobRequest) common.CancelPauseResumeRespons
})

jm.ResumeTransfers(steCtx) // Reschedule all job part's transfers
// }()
jr = common.CancelPauseResumeResponse{
CancelledPauseResumed: true,
ErrorMsg: "",
Expand Down
16 changes: 13 additions & 3 deletions ste/mgr-JobMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,10 +716,21 @@ func (jm *jobMgr) ResumeTransfers(appCtx context.Context) {
// Since atomicAllTransfersScheduled is set to true while creating the JobMgr,
// reset it to false when resuming
jm.ResetAllTransfersScheduled()
jm.jobPartMgrs.Iterate(false, func(p common.PartNumber, jpm IJobPartMgr) {

// In mover builds, use READ lock (not write lock) to iterate jobPartMgrs while queuing to partsChannel.
// A write lock here would deadlock when partsChannel fills up (capacity 1000):
// - ResumeTransfers holds write lock, blocks on partsChannel send
// - reportJobPartDoneHandler needs read lock via Get(0), blocks on write lock
// - scheduleJobParts blocks on unbuffered jobPartProgress, waiting for reportJobPartDoneHandler
// Read lock avoids this because RWMutex allows concurrent readers.
// No writers exist at this point — all AddJobPart/AddJobOrder calls completed in ResurrectJob (step 1).
useReadLock := buildmode.IsMover
partCount := 0
jm.jobPartMgrs.Iterate(useReadLock, func(p common.PartNumber, jpm IJobPartMgr) {
jm.QueueJobParts(jpm)
// jpm.ScheduleTransfers(jm.ctx, includeTransfer, excludeTransfer)
partCount++
})
common.GetLifecycleMgr().Info(fmt.Sprintf("[RESUME] JobId=%s: all %d parts queued successfully", jm.jobID, partCount))
}

// When a previously paused job is resumed, ResetFailedTransfersCount
Expand Down Expand Up @@ -1170,7 +1181,6 @@ func (jm *jobMgr) scheduleJobParts() {
return

case jobPart := <-jm.xferChannels.partsChannel:

if !startedPoolSizer {
// spin up a GR to coordinate dynamic sizing of the main pool
// It will automatically spin up the right number of chunk processors
Expand Down
15 changes: 9 additions & 6 deletions ste/mgr-JobPartMgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,14 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) {

jpm.clientInfo()

// Cache IsFinalPart before the transfer loop, since plan memory may be unmapped
// by progressive cleanup after the last ReportTransferDone fires.
isFinalPart := plan.IsFinalPart

// *** Schedule this job part's transfers ***
for t := uint32(0); t < plan.NumTransfers; t++ {
// Use cachedNumTransfers (copied at init) instead of plan.NumTransfers (mmap'd memory)
// to avoid SIGSEGV if the plan file is unmapped by progressive cleanup while this loop is still running.
for t := uint32(0); t < jpm.cachedNumTransfers; t++ {
jppt := plan.Transfer(t)
ts := jppt.TransferStatus()
if ts == common.ETransferStatus.Success() {
Expand Down Expand Up @@ -430,12 +436,12 @@ func (jpm *jobPartMgr) ScheduleTransfers(jobCtx context.Context) {
// The atomicAllTransfersScheduled variable is used when resuming a job.
// Since iterating over the JobParts and scheduling their transfers are independent,
// a variable is required to indicate whether the last part has been resumed or not
if plan.IsFinalPart {
if isFinalPart {
jpm.jobMgr.ConfirmAllTransfersScheduled()
}
}

if plan.IsFinalPart {
if isFinalPart {
jpm.Log(common.LogInfo, "Final job part has been scheduled")
}
}
Expand Down Expand Up @@ -693,11 +699,8 @@ func (jpm *jobPartMgr) UnmapPlanFile() {

jpm.planMMF.Unmap()
if partNum%100 == 0 {
// logging for every 100th part num
fmt.Printf("DEBUG: Unmap() completed for part %d\n", partNum)
}
// Note: We don't set planMMF to nil here to maintain Plan() access,
// but the memory is freed. The Unmap() is idempotent so it's safe to call again.
}

// TODO: added for debugging purposes; remove later
Expand Down