Changes from 4 commits
3 changes: 2 additions & 1 deletion src/cudadecoder/Makefile
@@ -19,7 +19,8 @@ OBJFILES = cuda-decoder.o cuda-decoder-kernels.o cuda-fst.o \
batched-threaded-nnet3-cuda-pipeline2.o \
batched-static-nnet3.o batched-static-nnet3-kernels.o \
cuda-online-pipeline-dynamic-batcher.o decodable-cumatrix.o \
cuda-pipeline-common.o lattice-postprocessor.o
cuda-pipeline-common.o lattice-postprocessor.o \
thread-pool-cia.o

LIBNAME = kaldi-cudadecoder

2 changes: 1 addition & 1 deletion src/cudadecoder/batched-static-nnet3.cc
@@ -77,7 +77,7 @@ void BatchedStaticNnet3::PresetKernelParams() {
}

void BatchedStaticNnet3::Allocate() {
cudaEventCreate(&batch_slot_assignement_copy_evt_);
cudaEventCreate(&batch_slot_assignement_copy_evt_, cudaEventDisableTiming);
d_all_context_frames_.Resize(nchannels_ * total_nnet_context_, input_dim_);
d_batch_with_context_.Resize(
max_batch_size_ * input_frames_per_chunk_with_context_, input_dim_);
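
A side note on the event-creation change above: the two-argument `cudaEventCreate` is the C++ overload from `cuda_runtime.h` that forwards to `cudaEventCreateWithFlags`. A standalone sketch of the equivalent explicit call (`MakeSyncOnlyEvent` is an illustrative name, not code from this PR):

```cpp
#include <cuda_runtime.h>

// cudaEventDisableTiming skips timestamp bookkeeping, which makes the
// event cheaper when it is only used for ordering/synchronization and
// is never passed to cudaEventElapsedTime().
cudaEvent_t MakeSyncOnlyEvent() {
  cudaEvent_t evt;
  cudaEventCreateWithFlags(&evt, cudaEventDisableTiming);
  return evt;
}
```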
66 changes: 58 additions & 8 deletions src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.cc
@@ -240,14 +240,57 @@ bool BatchedThreadedNnet3CudaOnlinePipeline::TryInitCorrID(

void BatchedThreadedNnet3CudaOnlinePipeline::CompactWavesToMatrix(
const std::vector<SubVector<BaseFloat>> &wave_samples) {
for (int i = 0; i < wave_samples.size(); ++i) {
const SubVector<BaseFloat> &src = wave_samples[i];
int size = src.Dim();
n_samples_valid_[i] = size;
const BaseFloat *wave_src = src.Data();
BaseFloat *wave_dst = h_all_waveform_.RowData(i);
std::memcpy(wave_dst, wave_src, size * sizeof(BaseFloat));
nvtxRangePushA(__func__);

if (!batching_copy_thread_pool_) {
for (int i = 0; i < wave_samples.size(); ++i) {
const SubVector<BaseFloat> &src = wave_samples[i];
int size = src.Dim();
n_samples_valid_[i] = size;
const BaseFloat *wave_src = src.Data();
BaseFloat *wave_dst = h_all_waveform_.RowData(i);
std::memcpy(wave_dst, wave_src, size * sizeof(BaseFloat));
}
} else {
const size_t batch_size =
KALDI_CUDA_DECODER_DIV_ROUND_UP(wave_samples.size(),
config_.num_batching_copy_threads);

std::mutex m;
std::condition_variable cv;

std::atomic<size_t> tasks_remaining;
std::atomic_init(&tasks_remaining,
                 KALDI_CUDA_DECODER_DIV_ROUND_UP(wave_samples.size(),
                                                 batch_size));

for (size_t i = 0; i < wave_samples.size(); i += batch_size) {

auto task = [i, this, &wave_samples, &m, &cv, &tasks_remaining, &batch_size]() {
nvtxRangePush("CompactWavesToMatrix task");
for (size_t j = i; j < std::min(i + batch_size, wave_samples.size()); ++j) {
const SubVector<BaseFloat> &src = wave_samples[j];
int size = src.Dim();
n_samples_valid_[j] = size;
const BaseFloat *wave_src = src.Data();
BaseFloat *wave_dst = this->h_all_waveform_.RowData(j);
std::memcpy(wave_dst, wave_src, size * sizeof(BaseFloat));
}
--tasks_remaining;
if (tasks_remaining.load() == 0) {
std::lock_guard<std::mutex> lock(m);
cv.notify_one();
}
nvtxRangePop();
};
batching_copy_thread_pool_->submit(task);
}

// Wait for all copy tasks submitted above to complete
{
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&tasks_remaining](){ return tasks_remaining == 0; });
}
}
nvtxRangePop();
}
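
For context, the parallel branch above is a standard fan-out/join: the batch is split into contiguous chunks, each chunk's memcpy runs as a pool task, and the caller blocks on an atomic counter guarded by a condition variable. A minimal self-contained sketch of the same pattern, with plain `std::thread` standing in for the PR's pool and `parallel_row_copy`, `src`, `dst` as illustrative names (the pipeline copies into a pre-sized pinned matrix, modeled here as pre-sized rows):

```cpp
#include <algorithm>
#include <atomic>
#include <condition_variable>
#include <cstring>
#include <mutex>
#include <thread>
#include <vector>

void parallel_row_copy(const std::vector<std::vector<float>>& src,
                       std::vector<std::vector<float>>& dst,
                       size_t num_threads) {
  if (src.empty() || num_threads == 0) return;
  const size_t n = src.size();
  const size_t chunk = (n + num_threads - 1) / num_threads;  // div-round-up
  std::mutex m;
  std::condition_variable cv;
  std::atomic<size_t> remaining((n + chunk - 1) / chunk);  // one per task

  std::vector<std::thread> workers;
  for (size_t begin = 0; begin < n; begin += chunk) {
    workers.emplace_back([&, begin] {
      const size_t end = std::min(begin + chunk, n);
      for (size_t j = begin; j < end; ++j)
        std::memcpy(dst[j].data(), src[j].data(),
                    src[j].size() * sizeof(float));
      // The finishing task locks m before notifying, so the waiter cannot
      // miss the wakeup between checking the predicate and blocking.
      if (--remaining == 0) {
        std::lock_guard<std::mutex> lock(m);
        cv.notify_one();
      }
    });
  }
  {
    std::unique_lock<std::mutex> lock(m);
    cv.wait(lock, [&] { return remaining == 0; });
  }
  // The join is only needed because this sketch owns raw threads; in the
  // pipeline the pool owns them, so the cv wait alone marks completion.
  for (auto& w : workers) w.join();
}
```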

void BatchedThreadedNnet3CudaOnlinePipeline::ComputeGPUFeatureExtraction(
@@ -258,9 +301,11 @@ void BatchedThreadedNnet3CudaOnlinePipeline::ComputeGPUFeatureExtraction(
// CopyFromMat syncs, avoiding it
KALDI_ASSERT(d_all_waveform_.SizeInBytes() == h_all_waveform.SizeInBytes());
// Note : we could have smaller copies using the actual channels.size()
nvtxRangePushA("ComputeGPUFeatureExtractioncudaMemcpyAsync");
cudaMemcpyAsync(d_all_waveform_.Data(), h_all_waveform.Data(),
h_all_waveform.SizeInBytes(), cudaMemcpyHostToDevice,
cudaStreamPerThread);
nvtxRangePop();

KALDI_ASSERT(channels.size() == is_last_chunk.size());
KALDI_ASSERT(channels.size() == is_first_chunk.size());
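
The `nvtxRangePushA`/`nvtxRangePop` pairs added in this file must bracket every exit path by hand. A possible alternative, not part of this PR, is a small RAII guard so the pop happens automatically; `NvtxScopedRange` is an illustrative name:

```cpp
#include <nvToolsExt.h>

// Pops the range on every return path, including exceptions, so
// push/pop can never become unbalanced.
class NvtxScopedRange {
 public:
  explicit NvtxScopedRange(const char* name) { nvtxRangePushA(name); }
  ~NvtxScopedRange() { nvtxRangePop(); }
  NvtxScopedRange(const NvtxScopedRange&) = delete;
  NvtxScopedRange& operator=(const NvtxScopedRange&) = delete;
};

// Usage sketch:
// void BatchedThreadedNnet3CudaOnlinePipeline::RunDecoder(...) {
//   NvtxScopedRange range(__func__);
//   ...
// }
```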
@@ -348,6 +393,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::DecodeBatch(
ListIChannelsInBatch(corr_ids, &channels_);

// Compact in h_all_waveform_ to use the main DecodeBatch version
// This compaction copy is slow; batching_copy_thread_pool_ parallelizes
// it when --batching-copy-threads > 0
CompactWavesToMatrix(wave_samples);

DecodeBatch(corr_ids, h_all_waveform_, n_samples_valid_, is_first_chunk,
@@ -575,6 +621,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunLatticeCallbacks(
void BatchedThreadedNnet3CudaOnlinePipeline::RunCallbacksAndFinalize(
const std::vector<CorrelationID> &corr_ids,
const std::vector<int> &channels, const std::vector<bool> &is_last_chunk) {
nvtxRangePushA("RunCallbacksAndFinalize");
// Reading endpoints, figuring out is_end_of_segment_
for (size_t i = 0; i < is_last_chunk.size(); ++i) {
bool endpoint_detected = false;
@@ -589,6 +636,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunCallbacksAndFinalize(
RunBestPathCallbacks(corr_ids, channels);

RunLatticeCallbacks(corr_ids, channels, is_last_chunk);
nvtxRangePop();
}

void BatchedThreadedNnet3CudaOnlinePipeline::ListIChannelsInBatch(
@@ -646,7 +694,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::InitDecoding(
}

if (should_reset_decoder)
init_decoding_list_channels_.push_back((channels)[i]);
init_decoding_list_channels_.push_back(channels[i]);
}

if (!init_decoding_list_channels_.empty())
@@ -655,6 +703,7 @@

void BatchedThreadedNnet3CudaOnlinePipeline::RunDecoder(
const std::vector<int> &channels, const std::vector<bool> &is_first_chunk) {
nvtxRangePushA("RunDecoder");
if (partial_hypotheses_) {
// We're going to have to generate the partial hypotheses
if (word_syms_ == nullptr) {
@@ -690,6 +739,7 @@ void BatchedThreadedNnet3CudaOnlinePipeline::RunDecoder(
(*end_points_)[i] = cuda_decoder_->EndpointDetected(ichannel);
}
}
nvtxRangePop();
}

void BatchedThreadedNnet3CudaOnlinePipeline::ReadParametersFromModel() {
24 changes: 22 additions & 2 deletions src/cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h
@@ -40,6 +40,8 @@
#include "nnet3/nnet-optimize.h"
#include "online2/online-nnet2-feature-pipeline.h"

#include "cudadecoder/thread-pool-cia.h"

namespace kaldi {
namespace cuda_decoder {

@@ -66,7 +68,8 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
determinize_lattice(true),
num_decoder_copy_threads(2),
use_gpu_feature_extraction(true),
reset_on_endpoint(false) {}
reset_on_endpoint(false),
num_batching_copy_threads(0) {}
void Register(OptionsItf *po) {
po->Register("max-batch-size", &max_batch_size,
"The maximum execution batch size."
@@ -88,6 +91,12 @@
po->Register(
"reset-on-endpoint", &reset_on_endpoint,
"Reset a decoder channel when endpoint detected. Do not close stream");
po->Register(
"batching-copy-threads", &num_batching_copy_threads,
"Number of threads to use for copying inputs on CPU into single pinned memory matrix. "
"0 means to just use the main thread. Recommend setting this to 8 because the memory "
"copy can starve the GPU of work."
);

feature_opts.Register(po);
decoder_opts.Register(po);
@@ -101,6 +110,7 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
int num_decoder_copy_threads;
bool use_gpu_feature_extraction;
bool reset_on_endpoint;
int num_batching_copy_threads;

OnlineNnet2FeaturePipelineConfig feature_opts;
CudaDecoderConfig decoder_opts;
@@ -121,6 +131,8 @@ struct BatchedThreadedNnet3CudaOnlinePipelineConfig {
num_worker_threads = (num_worker_threads > 0)
? num_worker_threads
: std::thread::hardware_concurrency();

KALDI_ASSERT(num_batching_copy_threads >= 0);
}
};
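
For reference, any binary that registers this config on a `kaldi::ParseOptions` instance picks up the new flag automatically. A minimal driver sketch (the usage string and the driver itself are illustrative, not part of this PR):

```cpp
#include "cudadecoder/batched-threaded-nnet3-cuda-online-pipeline.h"
#include "util/parse-options.h"

int main(int argc, char* argv[]) {
  using namespace kaldi;
  using namespace kaldi::cuda_decoder;
  const char* usage = "Illustrative pipeline driver.\n";
  ParseOptions po(usage);
  BatchedThreadedNnet3CudaOnlinePipelineConfig config;
  config.Register(&po);  // registers --batching-copy-threads, etc.
  po.Read(argc, argv);   // e.g. --batching-copy-threads=8
  config.CheckAndFixConfigs();  // asserts num_batching_copy_threads >= 0
  return 0;
}
```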

@@ -150,9 +162,15 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
word_syms_(NULL) {
config_.compute_opts.CheckAndFixConfigs(am_nnet_->GetNnet().Modulus());
config_.CheckAndFixConfigs();
Initialize(decode_fst);
int num_worker_threads = config_.num_worker_threads;
thread_pool_ = std::make_unique<ThreadPoolLight>(num_worker_threads);

int num_batching_copy_threads = config_.num_batching_copy_threads;
if (num_batching_copy_threads > 0) {
batching_copy_thread_pool_ = std::make_unique<work_stealing_thread_pool>(num_batching_copy_threads);
}

Initialize(decode_fst);
}

~BatchedThreadedNnet3CudaOnlinePipeline();
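
thread-pool-cia.h itself does not appear in the hunks shown, so only the interface the pipeline relies on is visible: a constructor taking a thread count and a `submit()` taking a callable. A minimal single-queue stand-in with the same contract (the PR's pool is work-stealing; this sketch deliberately is not, and `SimpleThreadPool` is an illustrative name):

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class SimpleThreadPool {
 public:
  explicit SimpleThreadPool(int num_threads) {
    for (int i = 0; i < num_threads; ++i)
      workers_.emplace_back([this] { WorkerLoop(); });
  }
  ~SimpleThreadPool() {
    {
      std::lock_guard<std::mutex> lock(m_);
      done_ = true;
    }
    cv_.notify_all();
    for (auto& w : workers_) w.join();  // drain and join on destruction
  }
  void submit(std::function<void()> task) {
    {
      std::lock_guard<std::mutex> lock(m_);
      tasks_.push(std::move(task));
    }
    cv_.notify_one();
  }

 private:
  void WorkerLoop() {
    for (;;) {
      std::function<void()> task;
      {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return done_ || !tasks_.empty(); });
        if (done_ && tasks_.empty()) return;
        task = std::move(tasks_.front());
        tasks_.pop();
      }
      task();  // run outside the lock so workers stay parallel
    }
  }
  std::mutex m_;
  std::condition_variable cv_;
  std::queue<std::function<void()>> tasks_;
  bool done_ = false;
  std::vector<std::thread> workers_;
};
```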
@@ -503,6 +521,8 @@ class BatchedThreadedNnet3CudaOnlinePipeline {
// destructor blocks until the thread pool is drained of work items.
std::unique_ptr<ThreadPoolLight> thread_pool_;

std::unique_ptr<work_stealing_thread_pool> batching_copy_thread_pool_;

// The decoder owns thread(s) that reconstruct lattices transferred from the
// device in a compacted form as arrays with offsets instead of pointers.
std::unique_ptr<CudaDecoder> cuda_decoder_;
4 changes: 4 additions & 0 deletions src/cudadecoder/batched-threaded-nnet3-cuda-pipeline2.cc
@@ -431,13 +431,17 @@ void BatchedThreadedNnet3CudaPipeline2::AcquireTasks() {

void BatchedThreadedNnet3CudaPipeline2::ComputeTasks() {
while (threads_running_) {
nvtxRangePushA("AcquireTasks");
if (current_tasks_.size() < max_batch_size_) AcquireTasks();
nvtxRangePop();
if (current_tasks_.empty()) {
// If we still have nothing to do, let's sleep a bit
Sleep(kSleepForNewTask);
continue;
}
nvtxRangePushA("BuildBatch");
BuildBatchFromCurrentTasks();
nvtxRangePop();

if (use_online_features_)
cuda_online_pipeline_.DecodeBatch(batch_corr_ids_, batch_wave_samples_,
2 changes: 0 additions & 2 deletions src/cudadecoder/cuda-decoder-common.h
@@ -139,8 +139,6 @@
#define KALDI_CUDA_DECODER_BATCH_KERNEL_LOOP(i, n) \
for (int i = blockIdx.y; i < (n); i += gridDim.y)

#define KALDI_CUDA_DECODER_DIV_ROUND_UP(a, b) ((a + b - 1) / b)

#define KALDI_CUDA_DECODER_1D_BLOCK 256
#define KALDI_CUDA_DECODER_LARGEST_1D_BLOCK 1024
#define KALDI_CUDA_DECODER_ONE_THREAD_BLOCK 1
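
KALDI_CUDA_DECODER_DIV_ROUND_UP is removed from this header even though the new batching code above still uses it, so the macro must be provided elsewhere (in a file or commit not shown in this diff). As an aside, the macro body `((a + b - 1) / b)` does not parenthesize its arguments; a function form avoids that class of bug (sketch, not part of this PR, with `DivRoundUp` as an illustrative name):

```cpp
// Unlike the macro, arguments are evaluated exactly once, and an
// argument such as `b + 1` cannot misparse inside the expansion.
template <typename T>
constexpr T DivRoundUp(T a, T b) {
  return (a + b - 1) / b;
}

static_assert(DivRoundUp(10, 4) == 3, "ceil(10/4) == 3");
```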
2 changes: 1 addition & 1 deletion src/cudadecoder/cuda-decoder-kernels.cu
@@ -1538,7 +1538,7 @@ __global__ void emitting_preprocess_and_list_extra_prev_tokens_step1_kernel(
// Token index of one of the tokens which has the lowest token.cost for
// that state
uint32_t state_best_int_cost_argmin;
GetArgFromPackedArgminUInt64(h_val.min_and_argmin_int_cost_u64, &state_best_int_cost_argmin);
GetArgFromPackedArgminUInt64(h_val.min_and_argmin_int_cost_u64, &state_best_int_cost_argmin);

// Checking if we're the representative of that state
representing_state = (main_q_idx == state_best_int_cost_argmin);
Expand Down