105 changes: 88 additions & 17 deletions ai_ta_backend/rabbitmq/ingest.py
@@ -68,8 +68,11 @@ class Ingest:

def __init__(self):
self.openai_api_key = os.getenv('OPENAI_API_KEY') if os.getenv('OPENAI_API_KEY') else None
print("OpenAI API Key found:", self.openai_api_key)
self.openai_api_base = os.getenv('EMBEDDING_API_BASE') + "/embeddings" if os.getenv('EMBEDDING_API_BASE') else 'https://api.openai.com/v1/embeddings'
self.ncsa_hosted_api_key = self.openai_api_key if self.openai_api_key else os.getenv('NCSA_HOSTED_API_KEY')
# self.ncsa_hosted_api_key = self.openai_api_key if self.openai_api_key else os.getenv('NCSA_HOSTED_API_KEY')
self.ncsa_hosted_api_key = os.getenv('NCSA_HOSTED_API_KEY')
print("NCSA Hosted API Key found:", self.ncsa_hosted_api_key)
self.embedding_model = os.getenv('EMBEDDING_MODEL') if os.getenv('EMBEDDING_MODEL') else 'text-embedding-ada-002'
self.qdrant_url = os.getenv('QDRANT_URL')
self.qdrant_api_key = os.getenv('QDRANT_API_KEY')
@@ -868,6 +871,46 @@ def _ingest_html(self, s3_path: str, course_name: str, force_embeddings: bool, *
sentry_sdk.capture_exception(e)
return err

def _merge_transcript_segments(self, segments: list, interval_seconds: int = 15) -> list:
"""
Merge Whisper segments by time interval.

Args:
segments: List of Whisper segments, each containing start, end, text
interval_seconds: Merge interval in seconds, default 15

Returns:
List of merged chunks, each containing start_time, end_time, text
"""
if not segments:
return []

merged = []
current_chunk = {
"start_time": segments[0]["start"],
"end_time": segments[0]["end"],
"text": segments[0]["text"]
}

for seg in segments[1:]:
# If current chunk duration is within interval, keep merging
if seg["end"] - current_chunk["start_time"] <= interval_seconds:
current_chunk["end_time"] = seg["end"]
current_chunk["text"] += " " + seg["text"]
else:
# Save current chunk and start a new one
merged.append(current_chunk)
current_chunk = {
"start_time": seg["start"],
"end_time": seg["end"],
"text": seg["text"]
}

# Don't forget the last chunk
merged.append(current_chunk)

return merged
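
As a quick illustration (not part of the diff itself), here is how the new helper would merge a handful of made-up Whisper-style segments; the `ingest` variable is assumed to be an already-constructed Ingest instance and every value below is hypothetical:

segments = [
    {"start": 0.0, "end": 4.0, "text": "Welcome to the lecture."},
    {"start": 4.0, "end": 9.5, "text": "Today we cover ingestion."},
    {"start": 9.5, "end": 16.0, "text": "Let's start with video files."},
    {"start": 16.0, "end": 21.0, "text": "First the audio is extracted."},
]

# With interval_seconds=15, the second segment still ends within 15 s of the
# first chunk's start_time (9.5 - 0.0 <= 15), so it merges in; the third
# segment would push the chunk past 15 s (16.0 - 0.0 > 15), so it closes the
# first chunk and opens a second one, and the fourth merges into that
# (21.0 - 9.5 <= 15).
merged = ingest._merge_transcript_segments(segments, interval_seconds=15)
# merged[0] -> {"start_time": 0.0, "end_time": 9.5,
#               "text": "Welcome to the lecture. Today we cover ingestion."}
# merged[1] -> {"start_time": 9.5, "end_time": 21.0,
#               "text": "Let's start with video files. First the audio is extracted."}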

def _ingest_single_video(self, s3_path: str, course_name: str, force_embeddings: bool, **kwargs) -> str:
"""
Ingest a single video file from S3.
@@ -924,49 +967,77 @@ def _ingest_single_video(self, s3_path: str, course_name: str, force_embeddings:
# load the webm file into audio object
full_audio = AudioSegment.from_file(webm_tmpfile.name, "webm")
file_count = file_size // 26214400 + 1
split_segment = 35 * 60 * 1000
start = 0
split_segment_ms = 35 * 60 * 1000 # 35 minutes in milliseconds
start_ms = 0
count = 0
time_offset_seconds = 0 # Track cumulative time offset

while count < file_count:
with NamedTemporaryFile(suffix=".webm", dir=media_dir, delete=False) as split_tmp:
if count == file_count - 1:
# last segment
audio_chunk = full_audio[start:]
audio_chunk = full_audio[start_ms:]
else:
audio_chunk = full_audio[start:split_segment]
audio_chunk = full_audio[start_ms:start_ms + split_segment_ms]

audio_chunk.export(split_tmp.name, format="webm")

# transcribe the split file and store the text in dictionary
# transcribe the split file
with open(split_tmp.name, "rb") as f:
transcript = openai.Audio.transcribe("whisper-1", f)
transcript_list.append(transcript['text']) # type: ignore
start += split_segment
split_segment += split_segment
transcript = openai.Audio.transcribe("whisper-1", f, response_format="verbose_json")
print("=== TRANSCRIPT ===")
print(transcript)
print("==================")

# Merge segments and adjust timestamps with offset
merged_chunks = self._merge_transcript_segments(transcript.get('segments', []), interval_seconds=15)
for chunk in merged_chunks:
transcript_list.append({
"text": chunk["text"],
"start_time": chunk["start_time"] + time_offset_seconds
})

# Update time offset for next chunk (convert ms to seconds)
time_offset_seconds += len(audio_chunk) / 1000
start_ms += split_segment_ms
count += 1
os.remove(split_tmp.name)
else:
# transcribe the full audio
with open(webm_tmpfile.name, "rb") as f:
transcript = openai.Audio.transcribe("whisper-1", f)
transcript_list.append(transcript['text']) # type: ignore
transcript = openai.Audio.transcribe("whisper-1", f, response_format="verbose_json")
print("=== TRANSCRIPT ===")
print(transcript)
print("==================")

# Merge segments by 15-second intervals
merged_chunks = self._merge_transcript_segments(transcript.get('segments', []), interval_seconds=15)
print("=== MERGED CHUNKS ===")
for i, chunk in enumerate(merged_chunks):
print(f"Chunk {i}: start_time={chunk['start_time']:.2f}s, text={chunk['text'][:50]}...")
print("=====================")
for chunk in merged_chunks:
transcript_list.append({
"text": chunk["text"],
"start_time": chunk["start_time"]
})

os.remove(webm_tmpfile.name)

text = [txt for txt in transcript_list]
# Build texts and metadatas from merged chunks
texts = [chunk["text"] for chunk in transcript_list]
metadatas: List[Dict[str, Any]] = [{
'course_name': course_name,
's3_path': s3_path,
'readable_filename': kwargs.get('readable_filename',
Path(s3_path).name[37:]),
'pagenumber': '',
'timestamp': text.index(txt),
'pagenumber': chunk["start_time"], # Store timestamp in pagenumber
'timestamp': '',
'url': kwargs.get('url', ''),
'base_url': kwargs.get('base_url', ''),
} for txt in text]
} for chunk in transcript_list]

self.split_and_upload(texts=text, metadatas=metadatas, force_embeddings=force_embeddings, **kwargs)
self.split_and_upload(texts=texts, metadatas=metadatas, force_embeddings=force_embeddings, **kwargs)
return "Success"
except Exception as e:
err = f"❌❌ Error in (VIDEO ingest): `{inspect.currentframe().f_code.co_name}`: {e}\nTraceback:\n", traceback.format_exc(
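
Taken together, a sketch of the data shapes this change produces for a long recording that gets split at the 35-minute boundary; all names and values below are illustrative, not taken from this PR:

# Hypothetical flow for a recording split into 35-minute pieces:
# split 1 covers 0-2100 s, split 2 covers 2100-4200 s, and so on, so
# time_offset_seconds is 0 for the first split, ~2100 for the second,
# ~4200 for the third (len(audio_chunk) / 1000 converts ms to seconds).
transcript_list = [
    {"text": "Welcome to the lecture. Today we cover ingestion.", "start_time": 0.0},
    {"text": "Picking up after the first split...", "start_time": 2100.0 + 3.2},
]

texts = [chunk["text"] for chunk in transcript_list]
metadatas = [{
    "course_name": "example-course",          # illustrative values only
    "s3_path": "courses/example/video.webm",
    "readable_filename": "video.webm",
    "pagenumber": chunk["start_time"],        # chunk start time (seconds) reused as pagenumber
    "timestamp": "",
    "url": "",
    "base_url": "",
} for chunk in transcript_list]
# split_and_upload(texts=texts, metadatas=metadatas, ...) then chunks and embeds these.

The net effect of the diff is that, for video ingests, pagenumber now carries each chunk's start time in seconds while timestamp is left empty, the reverse of the old behavior where pagenumber was empty and timestamp held the list index.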