Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions retriever/audio_stage_config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Example config for:
# - `retriever audio stage run --config <this.yaml> --input <primitives.parquet>`
#
# This YAML is parsed into `nv_ingest_api.internal.schemas.extract.extract_audio_schema.AudioExtractorSchema`
# via `retriever.audio.config.load_audio_extractor_schema_from_dict`.
#
# IMPORTANT:
# `audio_extraction_config.audio_endpoints` must provide at least one endpoint
# (gRPC or HTTP). Both cannot be null/empty. HTTP is not supported for audio;
# use gRPC.

# Optional worker settings
max_queue_size: 1
n_workers: 2
raise_on_failure: false

# Audio extraction configuration (Riva / Parakeet NIM).
audio_extraction_config:
# Optional auth token for secured services (NIM / NVCF)
auth_token: null

# Tuple/list in the form: [grpc_endpoint, http_endpoint]
# Riva/Parakeet ASR endpoint. Only gRPC is supported for audio.
#
# For the provided docker-compose.yaml the host-mapped ports are:
# - gRPC: audio:50051 (inside docker network)
# - gRPC: localhost:50051 (from host)
#
# For NVCF hosted endpoints:
# - gRPC: grpc.nvcf.nvidia.com:443
# audio_endpoints: ["audio:50051", null]
audio_endpoints: ["localhost:50051", null]


# Optional; if omitted it is inferred from which endpoint is present.
# Only "grpc" is supported for audio.
audio_infer_protocol: grpc

# Optional NVCF function ID (required when using grpc.nvcf.nvidia.com)
function_id: null

# SSL settings (auto-detected for NVCF endpoints)
use_ssl: null
ssl_cert: null

# If true, each speech segment (sentence) becomes a separate row with
# start_time / end_time metadata. If false (default), one row per file
# containing the full transcript.
segment_audio: false
13 changes: 13 additions & 0 deletions retriever/src/retriever/audio/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from .transcribe import (
audio_bytes_to_transcript_df,
audio_file_to_transcript_df,
)

__all__ = [
"audio_bytes_to_transcript_df",
"audio_file_to_transcript_df",
]
63 changes: 63 additions & 0 deletions retriever/src/retriever/audio/ray_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# SPDX-FileCopyrightText: Copyright (c) 2024-25, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Ray Data adapter for audio: AudioTranscribeActor turns bytes+path batches into transcript rows.
"""

from __future__ import annotations

from typing import List

import pandas as pd

from retriever.params import AudioExtractParams

from .transcribe import audio_bytes_to_transcript_df


class AudioTranscribeActor:
    """Ray Data map_batches callable: DataFrame with bytes, path -> DataFrame of transcript chunks.

    Each output row has: text, content, path, page_number, metadata
    (same shape as audio_file_to_transcript_df).
    """

    # Output schema, shared by the empty-result paths below; must match the
    # columns produced by audio_bytes_to_transcript_df.
    _OUTPUT_COLUMNS = ["text", "content", "path", "page_number", "metadata"]

    def __init__(self, params: AudioExtractParams | None = None) -> None:
        """Store transcription parameters; a default AudioExtractParams() is used when omitted."""
        self._params = params or AudioExtractParams()

    def __call__(self, batch_df: pd.DataFrame) -> pd.DataFrame:
        """Transcribe each (bytes, path) row of *batch_df* into transcript rows.

        Rows missing either ``bytes`` or ``path`` are skipped, as are rows
        whose transcription raises — the batch is processed best-effort so a
        single bad file cannot fail the whole Ray task.

        Returns an empty DataFrame with the output schema when the input is
        not a non-empty DataFrame or when no row yields a transcript.
        """
        if not isinstance(batch_df, pd.DataFrame) or batch_df.empty:
            return pd.DataFrame(columns=self._OUTPUT_COLUMNS)

        params = self._params
        out_dfs: List[pd.DataFrame] = []
        for _, row in batch_df.iterrows():
            raw = row.get("bytes")
            path = row.get("path")
            if raw is None or path is None:
                continue
            try:
                # path is known non-None here, so str(path) is safe.
                chunk_df = audio_bytes_to_transcript_df(
                    raw,
                    str(path),
                    grpc_endpoint=params.grpc_endpoint,
                    auth_token=params.auth_token,
                    function_id=params.function_id,
                    use_ssl=params.use_ssl,
                    ssl_cert=params.ssl_cert,
                    segment_audio=params.segment_audio,
                    max_tokens=params.max_tokens,
                    overlap_tokens=params.overlap_tokens,
                    tokenizer_model_id=params.tokenizer_model_id,
                    tokenizer_cache_dir=params.tokenizer_cache_dir,
                )
                if not chunk_df.empty:
                    out_dfs.append(chunk_df)
            except Exception:
                # Best-effort: a file that fails to transcribe is dropped
                # rather than failing the batch.
                # NOTE(review): consider logging here so failures are not
                # completely silent.
                continue
        if not out_dfs:
            return pd.DataFrame(columns=self._OUTPUT_COLUMNS)
        return pd.concat(out_dfs, ignore_index=True)
Loading
Loading