Skip to content
171 changes: 93 additions & 78 deletions functions/src/events/scrapeEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,76 @@ class SessionScraper extends EventScraper<SessionContent, Session> {
}
}

/**
 * Submits a hearing video to AssemblyAI for transcription and stores a
 * hashed webhook-auth token so the incoming transcription webhook can be
 * authenticated.
 *
 * NOTE: the hashed token is persisted to Firestore BEFORE the transcript is
 * submitted. AssemblyAI may call the webhook at any point after submission,
 * so writing the token afterwards (as before) was a race: a fast callback
 * could arrive while no token existed in the DB and fail auth.
 *
 * @param EventId        the legislature's numeric hearing/event id
 * @param maybeVideoUrl  URL of the hearing video to transcribe
 * @returns the AssemblyAI transcript id (used as our transcription id)
 */
const submitTranscription = async ({
  EventId,
  maybeVideoUrl
}: {
  EventId: number
  maybeVideoUrl: string
}) => {
  // One-time shared secret; AssemblyAI echoes it back in the webhook header.
  const newToken = randomBytes(16).toString("hex")

  // Persist only the hash — the plaintext token never touches the DB.
  await db
    .collection("events")
    .doc(`hearing-${String(EventId)}`)
    .collection("private")
    .doc("webhookAuth")
    .set({
      videoAssemblyWebhookToken: sha256(newToken)
    })

  const transcript = await assembly.transcripts.submit({
    audio:
      // test with: "https://assemblyaiusercontent.com/playground/aKUqpEtmYmI.flac",
      maybeVideoUrl,
    webhook_url:
      // test with: "https://ngrokid.ngrok-free.app/demo-dtp/us-central1/transcription",
      process.env.NODE_ENV === "development"
        ? "https://us-central1-digital-testimony-dev.cloudfunctions.net/transcription"
        : "https://us-central1-digital-testimony-prod.cloudfunctions.net/transcription",
    speaker_labels: true,
    webhook_auth_header_name: "x-maple-webhook",
    webhook_auth_header_value: newToken
  })

  return transcript.id
}

/**
 * Scrapes the MA legislature hearing detail page for a hosted video URL.
 *
 * @param EventId the legislature's numeric hearing/event id
 * @returns the `src` of the first `<video><source>` element on the page, or
 *          null when the request fails or the page contains no video
 */
const getHearingVideoUrl = async (EventId: number) => {
  const response = await fetch(
    `https://malegislature.gov/Events/Hearings/Detail/${EventId}`
  )
  // Treat HTTP errors as "no video" rather than parsing an error page —
  // the caller interprets null as "nothing to transcribe".
  if (!response.ok) {
    return null
  }
  const html = await response.text()
  if (!html) {
    return null
  }
  const dom = new JSDOM(html)
  // querySelector returns the first match (or null) directly; no need for
  // querySelectorAll + length/index checks. The JSDOM constructor never
  // returns a falsy value, so the old `if (dom)` guard was dead code.
  const source = dom.window.document.querySelector("video source")
  if (source) {
    return (source as HTMLSourceElement).src
  }
  return null
}

/**
 * Decides whether we should attempt to scrape/transcribe a hearing's video.
 *
 * Returns true only when the event exists in Firestore, has never had its
 * video fetched (no `videoFetchedAt`), and its start time falls within the
 * scraping cutoff window.
 *
 * @param EventId the legislature's numeric hearing/event id
 */
const shouldScrapeVideo = async (EventId: number) => {
  const snapshot = await db
    .collection("events")
    .doc(`hearing-${String(EventId)}`)
    .get()
  const eventData = snapshot.data()

  // Unknown event, or video already fetched: nothing to do.
  if (!eventData || eventData.videoFetchedAt) {
    return false
  }
  // Hearing.check validates the Firestore shape; startsAt.toDate() already
  // yields a Date, so the previous `new Date(...)` wrapper was redundant.
  return withinCutoff(Hearing.check(eventData).startsAt.toDate())
}

