Skip to content
170 changes: 92 additions & 78 deletions functions/src/events/scrapeEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,75 @@ class SessionScraper extends EventScraper<SessionContent, Session> {
}
}

const submitTranscription = async ({
EventId,
maybeVideoUrl
}: {
EventId: number
maybeVideoUrl: string
}) => {
const newToken = randomBytes(16).toString("hex")

const transcript = await assembly.transcripts.submit({
audio:
// test with: "https://assemblyaiusercontent.com/playground/aKUqpEtmYmI.flac",
maybeVideoUrl,
webhook_url:
// test with: "https://ngrokid.ngrok-free.app/demo-dtp/us-central1/transcription",
process.env.NODE_ENV === "development"
? "https://us-central1-digital-testimony-dev.cloudfunctions.net/transcription"
: "https://us-central1-digital-testimony-prod.cloudfunctions.net/transcription",
speaker_labels: true,
webhook_auth_header_name: "x-maple-webhook",
webhook_auth_header_value: newToken
})

await db
.collection("events")
.doc(`hearing-${String(EventId)}`)
.collection("private")
.doc("webhookAuth")
.set({
videoAssemblyWebhookToken: sha256(newToken)
})

return transcript.id
}

const getHearingVideoUrl = async (EventId: number) => {
const req = await fetch(
`https://malegislature.gov/Events/Hearings/Detail/${EventId}`
)
const res = await req.text()
if (res) {
const dom = new JSDOM(res)
if (dom) {
const maybeVideoSource =
dom.window.document.querySelectorAll("video source")
if (maybeVideoSource.length && maybeVideoSource[0]) {
const firstVideoSource = maybeVideoSource[0] as HTMLSourceElement
return firstVideoSource.src
}
}
}
return null
}

const shouldScrapeVideo = async (EventId: number) => {
const eventInDb = await db
.collection("events")
.doc(`hearing-${String(EventId)}`)
.get()
const eventData = eventInDb.data()
if (!eventData) {
return false
}
if (!eventData.videoFetchedAt) {
return withinCutoff(new Date(eventData.StartTime))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did this change from Hearing.check(eventData).startsAt.toDate()? I don't think eventData has a StartTime field.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I misunderstand an earlier review comment. I'll change this in the next commit.

}
return false
}

