diff --git a/api/src/__integration__/db/search-feature-repository.integration.ts b/api/src/__integration__/db/search-feature-repository.integration.ts new file mode 100644 index 000000000..279d7ac95 --- /dev/null +++ b/api/src/__integration__/db/search-feature-repository.integration.ts @@ -0,0 +1,180 @@ +// Integration test for SearchFeatureRepository.deleteSearchRecordsBySubmissionId — +// verifies the subquery-based delete correctly removes records from all 4 search tables +// for a given submission, without affecting records belonging to other submissions. +// +// Uses a transaction that is ROLLED BACK after each test, so no data is persisted. +// +// Run: make test-db +// Requires: make web (database must be running with seed data) + +import { expect } from 'chai'; +import SQL from 'sql-template-strings'; +import { defaultPoolConfig, getAPIUserDBConnection, IDBConnection, initDBPool } from '../../database/db'; +import { SearchFeatureRepository } from '../../repositories/search-feature-repository'; + +describe('SearchFeatureRepository (integration)', function () { + this.timeout(15000); + + let connection: IDBConnection; + let repo: SearchFeatureRepository; + + before(() => { + initDBPool(defaultPoolConfig); + }); + + beforeEach(async () => { + connection = getAPIUserDBConnection(); + await connection.open(); + repo = new SearchFeatureRepository(connection); + }); + + afterEach(async () => { + await connection.rollback(); + connection.release(); + }); + + /** + * Helper: insert a minimal submission and return its ID. 
+ */ + async function createTestSubmission(): Promise { + const systemUserId = connection.systemUserId(); + + const result = await connection.sql(SQL` + INSERT INTO submission (uuid, system_user_id, source_system, name, description, comment, create_user) + VALUES (gen_random_uuid(), ${systemUserId}, 'SIMS', 'Search Index Test', 'Test', 'Test', ${systemUserId}) + RETURNING submission_id; + `); + + return result.rows[0].submission_id; + } + + /** + * Helper: insert a submission_feature and return its ID. + */ + async function createTestFeature(submissionId: number): Promise { + const systemUserId = connection.systemUserId(); + + const result = await connection.sql(SQL` + INSERT INTO submission_feature (submission_id, feature_type_id, data, data_byte_size, create_user) + VALUES ( + ${submissionId}, + (SELECT feature_type_id FROM feature_type WHERE name = 'dataset' LIMIT 1), + '{"name": "test"}'::jsonb, + 100, + ${systemUserId} + ) + RETURNING submission_feature_id; + `); + + return result.rows[0].submission_feature_id; + } + + /** + * Helper: insert a search_string record. + */ + async function insertSearchString(submissionFeatureId: number, featurePropertyId: number, value: string) { + const systemUserId = connection.systemUserId(); + + await connection.sql(SQL` + INSERT INTO search_string (submission_feature_id, feature_property_id, value, create_user) + VALUES (${submissionFeatureId}, ${featurePropertyId}, ${value}, ${systemUserId}); + `); + } + + /** + * Helper: insert a search_number record. + */ + async function insertSearchNumber(submissionFeatureId: number, featurePropertyId: number, value: number) { + const systemUserId = connection.systemUserId(); + + await connection.sql(SQL` + INSERT INTO search_number (submission_feature_id, feature_property_id, value, create_user) + VALUES (${submissionFeatureId}, ${featurePropertyId}, ${value}, ${systemUserId}); + `); + } + + /** + * Helper: count records in a search table for a given submission_feature_id. 
+ */ + async function countSearchRecords(table: string, submissionFeatureId: number): Promise { + const result = await connection.sql( + SQL` + SELECT count(*)::integer as count + FROM `.append(table).append(SQL` + WHERE submission_feature_id = ${submissionFeatureId}; + `) + ); + + return result.rows[0].count; + } + + describe('deleteSearchRecordsBySubmissionId', () => { + it('deletes search_string and search_number records for the given submission', async () => { + // Arrange: create submission with a feature and search records + const submissionId = await createTestSubmission(); + const featureId = await createTestFeature(submissionId); + + // feature_property_id 1 = site_identifier (string type), 6 = measurement_value (number type) + await insertSearchString(featureId, 1, 'test-site'); + await insertSearchNumber(featureId, 6, 42); + + // Verify records exist + expect(await countSearchRecords('search_string', featureId)).to.equal(1); + expect(await countSearchRecords('search_number', featureId)).to.equal(1); + + // Act + await repo.deleteSearchRecordsBySubmissionId(submissionId); + + // Assert: all records deleted + expect(await countSearchRecords('search_string', featureId)).to.equal(0); + expect(await countSearchRecords('search_number', featureId)).to.equal(0); + }); + + it('does not delete records belonging to a different submission', async () => { + // Arrange: create two submissions with search records + const submissionId1 = await createTestSubmission(); + const featureId1 = await createTestFeature(submissionId1); + await insertSearchString(featureId1, 1, 'submission-1-value'); + + const submissionId2 = await createTestSubmission(); + const featureId2 = await createTestFeature(submissionId2); + await insertSearchString(featureId2, 1, 'submission-2-value'); + + // Act: delete only submission 1's records + await repo.deleteSearchRecordsBySubmissionId(submissionId1); + + // Assert: submission 1 records gone, submission 2 records intact + expect(await 
countSearchRecords('search_string', featureId1)).to.equal(0); + expect(await countSearchRecords('search_string', featureId2)).to.equal(1); + }); + + it('succeeds with no error when submission has no search records', async () => { + const submissionId = await createTestSubmission(); + await createTestFeature(submissionId); + + // Act: should not throw + await repo.deleteSearchRecordsBySubmissionId(submissionId); + }); + + it('deletes records across multiple features in the same submission', async () => { + // Arrange: one submission, two features, each with search records + const submissionId = await createTestSubmission(); + const featureId1 = await createTestFeature(submissionId); + const featureId2 = await createTestFeature(submissionId); + + await insertSearchString(featureId1, 1, 'feature-1-value'); + await insertSearchString(featureId2, 1, 'feature-2-value'); + await insertSearchNumber(featureId1, 6, 10); + await insertSearchNumber(featureId2, 6, 20); + + // Act + await repo.deleteSearchRecordsBySubmissionId(submissionId); + + // Assert: all records across both features deleted + expect(await countSearchRecords('search_string', featureId1)).to.equal(0); + expect(await countSearchRecords('search_string', featureId2)).to.equal(0); + expect(await countSearchRecords('search_number', featureId1)).to.equal(0); + expect(await countSearchRecords('search_number', featureId2)).to.equal(0); + }); + }); +}); diff --git a/api/src/__integration__/system/process-submission-features-worker.integration.ts b/api/src/__integration__/system/process-submission-features-worker.integration.ts index 08908ff15..4ad8ce8dd 100644 --- a/api/src/__integration__/system/process-submission-features-worker.integration.ts +++ b/api/src/__integration__/system/process-submission-features-worker.integration.ts @@ -89,6 +89,17 @@ describe('Process Submission Features Worker', function () { try { // Clean up in reverse FK order for (const submissionId of createdSubmissionIds) { + // search_ tables 
FK to submission_feature — delete first + const featureIds = await db('biohub.submission_feature') + .where('submission_id', submissionId) + .select('submission_feature_id'); + const ids = featureIds.map((r: { submission_feature_id: number }) => r.submission_feature_id); + if (ids.length) { + await db('biohub.search_string').whereIn('submission_feature_id', ids).del(); + await db('biohub.search_number').whereIn('submission_feature_id', ids).del(); + await db('biohub.search_datetime').whereIn('submission_feature_id', ids).del(); + await db('biohub.search_spatial').whereIn('submission_feature_id', ids).del(); + } await db('biohub.submission_feature').where('submission_id', submissionId).del(); await db('biohub.submission_validation').where('submission_id', submissionId).del(); await db('biohub.submission_upload').where('submission_id', submissionId).del(); diff --git a/api/src/queue/jobs/index-submission-features-job.test.ts b/api/src/queue/jobs/index-submission-features-job.test.ts new file mode 100644 index 000000000..fb5404dcb --- /dev/null +++ b/api/src/queue/jobs/index-submission-features-job.test.ts @@ -0,0 +1,139 @@ +import chai, { expect } from 'chai'; +import { describe } from 'mocha'; +import PgBoss from 'pg-boss'; +import sinon from 'sinon'; +import sinonChai from 'sinon-chai'; +import * as db from '../../database/db'; +import { SearchFeatureService } from '../../services/search-feature-service'; +import { getMockDBConnection } from '../../__mocks__/db'; +import { + IIndexSubmissionFeaturesJobData, + indexSubmissionFeaturesFailedHandler, + indexSubmissionFeaturesJobHandler +} from './index-submission-features-job'; + +chai.use(sinonChai); + +describe('indexSubmissionFeaturesJobHandler', () => { + afterEach(() => { + sinon.restore(); + }); + + const createMockJob = (submissionId: number, id = 'job-1'): PgBoss.Job => + ({ + id, + name: 'index-submission-features', + data: { submissionId } + } as PgBoss.Job); + + it('should index submission successfully', 
async () => { + const mockDBConnection = getMockDBConnection(); + const openStub = sinon.stub().resolves(); + const commitStub = sinon.stub().resolves(); + const releaseStub = sinon.stub(); + mockDBConnection.open = openStub; + mockDBConnection.commit = commitStub; + mockDBConnection.release = releaseStub; + + sinon.stub(db, 'getAPIUserDBConnection').returns(mockDBConnection); + + const indexStub = sinon.stub(SearchFeatureService.prototype, 'indexFeaturesBySubmissionId').resolves(); + + await indexSubmissionFeaturesJobHandler([createMockJob(777)]); + + expect(indexStub).to.have.been.calledOnceWith(777); + expect(commitStub).to.have.been.calledOnce; + expect(releaseStub).to.have.been.calledOnce; + }); + + it('should roll back and throw on indexing failure', async () => { + const mockDBConnection = getMockDBConnection(); + const rollbackStub = sinon.stub().resolves(); + const releaseStub = sinon.stub(); + mockDBConnection.open = sinon.stub().resolves(); + mockDBConnection.rollback = rollbackStub; + mockDBConnection.release = releaseStub; + + sinon.stub(db, 'getAPIUserDBConnection').returns(mockDBConnection); + + const testError = new Error('Indexing failed'); + sinon.stub(SearchFeatureService.prototype, 'indexFeaturesBySubmissionId').rejects(testError); + + try { + await indexSubmissionFeaturesJobHandler([createMockJob(777)]); + expect.fail('Should have thrown an error'); + } catch (error) { + expect((error as Error).message).to.equal('Indexing failed'); + } + + expect(rollbackStub).to.have.been.calledOnce; + expect(releaseStub).to.have.been.calledOnce; + }); + + it('should process multiple jobs in sequence', async () => { + const openStub = sinon.stub().resolves(); + const commitStub = sinon.stub().resolves(); + const releaseStub = sinon.stub(); + + const mockDBConnection = getMockDBConnection(); + mockDBConnection.open = openStub; + mockDBConnection.commit = commitStub; + mockDBConnection.release = releaseStub; + + sinon.stub(db, 
'getAPIUserDBConnection').returns(mockDBConnection); + + const indexStub = sinon.stub(SearchFeatureService.prototype, 'indexFeaturesBySubmissionId').resolves(); + + await indexSubmissionFeaturesJobHandler([createMockJob(1, 'job-1'), createMockJob(2, 'job-2')]); + + expect(indexStub.callCount).to.equal(2); + expect(openStub.callCount).to.equal(2); + expect(commitStub.callCount).to.equal(2); + expect(releaseStub.callCount).to.equal(2); + }); + + it('should handle empty jobs array', async () => { + const getConnectionStub = sinon.stub(db, 'getAPIUserDBConnection'); + + await indexSubmissionFeaturesJobHandler([]); + + expect(getConnectionStub).not.to.have.been.called; + }); +}); + +describe('indexSubmissionFeaturesFailedHandler', () => { + afterEach(() => { + sinon.restore(); + }); + + it('should log failure with error output without throwing', async () => { + const getConnectionStub = sinon.stub(db, 'getAPIUserDBConnection'); + + const job = { + id: 'job-1', + name: 'index-submission-features-failed', + data: { submissionId: 777 }, + output: { message: 'Indexing failed after retries' } + } as unknown as PgBoss.Job; + + await indexSubmissionFeaturesFailedHandler([job]); + + // DLQ handler is log-only — no DB connection should be opened + expect(getConnectionStub).not.to.have.been.called; + }); + + it('should log default message when output is null', async () => { + const getConnectionStub = sinon.stub(db, 'getAPIUserDBConnection'); + + const job = { + id: 'job-2', + name: 'index-submission-features-failed', + data: { submissionId: 888 }, + output: null + } as unknown as PgBoss.Job; + + await indexSubmissionFeaturesFailedHandler([job]); + + expect(getConnectionStub).not.to.have.been.called; + }); +}); diff --git a/api/src/queue/jobs/index-submission-features-job.ts b/api/src/queue/jobs/index-submission-features-job.ts new file mode 100644 index 000000000..46cec2ac8 --- /dev/null +++ b/api/src/queue/jobs/index-submission-features-job.ts @@ -0,0 +1,100 @@ +import PgBoss 
from 'pg-boss'; +import { getAPIUserDBConnection } from '../../database/db'; +import { SearchFeatureService } from '../../services/search-feature-service'; +import { getLogger } from '../../utils/logger'; + +const defaultLog = getLogger('queue/jobs/index-submission-features-job'); + +/** + * Index submission features job data interface. + * Contains the submission ID for async search indexing. + */ +export interface IIndexSubmissionFeaturesJobData { + /** The submission ID whose features should be indexed for search */ + submissionId: number; +} + +/** + * Index submission features job handler. + * + * Indexes submission features for search asynchronously after validation completes. + * Indexing is a separate concern from validation — a failed index does not invalidate + * the submission. The DLQ handler logs but does not affect validation status. + * + * @param {PgBoss.Job[]} jobs The jobs to process + * @return {*} {Promise} + */ +export const indexSubmissionFeaturesJobHandler: PgBoss.WorkHandler = async (jobs) => { + for (const job of jobs) { + const { submissionId } = job.data; + + defaultLog.info({ + label: 'indexSubmissionFeaturesJobHandler', + message: 'Processing index submission features job', + jobId: job.id, + submissionId + }); + + const connection = getAPIUserDBConnection(); + + try { + await connection.open(); + + const searchFeatureService = new SearchFeatureService(connection); + await searchFeatureService.indexFeaturesBySubmissionId(submissionId); + + await connection.commit(); + + defaultLog.info({ + label: 'indexSubmissionFeaturesJobHandler', + message: 'Index submission features job completed successfully', + jobId: job.id, + submissionId + }); + } catch (error) { + await connection.rollback(); + + defaultLog.error({ + label: 'indexSubmissionFeaturesJobHandler', + message: 'Index submission features job failed', + jobId: job.id, + submissionId, + error + }); + + throw error; // pg-boss will handle retry based on configuration + } finally { + 
connection.release(); + } + } +}; + +/** + * Dead Letter Queue handler for failed index submission features jobs. + * + * Indexing failure is independent of validation — a failed index does not invalidate + * the submission. This handler only logs the failure with error details. There is no + * tracking record to update (unlike malware scan or download jobs), so no DB connection + * is needed. + * + * @param {PgBoss.Job[]} jobs The failed jobs + * @return {*} {Promise} + */ +export const indexSubmissionFeaturesFailedHandler: PgBoss.WorkHandler = async ( + jobs +) => { + for (const job of jobs) { + const { submissionId } = job.data; + + // Cast to access output field available on failed jobs + const jobOutput = (job as PgBoss.JobWithMetadata).output; + + defaultLog.warn({ + label: 'indexSubmissionFeaturesFailedHandler', + message: 'Index submission features job failed after all retries', + jobId: job.id, + submissionId, + output: jobOutput ?? 'Job failed after all retries' + }); + } +}; diff --git a/api/src/queue/jobs/index.ts b/api/src/queue/jobs/index.ts index a834ef978..583e1e308 100644 --- a/api/src/queue/jobs/index.ts +++ b/api/src/queue/jobs/index.ts @@ -30,7 +30,16 @@ export const JobQueues = { * Dead letter queue for failed download jobs. * Jobs are moved here after all retries are exhausted. */ - PROCESS_DOWNLOAD_FAILED: 'process-download-failed' + PROCESS_DOWNLOAD_FAILED: 'process-download-failed', + /** + * Index submission features job queue for async search indexing after validation. + */ + INDEX_SUBMISSION_FEATURES: 'index-submission-features', + /** + * Dead letter queue for failed index-submission-features jobs. + * Jobs are moved here after all retries are exhausted. 
+ */ + INDEX_SUBMISSION_FEATURES_FAILED: 'index-submission-features-failed' } as const; export type JobQueueName = (typeof JobQueues)[keyof typeof JobQueues]; diff --git a/api/src/queue/jobs/process-submission-features-job.test.ts b/api/src/queue/jobs/process-submission-features-job.test.ts index 0ca3fa48e..c5fe76c20 100644 --- a/api/src/queue/jobs/process-submission-features-job.test.ts +++ b/api/src/queue/jobs/process-submission-features-job.test.ts @@ -7,6 +7,7 @@ import { ValidationErrorType } from '../../services/ingestion/feature-validation import { SubmissionIngestionService } from '../../services/ingestion/submission-ingestion-service'; import { SubmissionValidationService } from '../../services/submission-validation-service'; import { getMockDBConnection } from '../../__mocks__/db'; +import * as publisher from '../publisher'; import { IProcessSubmissionFeaturesJobData, processSubmissionFeaturesFailedHandler, @@ -41,6 +42,10 @@ describe('process-submission-features-job', () => { sinon.stub(SubmissionIngestionService.prototype, 'processSubmission').resolves({ valid: true, errors: [] }); + sinon + .stub(publisher, 'publishIndexSubmissionFeaturesJob') + .resolves({ status: 'published', jobId: 'index-job-id' }); + const mockJobs = [createMockJob(123)]; await processSubmissionFeaturesJobHandler(mockJobs); @@ -63,6 +68,10 @@ describe('process-submission-features-job', () => { .stub(SubmissionIngestionService.prototype, 'processSubmission') .resolves({ valid: true, errors: [] }); + sinon + .stub(publisher, 'publishIndexSubmissionFeaturesJob') + .resolves({ status: 'published', jobId: 'index-job-id' }); + const mockJobs = [createMockJob(456)]; await processSubmissionFeaturesJobHandler(mockJobs); @@ -158,6 +167,10 @@ describe('process-submission-features-job', () => { sinon.stub(SubmissionValidationService.prototype, 'updateSubmissionValidationStatus').resolves(); sinon.stub(SubmissionIngestionService.prototype, 'processSubmission').resolves({ valid: true, errors: 
[] }); + sinon + .stub(publisher, 'publishIndexSubmissionFeaturesJob') + .resolves({ status: 'published', jobId: 'index-job-id' }); + const mockJobs = [createMockJob(123, 'job-1'), createMockJob(456, 'job-2')]; await processSubmissionFeaturesJobHandler(mockJobs); @@ -193,12 +206,137 @@ describe('process-submission-features-job', () => { sinon.stub(SubmissionValidationService.prototype, 'updateSubmissionValidationStatus').resolves(); sinon.stub(SubmissionIngestionService.prototype, 'processSubmission').resolves({ valid: true, errors: [] }); + sinon + .stub(publisher, 'publishIndexSubmissionFeaturesJob') + .resolves({ status: 'published', jobId: 'index-job-id' }); + const mockJobs = [createMockJob(123)]; await processSubmissionFeaturesJobHandler(mockJobs); expect(releaseStub.calledOnce).to.be.true; }); + + it('publishes indexing job after validation completes', async () => { + const mockDBConnection = getMockDBConnection(); + + mockDBConnection.open = sinon.stub().resolves(); + mockDBConnection.commit = sinon.stub().resolves(); + mockDBConnection.release = sinon.stub(); + + sinon.stub(db, 'getAPIUserDBConnection').returns(mockDBConnection); + + const updateStatusStub = sinon + .stub(SubmissionValidationService.prototype, 'updateSubmissionValidationStatus') + .resolves(); + + sinon.stub(SubmissionIngestionService.prototype, 'processSubmission').resolves({ valid: true, errors: [] }); + + const publishStub = sinon + .stub(publisher, 'publishIndexSubmissionFeaturesJob') + .resolves({ status: 'published', jobId: 'index-job-id' }); + + const mockJobs = [createMockJob(123)]; + + await processSubmissionFeaturesJobHandler(mockJobs); + + expect(publishStub.calledOnce).to.be.true; + expect(publishStub.firstCall.args[0]).to.equal(mockDBConnection); + expect(publishStub.firstCall.args[1]).to.deep.equal({ submissionId: 123 }); + + // Publish must happen after 'completed' status update + expect(updateStatusStub.calledWith('test-job-id', 'completed')).to.be.true; + 
expect(publishStub.calledAfter(updateStatusStub)).to.be.true; + }); + + it('validation succeeds even if indexing publish fails (fire-and-forget)', async () => { + const mockDBConnection = getMockDBConnection(); + const commitStub = sinon.stub().resolves(); + + mockDBConnection.open = sinon.stub().resolves(); + mockDBConnection.commit = commitStub; + mockDBConnection.release = sinon.stub(); + + sinon.stub(db, 'getAPIUserDBConnection').returns(mockDBConnection); + + sinon.stub(SubmissionValidationService.prototype, 'updateSubmissionValidationStatus').resolves(); + + sinon.stub(SubmissionIngestionService.prototype, 'processSubmission').resolves({ valid: true, errors: [] }); + + sinon + .stub(publisher, 'publishIndexSubmissionFeaturesJob') + .resolves({ status: 'error', message: 'pg-boss unavailable' }); + + const mockJobs = [createMockJob(123)]; + + // Should NOT throw — fire-and-forget + await processSubmissionFeaturesJobHandler(mockJobs); + + // Connection should still be committed (validation succeeded) + expect(commitStub.called).to.be.true; + }); + + it('validation succeeds when indexing publish returns duplicate', async () => { + const mockDBConnection = getMockDBConnection(); + const commitStub = sinon.stub().resolves(); + + mockDBConnection.open = sinon.stub().resolves(); + mockDBConnection.commit = commitStub; + mockDBConnection.release = sinon.stub(); + + sinon.stub(db, 'getAPIUserDBConnection').returns(mockDBConnection); + + sinon.stub(SubmissionValidationService.prototype, 'updateSubmissionValidationStatus').resolves(); + + sinon.stub(SubmissionIngestionService.prototype, 'processSubmission').resolves({ valid: true, errors: [] }); + + sinon + .stub(publisher, 'publishIndexSubmissionFeaturesJob') + .resolves({ status: 'duplicate', message: 'Job already exists for this submission' }); + + const mockJobs = [createMockJob(123)]; + + // Should NOT throw — duplicate is acceptable + await processSubmissionFeaturesJobHandler(mockJobs); + + 
expect(commitStub.called).to.be.true; + }); + + it('does not publish indexing job when validation returns invalid', async () => { + const mockDBConnection = getMockDBConnection(); + + mockDBConnection.open = sinon.stub().resolves(); + mockDBConnection.commit = sinon.stub().resolves(); + mockDBConnection.release = sinon.stub(); + + sinon.stub(db, 'getAPIUserDBConnection').returns(mockDBConnection); + + sinon.stub(SubmissionValidationService.prototype, 'updateSubmissionValidationStatus').resolves(); + + sinon.stub(SubmissionIngestionService.prototype, 'processSubmission').resolves({ + valid: false, + errors: [ + { + type: ValidationErrorType.INVALID_PROPERTY_TYPE, + featureId: 'feat-1', + featureType: 'observation', + field: 'count', + value: 'abc', + message: 'Expected number' + } + ] + }); + + const publishStub = sinon + .stub(publisher, 'publishIndexSubmissionFeaturesJob') + .resolves({ status: 'published', jobId: 'index-job-id' }); + + const mockJobs = [createMockJob(123)]; + + await processSubmissionFeaturesJobHandler(mockJobs); + + expect(publishStub.called).to.be.false; + }); }); describe('processSubmissionFeaturesFailedHandler', () => { diff --git a/api/src/queue/jobs/process-submission-features-job.ts b/api/src/queue/jobs/process-submission-features-job.ts index eacdfe30e..08732a5b4 100644 --- a/api/src/queue/jobs/process-submission-features-job.ts +++ b/api/src/queue/jobs/process-submission-features-job.ts @@ -3,6 +3,7 @@ import { getAPIUserDBConnection } from '../../database/db'; import { SubmissionIngestionService } from '../../services/ingestion/submission-ingestion-service'; import { SubmissionValidationService } from '../../services/submission-validation-service'; import { getLogger } from '../../utils/logger'; +import { publishIndexSubmissionFeaturesJob } from '../publisher'; const defaultLog = getLogger('queue/jobs/process-submission-features-job'); @@ -23,7 +24,6 @@ export interface IProcessSubmissionFeaturesJobData { * 2. 
Extracts and validates features * 3. Inserts feature records * 4. Indexes features for search - * 5. Calculates and adds geographic regions * * @param {PgBoss.Job[]} jobs The jobs to process * @return {*} {Promise} @@ -78,6 +78,18 @@ export const processSubmissionFeaturesJobHandler: PgBoss.WorkHandler { afterEach(() => { @@ -299,6 +304,100 @@ describe('publisher', () => { }); }); + describe('publishIndexSubmissionFeaturesJob', () => { + it('publishes an index submission features job', async () => { + const mockConnection = getMockDBConnection(); + const sendStub = sinon.stub().resolves('index-job-id'); + const createQueueStub = sinon.stub().resolves(); + const mockBoss = { send: sendStub, createQueue: createQueueStub }; + + sinon.stub(pgBossService, 'getPgBoss').returns(mockBoss as any); + + const data = { submissionId: 777 }; + const result = await publishIndexSubmissionFeaturesJob(mockConnection, data); + + expect(createQueueStub.calledOnce).to.be.true; + expect(createQueueStub.firstCall.args[0]).to.equal(JobQueues.INDEX_SUBMISSION_FEATURES); + expect(sendStub.calledOnce).to.be.true; + expect(sendStub.firstCall.args[0]).to.equal(JobQueues.INDEX_SUBMISSION_FEATURES); + expect(sendStub.firstCall.args[1]).to.deep.equal(data); + expect(result.status).to.equal('published'); + expect((result as { status: 'published'; jobId: string }).jobId).to.equal('index-job-id'); + }); + + it('uses index submission features options with 10 minute timeout', async () => { + const mockConnection = getMockDBConnection(); + const sendStub = sinon.stub().resolves('index-job-id'); + const createQueueStub = sinon.stub().resolves(); + const mockBoss = { send: sendStub, createQueue: createQueueStub }; + + sinon.stub(pgBossService, 'getPgBoss').returns(mockBoss as any); + + await publishIndexSubmissionFeaturesJob(mockConnection, { submissionId: 777 }); + + const options = sendStub.firstCall.args[2]; + expect(options.retryLimit).to.equal(3); + expect(options.retryDelay).to.equal(60); + 
expect(options.retryBackoff).to.equal(true); + expect(options.expireInSeconds).to.equal(60 * 10); // 10 minutes + }); + + it('passes db option using caller connection for transactional job insert', async () => { + const mockConnection = getMockDBConnection(); + const sendStub = sinon.stub().resolves('index-job-id'); + const createQueueStub = sinon.stub().resolves(); + const mockBoss = { send: sendStub, createQueue: createQueueStub }; + + sinon.stub(pgBossService, 'getPgBoss').returns(mockBoss as any); + + await publishIndexSubmissionFeaturesJob(mockConnection, { submissionId: 777 }); + + const options = sendStub.firstCall.args[2]; + expect(options.db).to.exist; + expect(options.db.executeSql).to.be.a('function'); + }); + + it('uses singletonKey based on submissionId to prevent duplicates', async () => { + const mockConnection = getMockDBConnection(); + const sendStub = sinon.stub().resolves('index-job-id'); + const createQueueStub = sinon.stub().resolves(); + const mockBoss = { send: sendStub, createQueue: createQueueStub }; + + sinon.stub(pgBossService, 'getPgBoss').returns(mockBoss as any); + + await publishIndexSubmissionFeaturesJob(mockConnection, { submissionId: 456 }); + + const options = sendStub.firstCall.args[2]; + expect(options.singletonKey).to.equal('submission-idx-456'); + }); + + it('returns duplicate status when send returns null', async () => { + const mockConnection = getMockDBConnection(); + const sendStub = sinon.stub().resolves(null); + const createQueueStub = sinon.stub().resolves(); + const mockBoss = { send: sendStub, createQueue: createQueueStub }; + + sinon.stub(pgBossService, 'getPgBoss').returns(mockBoss as any); + + const result = await publishIndexSubmissionFeaturesJob(mockConnection, { submissionId: 777 }); + + expect(result.status).to.equal('duplicate'); + expect((result as { status: 'duplicate'; message: string }).message).to.equal( + 'Job already exists for this submission' + ); + }); + + it('returns error status when pg-boss 
throws', async () => { + const mockConnection = getMockDBConnection(); + sinon.stub(pgBossService, 'getPgBoss').throws(new Error('pg-boss not initialized')); + + const result = await publishIndexSubmissionFeaturesJob(mockConnection, { submissionId: 777 }); + + expect(result.status).to.equal('error'); + expect((result as { status: 'error'; message: string }).message).to.equal('pg-boss not initialized'); + }); + }); + describe('publishProcessDownloadJob', () => { const createMockDownload = (overrides: Partial = {}): DownloadRecord => ({ download_id: 'aaaa0000-0000-0000-0000-000000000001', diff --git a/api/src/queue/publisher.ts b/api/src/queue/publisher.ts index 60ae2e4f7..45bcb3a45 100644 --- a/api/src/queue/publisher.ts +++ b/api/src/queue/publisher.ts @@ -4,6 +4,7 @@ import { DownloadService } from '../services/download/download-service'; import { SubmissionValidationService } from '../services/submission-validation-service'; import { getLogger } from '../utils/logger'; import { JobQueues } from './jobs'; +import { IIndexSubmissionFeaturesJobData } from './jobs/index-submission-features-job'; import { IMalwareScanJobData } from './jobs/malware-scan-job'; import { IProcessDownloadJobData } from './jobs/process-download-job'; import { IProcessSubmissionFeaturesJobData } from './jobs/process-submission-features-job'; @@ -323,3 +324,77 @@ export const publishProcessDownloadJob = async ( return { status: 'error', message: errorMessage }; } }; + +/** + * Options for index submission features jobs. + * Same timeout as validation — indexing should complete within minutes. + */ +const INDEX_SUBMISSION_FEATURES_OPTIONS: IPublishOptions = { + retryLimit: 3, + retryDelay: 60, + retryBackoff: true, + expireInSeconds: 60 * 10 // 10 minutes +}; + +/** + * Publish an index submission features job to the queue. + * + * Queues async search indexing for a submission's features. 
Uses the caller's + * DB connection via pg-boss's `db` option so the job insert participates in + * the same transaction — if the caller rolls back, the job is never visible. + * + * @param {IDBConnection} connection Database connection for transactional job insert + * @param {IIndexSubmissionFeaturesJobData} data Job data containing submissionId + * @param {IPublishOptions} [options={}] Job options + * @return {*} {Promise} Result indicating success, duplicate, or error + */ +export const publishIndexSubmissionFeaturesJob = async ( + connection: IDBConnection, + data: IIndexSubmissionFeaturesJobData, + options: IPublishOptions = {} +): Promise => { + try { + const boss = getPgBoss(); + const mergedOptions = { ...INDEX_SUBMISSION_FEATURES_OPTIONS, ...options }; + + await boss.createQueue(JobQueues.INDEX_SUBMISSION_FEATURES); + + // Use singletonKey to prevent duplicate concurrent indexing jobs for the same submission + // Pass caller's connection via db option so job insert is part of the same transaction + const jobId = await boss.send(JobQueues.INDEX_SUBMISSION_FEATURES, data, { + ...mergedOptions, + singletonKey: `submission-idx-${data.submissionId}`, + db: { executeSql: (text: string, values: any[]) => connection.query(text, values) } + }); + + if (jobId) { + defaultLog.info({ + label: 'publishIndexSubmissionFeaturesJob', + message: 'Index submission features job published', + jobId, + submissionId: data.submissionId + }); + + return { status: 'published', jobId }; + } + + defaultLog.warn({ + label: 'publishIndexSubmissionFeaturesJob', + message: 'Job not published (duplicate or throttled)', + submissionId: data.submissionId + }); + + return { status: 'duplicate', message: 'Job already exists for this submission' }; + } catch (error) { + const errorMessage = error instanceof Error ? 
error.message : 'Unknown error'; + + defaultLog.error({ + label: 'publishIndexSubmissionFeaturesJob', + message: 'Failed to publish job', + submissionId: data.submissionId, + error + }); + + return { status: 'error', message: errorMessage }; + } +}; diff --git a/api/src/queue/worker.test.ts b/api/src/queue/worker.test.ts index af009ae82..ad0d2b7d9 100644 --- a/api/src/queue/worker.test.ts +++ b/api/src/queue/worker.test.ts @@ -2,6 +2,7 @@ import { expect } from 'chai'; import { describe } from 'mocha'; import sinon from 'sinon'; import { JobQueues } from './jobs'; +import * as indexSubmissionFeaturesJob from './jobs/index-submission-features-job'; import * as malwareScanJob from './jobs/malware-scan-job'; import * as processSubmissionFeaturesJob from './jobs/process-submission-features-job'; import * as pgBossService from './pg-boss-service'; @@ -51,14 +52,16 @@ describe('worker', () => { await registerWorkers(); // createQueue is called for all queues (including dead letter queues) - // 6 queues: PROCESS_SUBMISSION_FEATURES + FAILED, MALWARE_SCAN + FAILED, PROCESS_DOWNLOAD + FAILED - expect(createQueueStub.callCount).to.equal(6); + // 8 queues: PROCESS_SUBMISSION_FEATURES + FAILED, MALWARE_SCAN + FAILED, PROCESS_DOWNLOAD + FAILED, INDEX_SUBMISSION_FEATURES + FAILED + expect(createQueueStub.callCount).to.equal(8); expect(createQueueStub.firstCall.args[0]).to.equal(JobQueues.PROCESS_SUBMISSION_FEATURES_FAILED); expect(createQueueStub.secondCall.args[0]).to.equal(JobQueues.PROCESS_SUBMISSION_FEATURES); expect(createQueueStub.thirdCall.args[0]).to.equal(JobQueues.MALWARE_SCAN_FAILED); expect(createQueueStub.getCall(3).args[0]).to.equal(JobQueues.MALWARE_SCAN); expect(createQueueStub.getCall(4).args[0]).to.equal(JobQueues.PROCESS_DOWNLOAD_FAILED); expect(createQueueStub.getCall(5).args[0]).to.equal(JobQueues.PROCESS_DOWNLOAD); + expect(createQueueStub.getCall(6).args[0]).to.equal(JobQueues.INDEX_SUBMISSION_FEATURES_FAILED); + 
expect(createQueueStub.getCall(7).args[0]).to.equal(JobQueues.INDEX_SUBMISSION_FEATURES); }); it('configures dead letter queue for process-submission-features', async () => { @@ -90,5 +93,38 @@ describe('worker', () => { expect(workStub.secondCall.args[0]).to.equal(JobQueues.PROCESS_SUBMISSION_FEATURES_FAILED); expect(workStub.secondCall.args[1]).to.equal(processSubmissionFeaturesJob.processSubmissionFeaturesFailedHandler); }); + + it('registers the index submission features job handler with pg-boss', async () => { + const workStub = sinon.stub().resolves(); + const createQueueStub = sinon.stub().resolves(); + const mockBoss = { work: workStub, createQueue: createQueueStub }; + + sinon.stub(pgBossService, 'getPgBoss').returns(mockBoss as any); + + await registerWorkers(); + + // Index submission features handlers are registered after download handlers + expect(workStub.getCall(6).args[0]).to.equal(JobQueues.INDEX_SUBMISSION_FEATURES); + expect(workStub.getCall(6).args[1]).to.equal(indexSubmissionFeaturesJob.indexSubmissionFeaturesJobHandler); + + expect(workStub.getCall(7).args[0]).to.equal(JobQueues.INDEX_SUBMISSION_FEATURES_FAILED); + expect(workStub.getCall(7).args[1]).to.equal(indexSubmissionFeaturesJob.indexSubmissionFeaturesFailedHandler); + }); + + it('configures dead letter queue for index-submission-features', async () => { + const workStub = sinon.stub().resolves(); + const createQueueStub = sinon.stub().resolves(); + const mockBoss = { work: workStub, createQueue: createQueueStub }; + + sinon.stub(pgBossService, 'getPgBoss').returns(mockBoss as any); + + await registerWorkers(); + + // 8th createQueue call (INDEX_SUBMISSION_FEATURES) should have DLQ config + const queueConfig = createQueueStub.getCall(7).args[1]; + expect(queueConfig.deadLetter).to.equal(JobQueues.INDEX_SUBMISSION_FEATURES_FAILED); + expect(queueConfig.retryLimit).to.equal(3); + expect(queueConfig.retryBackoff).to.equal(true); + }); }); }); diff --git a/api/src/queue/worker.ts 
b/api/src/queue/worker.ts index 449cf7108..7f3e8a0b5 100644 --- a/api/src/queue/worker.ts +++ b/api/src/queue/worker.ts @@ -1,5 +1,10 @@ import { getLogger } from '../utils/logger'; import { JobQueues } from './jobs'; +import { + IIndexSubmissionFeaturesJobData, + indexSubmissionFeaturesFailedHandler, + indexSubmissionFeaturesJobHandler +} from './jobs/index-submission-features-job'; import { IMalwareScanJobData, malwareScanFailedHandler, malwareScanJobHandler } from './jobs/malware-scan-job'; import { IProcessDownloadJobData, @@ -81,6 +86,29 @@ export const registerWorkers = async (): Promise<void> => { // Register dead letter queue handler for failed download jobs await boss.work(JobQueues.PROCESS_DOWNLOAD_FAILED, processDownloadFailedHandler); + // Create dead letter queue first (must exist before main queue references it) + await boss.createQueue(JobQueues.INDEX_SUBMISSION_FEATURES_FAILED); + + // Create main queue with dead letter queue and retry configuration + await boss.createQueue(JobQueues.INDEX_SUBMISSION_FEATURES, { + deadLetter: JobQueues.INDEX_SUBMISSION_FEATURES_FAILED, + retryLimit: 3, + retryDelay: 60, + retryBackoff: true + }); + + // Register index submission features job handler + await boss.work<IIndexSubmissionFeaturesJobData>( + JobQueues.INDEX_SUBMISSION_FEATURES, + indexSubmissionFeaturesJobHandler + ); + + // Register dead letter queue handler for failed index submission features jobs + await boss.work<IIndexSubmissionFeaturesJobData>( + JobQueues.INDEX_SUBMISSION_FEATURES_FAILED, + indexSubmissionFeaturesFailedHandler + ); + defaultLog.info({ label: 'registerWorkers', message: 'Workers registered', diff --git a/api/src/repositories/search-feature-repository.test.ts b/api/src/repositories/search-feature-repository.test.ts index 726c5ef6c..1e7a3bbeb 100644 --- a/api/src/repositories/search-feature-repository.test.ts +++ b/api/src/repositories/search-feature-repository.test.ts @@ -18,6 +18,36 @@ describe('SearchFeatureRepository', () => { Sinon.restore(); }); + describe('deleteSearchRecordsBySubmissionId', () =>
{ + it('should delete from all 4 search tables for the given submission', async () => { + const knexSpy = Sinon.stub().resolves({ rowCount: 0, rows: [] }); + + const mockDBConnection = getMockDBConnection({ + knex: knexSpy + }); + + const repository = new SearchFeatureRepository(mockDBConnection); + + await repository.deleteSearchRecordsBySubmissionId(777); + + expect(knexSpy.callCount).to.equal(4); + }); + + it('should succeed when no existing records to delete', async () => { + const knexSpy = Sinon.stub().resolves({ rowCount: 0, rows: [] }); + + const mockDBConnection = getMockDBConnection({ + knex: knexSpy + }); + + const repository = new SearchFeatureRepository(mockDBConnection); + + await repository.deleteSearchRecordsBySubmissionId(999); + + expect(knexSpy.callCount).to.equal(4); + }); + }); + describe('insertSearchableDatetimeRecords', () => { it('should succeed on insert with matching row count', async () => { const mockRows: DatetimeSearchableRecord[] = [ diff --git a/api/src/repositories/search-feature-repository.ts b/api/src/repositories/search-feature-repository.ts index 8e5d3244b..6fcb16172 100644 --- a/api/src/repositories/search-feature-repository.ts +++ b/api/src/repositories/search-feature-repository.ts @@ -31,6 +31,34 @@ const defaultLog = getLogger('repositories/search-feature-repository'); * Supports keyword, feature type, species, and property filters with relevancy scoring. */ export class SearchFeatureRepository extends BaseRepository { + /** + * Deletes all existing search records (string, number, datetime, spatial) for features + * belonging to the given submission. Used before re-indexing to ensure idempotency — + * job retries and manual re-indexing would otherwise accumulate duplicate records + * because the search tables have no unique constraint on (submission_feature_id, feature_property_id). 
+ * + * @param {number} submissionId - The submission whose search records should be cleared + * @return {Promise<void>} + */ + async deleteSearchRecordsBySubmissionId(submissionId: number): Promise<void> { + defaultLog.debug({ label: 'deleteSearchRecordsBySubmissionId', message: 'start', submissionId }); + + const knex = getKnex(); + const featureIdSubquery = knex + .select('submission_feature_id') + .from('submission_feature') + .where('submission_id', submissionId); + + const tables = ['search_string', 'search_number', 'search_datetime', 'search_spatial']; + + await Promise.all( + tables.map((table) => { + const qb = knex.queryBuilder().delete().from(table).whereIn('submission_feature_id', featureIdSubquery); + return this.connection.knex(qb); + }) + ); + } + /** * Inserts searchable datetime records into the search_datetime table. * @param datetimeRecords - Array of datetime records to insert diff --git a/api/src/services/search-feature-service.test.ts b/api/src/services/search-feature-service.test.ts index 66e67ecbd..adc5b4853 100644 --- a/api/src/services/search-feature-service.test.ts +++ b/api/src/services/search-feature-service.test.ts @@ -23,6 +23,8 @@ describe('SearchFeatureService', () => { const searchFeatureService = new SearchFeatureService(mockDBConnection); + sinon.stub(SearchFeatureRepository.prototype, 'deleteSearchRecordsBySubmissionId').resolves(); + const getSubmissionFeaturesStub = sinon .stub(SubmissionRepository.prototype, 'getSubmissionFeaturesBySubmissionId') .resolves([ @@ -289,6 +291,8 @@ describe('SearchFeatureService', () => { const searchFeatureService = new SearchFeatureService(mockDBConnection); + sinon.stub(SearchFeatureRepository.prototype, 'deleteSearchRecordsBySubmissionId').resolves(); + sinon.stub(SubmissionRepository.prototype, 'getSubmissionFeaturesBySubmissionId').resolves([ { submission_feature_id: 11111, @@ -326,6 +330,8 @@ describe('SearchFeatureService', () => { const searchFeatureService = new
SearchFeatureService(mockDBConnection); + sinon.stub(SearchFeatureRepository.prototype, 'deleteSearchRecordsBySubmissionId').resolves(); + sinon.stub(SubmissionRepository.prototype, 'getSubmissionFeaturesBySubmissionId').resolves([ { submission_feature_id: 11111, @@ -391,6 +397,75 @@ describe('SearchFeatureService', () => { } ]); }); + + it('should call delete before insert for idempotency', async () => { + const mockDBConnection = getMockDBConnection(); + const searchFeatureService = new SearchFeatureService(mockDBConnection); + + const deleteStub = sinon.stub(SearchFeatureRepository.prototype, 'deleteSearchRecordsBySubmissionId').resolves(); + + sinon.stub(SubmissionRepository.prototype, 'getSubmissionFeaturesBySubmissionId').resolves([ + { + submission_feature_id: 11111, + submission_id: 777, + feature_type_id: 1, + urn: 'urn:777:dataset:11111', + data: { name: 'Dataset1' }, + source_id: '123', + uuid: '123-456-789', + parent_submission_feature_id: null, + record_effective_date: '2024-01-01', + record_end_date: null, + create_date: '2024-01-01', + create_user: 1, + update_date: null, + update_user: null, + revision_count: 0, + feature_type_name: 'dataset', + feature_type_display_name: 'Dataset', + submission_feature_security_ids: [] + } + ]); + + sinon.stub(CodeService.prototype, 'getFeatureTypePropertyCodes').resolves([ + { + feature_type: { feature_type_id: 1, feature_type_name: 'dataset', feature_type_display_name: 'Dataset' }, + feature_type_properties: [ + { + feature_property_id: 1, + feature_property_name: 'name', + feature_property_display_name: 'Name', + feature_property_type_id: 1, + feature_property_type_name: 'string' + } + ] + } + ]); + + const insertStub = sinon.stub(SearchFeatureRepository.prototype, 'insertSearchableStringRecords'); + + await searchFeatureService.indexFeaturesBySubmissionId(777); + + expect(deleteStub).to.have.been.calledOnceWith(777); + expect(deleteStub).to.have.been.calledBefore(insertStub); + }); + + it('should call delete 
even when no features exist for the submission', async () => { + const mockDBConnection = getMockDBConnection(); + const searchFeatureService = new SearchFeatureService(mockDBConnection); + + const deleteStub = sinon.stub(SearchFeatureRepository.prototype, 'deleteSearchRecordsBySubmissionId').resolves(); + + sinon.stub(SubmissionRepository.prototype, 'getSubmissionFeaturesBySubmissionId').resolves([]); + sinon.stub(CodeService.prototype, 'getFeatureTypePropertyCodes').resolves([]); + + const insertStub = sinon.stub(SearchFeatureRepository.prototype, 'insertSearchableStringRecords'); + + await searchFeatureService.indexFeaturesBySubmissionId(777); + + expect(deleteStub).to.have.been.calledOnceWith(777); + expect(insertStub).not.to.have.been.called; + }); }); describe('searchFeatures', () => { diff --git a/api/src/services/search-feature-service.ts b/api/src/services/search-feature-service.ts index cb4af7297..acea5b7ab 100644 --- a/api/src/services/search-feature-service.ts +++ b/api/src/services/search-feature-service.ts @@ -68,12 +68,21 @@ export class SearchFeatureService extends DBService { * Creates search indexes for datetime, number, spatial and string properties belonging to * all features found for the given submission. * + * Deletes existing search records first for idempotency — job retries and manual re-indexing + * can run this multiple times for the same submission. Without delete-before-insert, duplicate + * records accumulate because the search tables have no unique constraint on + * (submission_feature_id, feature_property_id). Upsert was rejected because it can't clean up + * orphaned rows when properties are removed between runs. 
+ * + * @param {number} submissionId + * @return {Promise<void>} + */ async indexFeaturesBySubmissionId(submissionId: number): Promise<void> { defaultLog.debug({ label: 'indexFeaturesBySubmissionId', message: 'start', submissionId }); + // Delete existing search records for idempotency (safe for retries and manual re-indexing) + await this.searchFeatureRepository.deleteSearchRecordsBySubmissionId(submissionId); + const datetimeRecords: InsertDatetimeSearchableRecord[] = []; const numberRecords: InsertNumberSearchableRecord[] = []; const spatialRecords: InsertSpatialSearchableRecord[] = [];