class HearingScraper extends EventScraper<HearingListItem, Hearing> {
constructor() {
super("every 60 minutes", 240)
Expand All @@ -150,88 +220,33 @@ class HearingScraper extends EventScraper<HearingListItem, Hearing> {
async getEvent({ EventId }: HearingListItem /* e.g. 4962 */) {
const data = await api.getHearing(EventId)
const content = HearingContent.check(data)
const eventInDb = await db
.collection("events")
.doc(`hearing-${String(EventId)}`)
.get()
const eventData = eventInDb.data()
const hearing = Hearing.check(eventData)
const shouldScrape = withinCutoff(hearing.startsAt.toDate())

let payload: Hearing = {

if (await shouldScrapeVideo(EventId)) {
const maybeVideoUrl = await getHearingVideoUrl(EventId)
if (maybeVideoUrl) {
const transcriptId = await submitTranscription({
maybeVideoUrl,
EventId
})

return {
id: `hearing-${EventId}`,
type: "hearing",
content,
...this.timestamps(content),
videoURL: maybeVideoUrl,
videoFetchedAt: Timestamp.now(),
videoTranscriptionId: transcriptId // using the assembly Id as our transcriptionId
} as Hearing
}
}

return {
id: `hearing-${EventId}`,
type: "hearing",
content,
...this.timestamps(content)
}
if (hearing) {
payload = {
...payload,
videoURL: hearing.videoURL,
videoFetchedAt: hearing.videoFetchedAt,
videoAssemblyId: hearing.videoAssemblyId
}
}
let maybeVideoURL = null
let transcript = null

if (!hearing.videoFetchedAt && shouldScrape) {
const req = await fetch(
`https://malegislature.gov/Events/Hearings/Detail/${EventId}`
)
const res = await req.text()
if (res) {
const dom = new JSDOM(res)
if (dom) {
const maybeVideoSource =
dom.window.document.querySelectorAll("video source")
if (maybeVideoSource.length && maybeVideoSource[0]) {
const newToken = randomBytes(16).toString("hex")
const firstVideoSource = maybeVideoSource[0] as HTMLSourceElement
maybeVideoURL = firstVideoSource.src

transcript = await assembly.transcripts.submit({
webhook_url:
process.env.NODE_ENV === "development"
? "https://us-central1-digital-testimony-dev.cloudfunctions.net/transcription"
: "https://us-central1-digital-testimony-prod.cloudfunctions.net/transcription",
webhook_auth_header_name: "X-Maple-Webhook",
webhook_auth_header_value: newToken,
audio: firstVideoSource.src,
auto_highlights: true,
custom_topics: true,
entity_detection: true,
iab_categories: false,
format_text: true,
punctuate: true,
speaker_labels: true,
summarization: true,
summary_model: "informative",
summary_type: "bullets"
})

await db
.collection("events")
.doc(`hearing-${String(EventId)}`)
.collection("private")
.doc("webhookAuth")
.set({
videoAssemblyWebhookToken: sha256(newToken)
})

payload = {
...payload,
videoURL: maybeVideoURL,
videoFetchedAt: Timestamp.now(),
videoAssemblyId: transcript.id
}
}
}
}
}

const event: Hearing = payload
return event
} as Hearing
}
}

Expand Down
69 changes: 53 additions & 16 deletions functions/src/webhooks/transcription.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,26 @@
import * as functions from "firebase-functions"
import { AssemblyAI } from "assemblyai"
import { db } from "../firebase"
import { db, Timestamp } from "../firebase"
import { sha256 } from "js-sha256"

const assembly = new AssemblyAI({
apiKey: process.env.ASSEMBLY_API_KEY ? process.env.ASSEMBLY_API_KEY : ""
})

export const transcription = functions.https.onRequest(async (req, res) => {
if (
req.headers["X-Maple-Webhook"] &&
req.headers["webhook_auth_header_value"]
) {
if (req.headers["x-maple-webhook"]) {
if (req.body.status === "completed") {
const transcript = await assembly.transcripts.get(req.body.transcript_id)
if (transcript && transcript.webhook_auth) {
const maybeEventInDb = await db
.collection("events")
.where("videoAssemblyId", "==", transcript.id)
.get()

if (maybeEventInDb.docs.length) {
const authenticatedEventsInDb = maybeEventInDb.docs.filter(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double-checking here - does this successfully filter out requests where the tokens don't match?

Now that I think about it, I would expect the async filter function to return a Promise that resolves to a boolean instead of a boolean - and since the Promise itself would always be truthy, this check would always return true and never filter anything

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so based on return hashedToken === tokenInDbData.videoAssemblyWebhookToken

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we've manually tested with a mismatched token and ensured the request was rejected, I'll drop this - but I still believe the async filter may be an issue - sample code below, see also https://stackoverflow.com/a/71600833:

const test = [1, 2, 3, 4]

// sync filter - result is [2, 4]
test.filter(e => e % 2 == 0)

// async filter - result is [1, 2, 3, 4]
test.filter(async e => e % 2 == 0)

async e => {
const hashedToken = sha256(
String(req.headers["webhook_auth_header_value"])
)
const hashedToken = sha256(String(req.headers["x-maple-webhook"]))

const tokenInDb = await db
.collection("events")
Expand All @@ -33,25 +29,66 @@ export const transcription = functions.https.onRequest(async (req, res) => {
.doc("webhookAuth")
.get()
const tokenInDbData = tokenInDb.data()

if (tokenInDbData) {
return hashedToken === tokenInDbData.videoAssemblyWebhookToken
}
return false
}
)

const { id, text, audio_url, utterances, words } = transcript
if (authenticatedEventsInDb) {
try {
await db
const transcriptionInDb = await db
.collection("transcriptions")
.doc(transcript.id)
.set({ _timestamp: new Date(), ...transcript })
.doc(id)

await transcriptionInDb.set({
id,
text,
createdAt: Timestamp.now(),
audio_url
})

if (utterances) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the expected scale of utterances (i.e. several dozen per document), I believe this is fine.

Firestore does generally caution against using sequential ids because it can lead to hotspotting - but given that we'll be disabling indexes for both utterances and words subcollections and our only initial query plan is to "fetchAll" the utterances subcollection, I don't think it makes much of a difference here.

If this does end up causing an issue for utterances, we can always switch to an autogenerated documentId and just use ordering on start in queries to maintain sequential order.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Come to think of it, this would have a potential problem if we ever re-transcribe the same document (e.g. with different settings) - the utterance divisions and start times could change and would only partially overwrite any existing data.

We don't have any plans for that right now - I just want to note a caveat that we need to delete any existing transcriptions/utterances/words in the DB before re-transcribing (should that ever prove necessary).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just going to go ahead and preemptively change this to a start. If I just remove ${utterance.start} from the doc path, that should hand over the ID generation to firestore, correct?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can test in the firebase-admin repl to be sure, but I think you might also need the more explicit method for a subcollection autogenerated id: e.g.

 db.collection("transcriptions").doc(`${transcript.id}`).collection("utterances").doc()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense. Reading this line of my last commit back and realizing it wouldn't work.

const writer = db.bulkWriter()
for (let utterance of utterances) {
const { speaker, confidence, start, end, text } = utterance
writer.set(
db.doc(
`/transcriptions/${transcript.id}/utterances/${utterance.start}`
),
{ speaker, confidence, start, end, text }
)
}

authenticatedEventsInDb.forEach(async d => {
await d.ref.update({
["webhook_auth_header_value"]: null
})
await writer.close()
}

if (words) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given there are tens of thousands of words per document and we're using a sequential doc id, I'm worried this could potentially be problematically slow for a webhook.

e.g.:

  • Firestore default is 500 writes / second for new collections or those with sequential indexes.
  • ~50,000 words in an average hearing / 500 = ~100 seconds to write all the word docs to firestore.
  • The AssemblyAI webhook response timeout is 10 seconds, and they retry up to 10 times before failing permanently.
  • Therefore, we could potentially see a ton of repeated firestore writes before failing anyway.

Switching to auto-generated doc ids might help to avoid the sequential index issue (there's no documented writes/second cap on those), and we could also partition multiple words into one doc to cut down on doc count (e.g. 1000 words per doc as an in-between for 1 word per doc and all words in one doc).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That said, this is getting more complicated than it's worth for a "nice-to-have" feature. How does the following sound for a rollout plan?

  • Don't save words to the DB at all in this PR - only save the transcription metadata and the utterances
  • Merge + Deploy this and have the flow run against a small subset of hearings on the Dev environment (IMO the default behavior is safe since it should only scrape recent hearings and there are only a few of those at any given time).
  • While we get a few hearings to run through the flow, I can tweak my hacky test front-end for transcriptions to support the new utterances subcollection format.
  • We'll have Matt V + a few others look at the transcribed hearings and confirm that we only need utterances
    • I suspect the visual UI will help get a better feel for the accuracy of the diarization than we've previously gotten by skimming example data.

At this point, we'll either have confirmation that we're fine with just utterances, or will have a better sense for how to organize words to get some use out of it.

  • If we're fine with just utterances, we can just run the backfill to transcribe all past hearings
  • If we find utterances not good enough for V1, we should have a quick chat
    • V1 isn't supposed to make promises around transcription accuracy, so anything complicated is likely out of scope here, but it's possible we'll be fine with just the steps I mentioned in the above comments (to effectively hack together pseudo-utterances out of the words data).

This approach should let us unblock this PR and get a better sense for how well our current approach plays with real data. Thoughts?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds very sensible to me, I was starting to think the same thing. Pulling words out in the next commit.

const writer = db.bulkWriter()
for (let word of words) {
writer.set(
db.doc(
`/transcriptions/${transcript.id}/words/${word.start}`
),
word
)
}

await writer.close()
}

const batch = db.batch()
batch.set(db.collection("transcriptions").doc(transcript.id), {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we've already saved the transcription once at this point, can this just be a batch.update to update the _timestamp? (And come to think of it, now that we renamed the other timestamp to createdAt, should this be updatedAt?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RE the async filter, I haven't tested this with a mismatching webhook, but I see your point. Seems like firestore doesn't have a findunique api. Any suggestions for this? I'm worried about working off of maybeEventInDb[0], but maybe I'm over thinking it? It's not like we would end up with two docs with the same videoAssemblyId.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RE the timestamp, I think we can actually just delete that batch. I don't think we need a created at and a timestamp. My bad for leaving it in.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wouldn't expect the Firestore API itself to help us with the async filter issue.

It looks like the easiest way to do that is to first map the events to something like a Promise<Event|null> that resolves to the event when it's valid and resolves to null when the event is invalid, then filter out the nulls afterwards.

Then you can do something like:

async function getValidEvent(event: Event): Promise<Event|null> {
  // whatever async/await logic
}

const authenticatedEvents = (await Promise.all(events.map(getValidEvent))).filter(e => e)

I also wouldn't turn down something more iterative like:

const authenticatedEvents = []

docs.forEach(async doc => {
  const otherDoc = await getOtherDoc(whatever)
  
  if (isValid(doc, otherDoc)) {
    authenticatedEvents.push(doc)
  }
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the iterative approach is more readable. I'll do that.

_timestamp: Timestamp.now(),
...transcript
})
authenticatedEventsInDb.forEach(doc => {
batch.update(doc.ref, { ["x-maple-webhook"]: null })
})
console.log("transcript saved in db")
await batch.commit()
} catch (error) {
console.log(error)
}
Expand Down