diff --git a/functions/src/events.ts b/functions/src/events.ts index f890e10..10fe883 100644 --- a/functions/src/events.ts +++ b/functions/src/events.ts @@ -70,13 +70,3 @@ export const recordPendingEvent = async (change, doc) => { data: { doc }, }); }; - -export const recordRetryEvent = async (change, doc) => { - if (!eventChannel) return; - - return eventChannel.publish({ - type: "firebase.extensions.firestore-web-scraper.v1.onRetry", - subject: change.after.id, - data: { doc }, - }); -}; diff --git a/functions/src/index.ts b/functions/src/index.ts index d0cb869..ca941a0 100644 --- a/functions/src/index.ts +++ b/functions/src/index.ts @@ -29,29 +29,24 @@ async function initialize() { } export const processQueue = onDocumentCreated(config.scrapeCollection, - async ( - snapshot: FirestoreEvent, - ) => { - await initialize(); - logs.start(); - - try { - await processWrite(snapshot.data); - } catch (err) { - await events.recordErrorEvent( - snapshot.data.data(), - `Unhandled error occurred during processing: ${err.message}"` - ); - logs.unhandledError(err); - return null; - } - - /** record complete event */ - await events.recordCompleteEvent(snapshot); - - logs.complete(); + async ( + snapshot: FirestoreEvent, + ) => { + await initialize(); + logs.start(); + + try { + await processWrite(snapshot.data); + } catch (err) { + await handleUnhandledError(snapshot, err); + return null; } - ); + + await handleCompleteEvent(snapshot); + + logs.complete(); + } +); async function processWrite( snapshot: QueryDocumentSnapshot @@ -65,60 +60,149 @@ async function processWrite( const task: Task = snapshot.data() as Task; const doc = db.collection(config.scrapeCollection).doc(snapshot.id); + // Record pending event for the newly created task + const change = { after: { id: snapshot.id, ...task } }; + await events.recordPendingEvent(change, snapshot.data()); + // The task is invalid, set the error and return const isNotValid = validateTask(task); // is a message (invalid) or null (valid) if (isNotValid) { - await doc.update({ - ...task, - error: isNotValid, - startedAt: startedAtTimestamp, - concludedAt: Timestamp.now(), - stage: TaskStage.ERROR - }); - logs.error(isNotValid); - + await handleInvalidTask(doc, task, isNotValid, startedAtTimestamp); return; } + // Record start event before beginning processing + await events.recordStartEvent(change); + + await setTaskProcessing(doc, task, startedAtTimestamp, snapshot.id); + + try { + await handleTaskProcessing(doc, task, startedAtTimestamp, snapshot.id); + } catch (err) { + await handleTaskError(doc, task, startedAtTimestamp, snapshot, err); + } +} + +async function handleInvalidTask( + doc: admin.firestore.DocumentReference, + task: Task, + errorMsg: string, + startedAtTimestamp: FirebaseFirestore.Timestamp +) { + const errorDoc = { + ...task, + error: errorMsg, + startedAt: startedAtTimestamp, + concludedAt: Timestamp.now(), + stage: TaskStage.ERROR + }; + + await doc.update(errorDoc); + + // Record error event for invalid task + await events.recordErrorEvent({ + after: { + id: doc.id, + ...errorDoc, + } + }, new Error(errorMsg)); + + logs.error(errorMsg); +} + +async function setTaskProcessing( + doc: admin.firestore.DocumentReference, + task: Task, + startedAtTimestamp: FirebaseFirestore.Timestamp, + taskId: string +) { + const processingDoc = { + ...task, + startedAt: startedAtTimestamp, + stage: TaskStage.PROCESSING, + }; + + await doc.update(processingDoc); + + // Record processing event when stage is set to processing + await events.recordProcessingEvent({ + after: { + id: taskId, + ...processingDoc, + } + }); + + logs.debug(`Processing task: ${taskId}`); +} + +async function handleTaskProcessing( + doc: admin.firestore.DocumentReference, + task: Task, + startedAtTimestamp: FirebaseFirestore.Timestamp, + taskId: string +) { const { url, queries } = task; + // Request the data from the URL + const queriable = await sendHttpRequestTo(url); - // Set the task to processing + logs.debug(`Received data from ${url}: ${queriable.html}`); + // Run the queries on the data + const data = queriable.multiQuery(queries); + + const successDoc = { + ...task, + data: { ...data }, + startedAt: startedAtTimestamp, + concludedAt: Timestamp.now(), + stage: TaskStage.SUCCESS, + }; + + // Set the data in the Firestore document + await doc.update(successDoc); + + // Record success event when task completes successfully + await events.recordSuccessEvent({ + after: { + id: taskId, + ...successDoc, + } + }); + + logs.debug(`Task successful: ${taskId}`); +} + +async function handleTaskError( + doc: admin.firestore.DocumentReference, + task: Task, + startedAtTimestamp: FirebaseFirestore.Timestamp, + snapshot: QueryDocumentSnapshot, + err: any +) { await doc.update({ ...task, + error: err.toString().replace(/^Error: /, ''), startedAt: startedAtTimestamp, - stage: TaskStage.PROCESSING, + concludedAt: Timestamp.now(), + stage: TaskStage.ERROR, }); - logs.debug(`Processing task: ${snapshot.id}`); - try { - // Request the data from the URL - const queriable = await sendHttpRequestTo(url); - - logs.debug(`Received data from ${url}: ${queriable.html}`); - // Run the queries on the data - const data = queriable.multiQuery(queries); - - // Set the data in the Firestore document - await doc.update({ - ...task, - data: { ...data }, - startedAt: startedAtTimestamp, - concludedAt: Timestamp.now(), - stage: TaskStage.SUCCESS, - }); - - logs.debug(`Task successful: ${snapshot.id}`); - } catch (err) { - // Something went wrong, set the error and return - await doc.update({ - ...task, - error: err.toString().replace(/^Error: /, ''), - startedAt: startedAtTimestamp, - concludedAt: Timestamp.now(), - stage: TaskStage.ERROR, - }); - - await events.recordErrorEvent(snapshot, err); - logs.unhandledError(err); - } + await events.recordErrorEvent(snapshot, err); + logs.unhandledError(err); +} + +async function handleUnhandledError( + snapshot: FirestoreEvent, + err: any +) { + await events.recordErrorEvent( + snapshot.data.data(), + `Unhandled error occurred during processing: ${err.message}"` + ); + logs.unhandledError(err); +} + +async function handleCompleteEvent( + snapshot: FirestoreEvent +) { + await events.recordCompleteEvent(snapshot); } \ No newline at end of file