|
| 1 | +BEGIN; |
| 2 | + |
| 3 | +CREATE OR REPLACE FUNCTION admin.import_job_state_change_before() |
| 4 | + RETURNS trigger |
| 5 | + LANGUAGE plpgsql |
| 6 | +AS $function$ |
| 7 | +DECLARE |
| 8 | + v_timestamp TIMESTAMPTZ := now(); |
| 9 | + v_row_count INTEGER; |
| 10 | +BEGIN |
| 11 | + -- Record timestamps for state changes if not already recorded |
| 12 | + IF NEW.state = 'preparing_data' AND NEW.preparing_data_at IS NULL THEN |
| 13 | + NEW.preparing_data_at := v_timestamp; |
| 14 | + END IF; |
| 15 | + |
| 16 | + IF NEW.state = 'analysing_data' AND NEW.analysis_start_at IS NULL THEN |
| 17 | + NEW.analysis_start_at := v_timestamp; |
| 18 | + END IF; |
| 19 | + |
| 20 | + -- Set stop timestamps when transitioning *out* of a processing state |
| 21 | + IF OLD.state = 'analysing_data' AND NEW.state != OLD.state AND NEW.analysis_stop_at IS NULL THEN |
| 22 | + NEW.analysis_stop_at := v_timestamp; |
| 23 | + -- Compute final analysis speed here, since the progress trigger won't fire |
| 24 | + -- (it watches UPDATE OF analysis_stop_at, but this is a BEFORE trigger on UPDATE OF state) |
| 25 | + IF NEW.analysis_start_at IS NOT NULL AND NEW.total_rows > 0 THEN |
| 26 | + NEW.analysis_rows_per_sec := CASE |
| 27 | + WHEN EXTRACT(EPOCH FROM (NEW.analysis_stop_at - NEW.analysis_start_at)) <= 0 THEN 0 |
| 28 | + ELSE ROUND((NEW.total_rows::numeric / EXTRACT(EPOCH FROM (NEW.analysis_stop_at - NEW.analysis_start_at))), 2) |
| 29 | + END; |
| 30 | + END IF; |
| 31 | + END IF; |
| 32 | + |
| 33 | + IF OLD.state = 'processing_data' AND NEW.state != OLD.state AND NEW.processing_stop_at IS NULL THEN |
| 34 | + NEW.processing_stop_at := v_timestamp; |
| 35 | + -- Compute final processing speed here for the same reason |
| 36 | + IF NEW.processing_start_at IS NOT NULL AND NEW.imported_rows > 0 THEN |
| 37 | + NEW.import_rows_per_sec := CASE |
| 38 | + WHEN EXTRACT(EPOCH FROM (NEW.processing_stop_at - NEW.processing_start_at)) <= 0 THEN 0 |
| 39 | + ELSE ROUND((NEW.imported_rows::numeric / EXTRACT(EPOCH FROM (NEW.processing_stop_at - NEW.processing_start_at))), 2) |
| 40 | + END; |
| 41 | + END IF; |
| 42 | + END IF; |
| 43 | + |
| 44 | + -- Record timestamps for approval/rejection states |
| 45 | + IF NEW.state = 'approved' AND NEW.changes_approved_at IS NULL THEN |
| 46 | + NEW.changes_approved_at := v_timestamp; |
| 47 | + END IF; |
| 48 | + |
| 49 | + IF NEW.state = 'rejected' AND NEW.changes_rejected_at IS NULL THEN |
| 50 | + NEW.changes_rejected_at := v_timestamp; |
| 51 | + END IF; |
| 52 | + |
| 53 | + -- When a job is finished, waiting, or rejected, it is no longer actively processing a step. |
| 54 | + -- Clear the current step tracking fields. |
| 55 | + IF NEW.state IN ('waiting_for_review', 'finished', 'rejected') THEN |
| 56 | + NEW.current_step_code := NULL; |
| 57 | + NEW.current_step_priority := NULL; |
| 58 | + |
| 59 | + -- When a job is finished or rejected, it's done. The performance index will be dropped with the table. |
| 60 | + IF NEW.state IN ('finished', 'rejected') THEN |
| 61 | + RAISE DEBUG '[Job %] State is now %, performance index will be dropped with table.', NEW.id, NEW.state; |
| 62 | + END IF; |
| 63 | + END IF; |
| 64 | + |
| 65 | + -- Record start timestamp for processing_data state |
| 66 | + IF NEW.state = 'processing_data' AND NEW.processing_start_at IS NULL THEN |
| 67 | + NEW.processing_start_at := v_timestamp; |
| 68 | + END IF; |
| 69 | + |
| 70 | + -- Derive total_rows when state changes from waiting_for_upload to upload_completed |
| 71 | + IF OLD.state = 'waiting_for_upload' AND NEW.state = 'upload_completed' THEN |
| 72 | + -- Count rows in the upload table |
| 73 | + EXECUTE format('SELECT COUNT(*) FROM public.%I', NEW.upload_table_name) INTO v_row_count; |
| 74 | + NEW.total_rows := v_row_count; |
| 75 | + |
| 76 | + -- Calculate total weighted steps now that total_rows is known |
| 77 | + IF NEW.max_analysis_priority IS NOT NULL AND v_row_count > 0 THEN |
| 78 | + NEW.total_analysis_steps_weighted := v_row_count * NEW.max_analysis_priority; |
| 79 | + END IF; |
| 80 | + |
| 81 | + -- Set priority using the dedicated sequence |
| 82 | + -- Lower values = higher priority, so earlier jobs get lower sequence values |
| 83 | + -- This ensures jobs are processed in the order they were created |
| 84 | + NEW.priority := nextval('public.import_job_priority_seq')::integer; |
| 85 | + |
| 86 | + RAISE DEBUG 'Set total_rows to % and calculated total weighted steps for import job %', v_row_count, NEW.id; |
| 87 | + END IF; |
| 88 | + |
| 89 | + RETURN NEW; |
| 90 | +END; |
| 91 | +$function$; |
| 92 | + |
| 93 | +END; |
0 commit comments