class HearingScraper extends EventScraper<HearingListItem, Hearing> {
constructor() {
super("every 60 minutes", 240)
Expand All @@ -150,88 +219,33 @@ class HearingScraper extends EventScraper<HearingListItem, Hearing> {
async getEvent({ EventId }: HearingListItem /* e.g. 4962 */) {
const data = await api.getHearing(EventId)
const content = HearingContent.check(data)
const eventInDb = await db
.collection("events")
.doc(`hearing-${String(EventId)}`)
.get()
const eventData = eventInDb.data()
const hearing = Hearing.check(eventData)
const shouldScrape = withinCutoff(hearing.startsAt.toDate())

let payload: Hearing = {

if (await shouldScrapeVideo(EventId)) {
const maybeVideoUrl = await getHearingVideoUrl(EventId)
if (maybeVideoUrl) {
const transcriptId = await submitTranscription({
maybeVideoUrl,
EventId
})

return {
id: `hearing-${EventId}`,
type: "hearing",
content,
...this.timestamps(content),
videoURL: maybeVideoUrl,
videoFetchedAt: Timestamp.now(),
videoTranscriptionId: transcriptId // using the assembly Id as our transcriptionId
} as Hearing
}
}

return {
id: `hearing-${EventId}`,
type: "hearing",
content,
...this.timestamps(content)
}
if (hearing) {
payload = {
...payload,
videoURL: hearing.videoURL,
videoFetchedAt: hearing.videoFetchedAt,
videoAssemblyId: hearing.videoAssemblyId
}
}
let maybeVideoURL = null
let transcript = null

if (!hearing.videoFetchedAt && shouldScrape) {
const req = await fetch(
`https://malegislature.gov/Events/Hearings/Detail/${EventId}`
)
const res = await req.text()
if (res) {
const dom = new JSDOM(res)
if (dom) {
const maybeVideoSource =
dom.window.document.querySelectorAll("video source")
if (maybeVideoSource.length && maybeVideoSource[0]) {
const newToken = randomBytes(16).toString("hex")
const firstVideoSource = maybeVideoSource[0] as HTMLSourceElement
maybeVideoURL = firstVideoSource.src

transcript = await assembly.transcripts.submit({
webhook_url:
process.env.NODE_ENV === "development"
? "https://us-central1-digital-testimony-dev.cloudfunctions.net/transcription"
: "https://us-central1-digital-testimony-prod.cloudfunctions.net/transcription",
webhook_auth_header_name: "X-Maple-Webhook",
webhook_auth_header_value: newToken,
audio: firstVideoSource.src,
auto_highlights: true,
custom_topics: true,
entity_detection: true,
iab_categories: false,
format_text: true,
punctuate: true,
speaker_labels: true,
summarization: true,
summary_model: "informative",
summary_type: "bullets"
})

await db
.collection("events")
.doc(`hearing-${String(EventId)}`)
.collection("private")
.doc("webhookAuth")
.set({
videoAssemblyWebhookToken: sha256(newToken)
})

payload = {
...payload,
videoURL: maybeVideoURL,
videoFetchedAt: Timestamp.now(),
videoAssemblyId: transcript.id
}
}
}
}
}

const event: Hearing = payload
return event
} as Hearing
}
}

Expand Down
58 changes: 46 additions & 12 deletions functions/src/webhooks/transcription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,19 @@ const assembly = new AssemblyAI({
})

export const transcription = functions.https.onRequest(async (req, res) => {
if (
req.headers["X-Maple-Webhook"] &&
req.headers["webhook_auth_header_value"]
) {
if (req.headers["x-maple-webhook"]) {
if (req.body.status === "completed") {
const transcript = await assembly.transcripts.get(req.body.transcript_id)
if (transcript && transcript.webhook_auth) {
const maybeEventInDb = await db
.collection("events")
.where("videoAssemblyId", "==", transcript.id)
.get()

if (maybeEventInDb.docs.length) {
const authenticatedEventsInDb = maybeEventInDb.docs.filter(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just double-checking here - does this successfully filter out requests where the tokens don't match?

Now that I think about it, I would expect the async filter function to return a Promise that resolves to a boolean instead of a boolean - and since the Promise itself would always be truthy, this check would always return true and never filter anything

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe so based on return hashedToken === tokenInDbData.videoAssemblyWebhookToken

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As long as we've manually tested with a mismatched token and ensured the request was rejected, I'll drop this - but I still believe the async filter may be an issue - sample code below, see also https://stackoverflow.com/a/71600833:

const test = [1, 2, 3, 4]

// sync filter - result is [2, 4]
test.filter(e => e % 2 == 0)

// async filter = result is [1, 2, 3, 4]
test.filter(async e => e % 2 == 0)

async e => {
const hashedToken = sha256(
String(req.headers["webhook_auth_header_value"])
)
const hashedToken = sha256(String(req.headers["x-maple-webhook"]))

const tokenInDb = await db
.collection("events")
Expand All @@ -33,24 +29,62 @@ export const transcription = functions.https.onRequest(async (req, res) => {
.doc("webhookAuth")
.get()
const tokenInDbData = tokenInDb.data()
console.log("tokenInDbData", tokenInDbData)

if (tokenInDbData) {
return hashedToken === tokenInDbData.videoAssemblyWebhookToken
}
return false
}
)

const { id, text, audio_url, utterances, words } = transcript
if (authenticatedEventsInDb) {
try {
await db
const transcriptionInDb = db
.collection("transcriptions")
.doc(transcript.id)
.set({ _timestamp: new Date(), ...transcript })

authenticatedEventsInDb.forEach(async d => {
await d.ref.update({
["webhook_auth_header_value"]: null
transcriptionInDb.set({
id,
text,
timestamp: new Date(),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be _timestamp to match the timestamp below? If they're actually different timestamps, could we name them accordingly?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, come to think of it, should this be a Timestamp instead of a Date?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure thing, updating in next commit.

audio_url,
words
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As per our discussion the other day, we don't want words in the main transcriptions document - just in the subcollection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oy of course. Fixing in the next commit.

})

transcriptionInDb
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these .set() calls return Promises, so we should either await them all or write them in a transaction batch.

I don't have a strong opinion on which - firestore shouldn't need the root document to exist to create the subcollections, and worst case if one of the writes fails, we'll need to re-run the webhook anyway and that will overwrite any partially saved values.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

adding awaits in next commit

.collection("timestamps")
.doc("utterances")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Based on our discussion the other day, I was expecting two separate sub-collections (utterances and words) where each doc represents one utterance/word. I'm not locked in on that approach necessarily - just wanted to clarify.

I am onboard with storing all of the utterances as one doc for now given how we'll be using it (we can always re-structure without re-scraping later if we find we need the flexibility).

I do have two concerns here, especially re: words:

  • Indexing - I'd rather not index anything with the words since we're not planning to query on them. I don't think Firestore gives us the ability to index only a subset of the documents in a collection.
    • Given our planned initial query pattern (e.g. grab the only utterances doc for a given transcription), I'm fine with just ensuring we don't index anything on utterances/words for launch (which may require adding single-field exemptions) - we can always go back and re-index if we change our query patterns. If that's not in this PR, it should come before we backfill existing hearings.
  • Document Size - there's a stated max size of 1 MB for a Firestore document, and I'm worried stuffing all of the words into one document could hit that limit.
    • For reference, the JSON of the hearing at https://prodarchivevideo.blob.core.windows.net/video/2024/Hearings/Joint/June/26_1.mp4 is ~1.1MB with both words and utterances.words included, and ~110KB without either words array included for a ~45 minute video. Reasonably assuming that splits out to 500KB for words, that could become an issue for hearings over ~90 minutes.
    • Have we tested this with a much longer (3/4+ hour) hearing? It's possible Firestore compacts this by default and this is a false alarm, but I'd rather know now so we can adjust how we store words.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Happy to make a doc for each word and utterance. Is there a more clever/efficient/firestorie way that you would suggest other than iterating over the arrays I already have and making a doc for each object in that loop?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really AFAIK - one caveat would be that we should probably use the BulkWriter (since some of these videos will have dozens of utterances and tens of thousands of words). I don't think we should need sequential ids on these subcollection docs since the startTimestamp should provide a natural ordering, but I'll leave that to your best judgment.

If making words work proves problematic in any way, give me a shout in Slack and we can try another solution (whether trying a different data structure, just dumping the words json into storage and dealing with it later, or ignoring words and just re-scraping if we find we need them).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I switched to one doc per utterance and word. I'm not sure if there is a more efficient way to do to all the db transactions together, so I left everything else as is.

.set({
utterances: utterances?.map(
({ speaker, confidence, start, end, text }) => ({
speaker,
confidence,
start,
end,
text
})
)
})

transcriptionInDb.collection("timestamps").doc("words").set({
words
})

const batch = db.batch()

batch.set(db.collection("transcriptions").doc(transcript.id), {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that we've already saved the transcription once at this point, can this just be a batch.update to update the _timestamp? (And come to think of it, now that we renamed the other timestamp to createdAt, should this be updatedAt?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RE the async filter, I haven't tested this with a mismatching webhook, but I see your point. Seems like firestore doesn't have a findunique api. Any suggestions for this? I'm worried about working off of maybeEventInDb[0], but maybe I'm over thinking it? It's not like we would end up with two docs with the same videoAssemblyId.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RE the timestamp, I think we can actually just delete that batch. I don't think we need a created at and a timestamp. My bad for leaving it in.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I wouldn't expect the Firestore API itself to help us with the async filter issue.

It looks like the easiest way to do that is to first map the events to something like a Promise<Event|null> that resolves to the event when it's valid and resolves to null when the event is invalid, then filter out the nulls afterwards.

Then you can do something like:

async function getValidEvent(event: Event): Promise<Event|null> {
  // whatever async/await logic
}

const authenticatedEvents = (await Promise.all(events.map(getValidEvent))).filter(e => e)

I also wouldn't turn down something more iterative like:

const authenticatedEvents = []

docs.forEach(async doc => {
  const otherDoc = await getOtherDoc(whatever)
  
  if (isValid(doc, otherDoc)) {
    authenticatedEvents.push(doc)
  }
})

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the iterative approach is more readable. I'll do that.

_timestamp: new Date(),
...transcript
})

authenticatedEventsInDb.forEach(doc => {
batch.update(doc.ref, { ["x-maple-webhook"]: null })
})

await batch.commit()

console.log("transcript saved in db")
} catch (error) {
console.log(error)
Expand Down