Skip to content

Commit c6c71be

Browse files
committed
fix(admin): fixes
1 parent 2304adc commit c6c71be

File tree

2 files changed

+27
-9
lines changed

2 files changed

+27
-9
lines changed

admin/failover_reprocessor.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -147,10 +147,9 @@ func (m *ReprocessingJobManager) StartJob(config ReprocessingJobConfig) (*Reproc
147147
}
148148
m.Infof("[StartJob] Found %d files to process", len(fileItems))
149149

150-
// Determine number of workers (default: 1 worker per 10 files, max 50 workers)
151-
workerCount := (len(fileItems) + 9) / 10
152-
if workerCount > 50 {
153-
workerCount = 50
150+
workerCount := len(fileItems)
151+
if workerCount > m.config.K8sMaxParallelWorkers {
152+
workerCount = m.config.K8sMaxParallelWorkers
154153
}
155154
if workerCount < 1 {
156155
workerCount = 1
@@ -446,7 +445,12 @@ func (m *ReprocessingJobManager) GetJobWorkers(id string) ([]map[string]interfac
446445
func (m *ReprocessingJobManager) ListJobs() []*ReprocessingJob {
447446
m.Infof("[ListJobs] Starting")
448447

449-
jobs, err := ListReprocessingJobs(m.dbpool)
448+
// Create context with timeout for database query
449+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
450+
defer cancel()
451+
452+
m.Infof("[ListJobs] Querying database for jobs")
453+
jobs, err := ListReprocessingJobs(ctx, m.dbpool)
450454
if err != nil {
451455
m.Errorf("[ListJobs] Failed to list jobs from database: %v", err)
452456
return []*ReprocessingJob{}
@@ -455,14 +459,28 @@ func (m *ReprocessingJobManager) ListJobs() []*ReprocessingJob {
455459

456460
// Enrich running jobs with K8s status to detect completion
457461
if m.k8sClient != nil {
462+
runningJobCount := 0
458463
for _, job := range jobs {
459464
if job.Status == JobStatusRunning && job.K8sJobName != "" {
465+
runningJobCount++
466+
}
467+
}
468+
m.Infof("[ListJobs] Found %d running jobs to enrich with K8s status", runningJobCount)
469+
470+
for i, job := range jobs {
471+
if job.Status == JobStatusRunning && job.K8sJobName != "" {
472+
m.Infof("[ListJobs] Enriching job %d/%d: %s (k8s_job=%s)", i+1, runningJobCount, job.ID, job.K8sJobName)
460473
if err := m.enrichJobWithK8sStatus(job); err != nil {
461474
// Log but don't fail
462-
m.Warnf("Failed to get K8s status for job %s: %v", job.ID, err)
475+
m.Warnf("[ListJobs] Failed to get K8s status for job %s: %v", job.ID, err)
476+
} else {
477+
m.Infof("[ListJobs] Successfully enriched job %s, new status=%s", job.ID, job.Status)
463478
}
464479
}
465480
}
481+
m.Infof("[ListJobs] Finished enriching %d running jobs", runningJobCount)
482+
} else {
483+
m.Infof("[ListJobs] No K8s client available, skipping enrichment")
466484
}
467485

468486
m.Infof("[ListJobs] Completed, returning %d jobs", len(jobs))

admin/reprocessing_db.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ func GetReprocessingJob(pool *pgxpool.Pool, jobID string) (*ReprocessingJob, err
120120
}
121121

122122
// ListReprocessingJobs lists all reprocessing jobs
123-
func ListReprocessingJobs(pool *pgxpool.Pool) ([]*ReprocessingJob, error) {
124-
rows, err := pool.Query(context.Background(),
123+
func ListReprocessingJobs(ctx context.Context, pool *pgxpool.Pool) ([]*ReprocessingJob, error) {
124+
rows, err := pool.Query(ctx,
125125
`SELECT id, config, status, created_at, started_at, completed_at, k8s_job_name, total_files, total_workers, error
126126
FROM reprocessing_jobs
127127
ORDER BY created_at DESC`)
@@ -154,7 +154,7 @@ func ListReprocessingJobs(pool *pgxpool.Pool) ([]*ReprocessingJob, error) {
154154
}
155155

156156
// Get aggregated stats for this job
157-
err = pool.QueryRow(context.Background(),
157+
err = pool.QueryRow(ctx,
158158
`SELECT
159159
COALESCE(SUM(processed_files), 0),
160160
COALESCE(SUM(total_lines), 0),

0 commit comments

Comments
 (0)