Skip to content

Commit 9d23586

Browse files
authored
[src] Deterministic shutdown and error checks to 11 in CudaDecoder (#4569)
* Wait for D2H then H2H transfers to dry out before stopping the pump.
* Assert no H2H tasks after it stops.
* Assert that an offset into vector is within bounds (in the #4556 scenario, it's not).
* Add CU_SAFE_CALL to every CUDA call.
* Use compile-time asserts on constants instead of runtime asserts.
* The rest is reformatting comments, IWYU, and updates to coding conventions.
1 parent 4973514 commit 9d23586

File tree

2 files changed

+119
-86
lines changed

2 files changed

+119
-86
lines changed

src/cudadecoder/cuda-decoder.cc

Lines changed: 96 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,27 @@
2222
#include "cudadecoder/cuda-decoder.h"
2323

2424
#include <algorithm>
25-
#include <map>
26-
#include <sstream>
25+
#include <atomic>
26+
#include <functional>
27+
#include <iterator>
28+
#include <list>
29+
#include <memory>
30+
#include <mutex>
31+
#include <stack>
32+
#include <string>
33+
#include <thread>
2734
#include <tuple>
35+
#include <unordered_map>
36+
#include <unordered_set>
37+
#include <utility>
38+
#include <vector>
2839

2940
#include <cuda_runtime_api.h>
3041
#include <nvToolsExt.h>
3142

43+
#include "base/kaldi-utils.h"
3244
#include "cudadecoder/cuda-decoder-kernels.h"
45+
#include "cudamatrix/cu-common.h"
3346
#include "online2/online-endpoint.h"
3447
#include "util/text-utils.h"
3548

@@ -62,10 +75,10 @@ CudaDecoder::CudaDecoder(const CudaFst &fst, const CudaDecoderConfig &config,
6275
KALDI_ASSERT(nchannels_ > 0);
6376
KALDI_ASSERT(nlanes_ <= nchannels_);
6477
// All GPU work in decoder will be sent to compute_st_
65-
cudaStreamCreate(&compute_st_);
78+
CU_SAFE_CALL(cudaStreamCreate(&compute_st_));
6679
// Copies D2H of tokens for storage on host are done on
6780
// copy_st_, in parallel with compute_st_
68-
cudaStreamCreate(&copy_st_);
81+
CU_SAFE_CALL(cudaStreamCreate(&copy_st_));
6982
// For all the allocating/initializing process
7083
// We create a special channel
7184
// containing the exact state a channel should have when starting a new
@@ -96,7 +109,7 @@ CudaDecoder::CudaDecoder(const CudaFst &fst, const CudaDecoderConfig &config,
96109
--nchannels_; // removing the special initial channel from the count
97110

98111
// Making sure that everything is ready to use
99-
cudaStreamSynchronize(compute_st_);
112+
CU_SAFE_CALL(cudaStreamSynchronize(compute_st_));
100113
KALDI_DECODER_CUDA_CHECK_ERROR();
101114
}
102115

@@ -252,8 +265,8 @@ void CudaDecoder::InitDeviceParams() {
252265
// Setting Kernel Params
253266
// Sent to cuda kernels by copy
254267
// Making sure we'll be able to send it to the kernels
255-
KALDI_ASSERT((sizeof(KernelParams) + sizeof(DeviceParams)) <
256-
KALDI_CUDA_DECODER_MAX_KERNEL_ARGUMENTS_BYTE_SIZE);
268+
KALDI_COMPILE_TIME_ASSERT((sizeof(KernelParams) + sizeof(DeviceParams)) <
269+
KALDI_CUDA_DECODER_MAX_KERNEL_ARGUMENTS_BYTE_SIZE);
257270

258271
h_device_params_->d_channels_counters = d_channels_counters_.GetView();
259272
h_device_params_->d_lanes_counters = d_lanes_counters_.GetView();
@@ -322,12 +335,20 @@ void CudaDecoder::InitDeviceParams() {
322335
}
323336

324337
CudaDecoder::~CudaDecoder() noexcept(false) {
325-
// Stopping h2h tasks
338+
// Wait for D2H copies before stopping H2H tasks.
339+
CU_SAFE_CALL(cudaStreamSynchronize(compute_st_));
340+
CU_SAFE_CALL(cudaStreamSynchronize(copy_st_));
341+
// Stop h2h tasks.
342+
WaitForInitDecodingH2HCopies();
343+
WaitForH2HCopies();
326344
h2h_threads_running_ = false;
327345
n_h2h_main_task_todo_cv_.notify_all();
328346
for (std::thread &thread : cpu_dedicated_threads_) thread.join();
329-
cudaStreamDestroy(compute_st_);
330-
cudaStreamDestroy(copy_st_);
347+
KALDI_ASSERT(n_h2h_main_task_todo_ <= 0);
348+
KALDI_ASSERT(n_h2h_task_not_done_ == 0);
349+
350+
CU_SAFE_CALL(cudaStreamDestroy(compute_st_));
351+
CU_SAFE_CALL(cudaStreamDestroy(copy_st_));
331352

332353
KALDI_DECODER_CUDA_API_CHECK_ERROR(cudaFreeHost(h_channels_counters_));
333354
KALDI_DECODER_CUDA_API_CHECK_ERROR(
@@ -354,26 +375,27 @@ CudaDecoder::~CudaDecoder() noexcept(false) {
354375

355376
void CudaDecoder::ComputeInitialChannel() {
356377
KALDI_ASSERT(nlanes_ > 0);
357-
const int32 ilane = 0;
358-
KALDI_ASSERT(ilane == 0);
378+
const LaneId kLane0 = 0;
379+
359380
// Following kernels working channel_id
360381
std::vector<ChannelId> channels = {init_channel_id_};
361382
num_frames_decoded_[init_channel_id_] = 0;
362383
SetChannelsInKernelParams(channels); // not calling LoadChannelsStateToLanes,
363384
// init_channel_id_ is a special case
364-
h_lanes_counters_.lane(ilane)->channel_to_compute = init_channel_id_;
385+
h_lanes_counters_.lane(kLane0)->channel_to_compute = init_channel_id_;
365386

366-
cudaMemcpyAsync(d_lanes_counters_.MutableData(), h_lanes_counters_.lane(0),
367-
1 * sizeof(*h_lanes_counters_.lane(0)),
368-
cudaMemcpyHostToDevice, compute_st_);
369-
h_lanes_counters_.lane(ilane)->main_q_narcs_and_end.y = 0;
387+
CU_SAFE_CALL(cudaMemcpyAsync(d_lanes_counters_.MutableData(),
388+
h_lanes_counters_.lane(0),
389+
1 * sizeof(*h_lanes_counters_.lane(0)),
390+
cudaMemcpyHostToDevice, compute_st_));
391+
h_lanes_counters_.lane(kLane0)->main_q_narcs_and_end.y = 0;
370392

371393
// Adding the start state to the initial token queue
372394
InitializeInitialLaneKernel(KaldiCudaDecoderNumBlocks(1, 1),
373395
KALDI_CUDA_DECODER_ONE_THREAD_BLOCK, compute_st_,
374396
*h_device_params_);
375397

376-
h_lanes_counters_.lane(ilane)->post_expand_aux_q_end = 1;
398+
h_lanes_counters_.lane(kLane0)->post_expand_aux_q_end = 1;
377399

378400
PruneAndPreprocess();
379401
FinalizeProcessNonEmittingKernel(
@@ -386,7 +408,7 @@ void CudaDecoder::ComputeInitialChannel() {
386408
CopyLaneCountersToHostSync();
387409

388410
const int32 main_q_end =
389-
h_lanes_counters_.lane(ilane)->main_q_narcs_and_end.y;
411+
h_lanes_counters_.lane(kLane0)->main_q_narcs_and_end.y;
390412
KALDI_ASSERT(main_q_end > 0);
391413

392414
// Moving all data linked to init_channel_id_ to host
@@ -406,9 +428,10 @@ void CudaDecoder::InitDecoding(const std::vector<ChannelId> &channels) {
406428
const int nlanes_used = channels.size();
407429
// Getting *h_kernel_params ready to use
408430
LoadChannelsStateToLanes(channels);
409-
cudaMemcpyAsync(d_lanes_counters_.MutableData(), h_lanes_counters_.lane(0),
410-
nlanes_used_ * sizeof(*h_lanes_counters_.lane(0)),
411-
cudaMemcpyHostToDevice, compute_st_);
431+
CU_SAFE_CALL(cudaMemcpyAsync(d_lanes_counters_.MutableData(),
432+
h_lanes_counters_.lane(0),
433+
nlanes_used_ * sizeof(*h_lanes_counters_.lane(0)),
434+
cudaMemcpyHostToDevice, compute_st_));
412435

413436
// Size of the initial main_q
414437
ChannelCounters &init_channel_counters =
@@ -424,8 +447,7 @@ void CudaDecoder::InitDecoding(const std::vector<ChannelId> &channels) {
424447
*h_kernel_params_);
425448

426449
{
427-
std::lock_guard<std::mutex> n_h2h_not_done_lk(
428-
n_init_decoding_h2h_task_not_done_mutex_);
450+
std::lock_guard<std::mutex> lk(n_init_decoding_h2h_task_not_done_mutex_);
429451
n_init_decoding_h2h_task_not_done_ += channels.size();
430452
}
431453
for (ChannelId ichannel : channels) {
@@ -549,14 +571,15 @@ int32 CudaDecoder::GetMaxForAllLanes(
549571
}
550572

551573
void CudaDecoder::CopyLaneCountersToHostAsync() {
552-
cudaMemcpyAsync(h_lanes_counters_.lane(0), d_lanes_counters_.MutableData(),
553-
nlanes_used_ * sizeof(*h_lanes_counters_.lane(0)),
554-
cudaMemcpyDeviceToHost, compute_st_);
574+
CU_SAFE_CALL(cudaMemcpyAsync(h_lanes_counters_.lane(0),
575+
d_lanes_counters_.MutableData(),
576+
nlanes_used_ * sizeof(*h_lanes_counters_.lane(0)),
577+
cudaMemcpyDeviceToHost, compute_st_));
555578
}
556579

557580
void CudaDecoder::CopyLaneCountersToHostSync() {
558581
CopyLaneCountersToHostAsync();
559-
cudaStreamSynchronize(compute_st_);
582+
CU_SAFE_CALL(cudaStreamSynchronize(compute_st_));
560583
}
561584

562585
// One sync has to happen between PerformConcatenatedCopy and
@@ -643,7 +666,7 @@ void CudaDecoder::PostProcessingMainQueue() {
643666
ComputeLaneOffsetsKernel(KaldiCudaDecoderNumBlocks(1, 1), // One CTA
644667
KALDI_CUDA_DECODER_1D_BLOCK, compute_st_,
645668
*h_device_params_, *h_kernel_params_);
646-
cudaEventRecord(lane_offsets_ready_evt_, compute_st_);
669+
CU_SAFE_CALL(cudaEventRecord(lane_offsets_ready_evt_, compute_st_));
647670

648671
EmittingPreprocessAndListExtraPrevTokensStep3Kernel(
649672
KaldiCudaDecoderNumBlocks(nlanes_used_), KALDI_CUDA_DECODER_1D_BLOCK,
@@ -659,11 +682,11 @@ void CudaDecoder::PostProcessingMainQueue() {
659682
}
660683

661684
void CudaDecoder::CopyMainQueueDataToHost() {
662-
cudaEventRecord(concatenated_data_ready_evt_, compute_st_);
663-
cudaStreamWaitEvent(copy_st_, concatenated_data_ready_evt_,
664-
0); // the copies on copy_st will wait on compute_st_
665-
cudaEventSynchronize(lane_offsets_ready_evt_); // we need the total
666-
// size of each segments
685+
CU_SAFE_CALL(cudaEventRecord(concatenated_data_ready_evt_, compute_st_));
686+
// The copies on copy_st will wait on compute_st_.
687+
CU_SAFE_CALL(cudaStreamWaitEvent(copy_st_, concatenated_data_ready_evt_, 0));
688+
// We need the total size of each segment on the host.
689+
CU_SAFE_CALL(cudaEventSynchronize(lane_offsets_ready_evt_));
667690
LaunchD2HCopies();
668691

669692
// Making sure the previous H2H copies are done
@@ -711,7 +734,7 @@ void CudaDecoder::LaunchD2HCopies() {
711734
nelements_acoustic_costs * sizeof(*d_acoustic_cost_concat_),
712735
cudaMemcpyDeviceToHost, copy_st_));
713736
}
714-
cudaEventRecord(d2h_copy_acoustic_evt_, copy_st_);
737+
CU_SAFE_CALL(cudaEventRecord(d2h_copy_acoustic_evt_, copy_st_));
715738

716739
int32 nelements_infotoken =
717740
h_lanes_counters_.lane(nlanes_used_)->main_q_end_lane_offset;
@@ -721,7 +744,7 @@ void CudaDecoder::LaunchD2HCopies() {
721744
nelements_infotoken * sizeof(*d_infotoken_concat_),
722745
cudaMemcpyDeviceToHost, copy_st_));
723746
}
724-
cudaEventRecord(d2h_copy_infotoken_evt_, copy_st_);
747+
CU_SAFE_CALL(cudaEventRecord(d2h_copy_infotoken_evt_, copy_st_));
725748
int32 nelements_extra_prev_tokens =
726749
h_lanes_counters_.lane(nlanes_used_)
727750
->main_q_n_extra_prev_tokens_lane_offset;
@@ -737,7 +760,7 @@ void CudaDecoder::LaunchD2HCopies() {
737760
sizeof(*d_extra_and_acoustic_cost_concat_),
738761
cudaMemcpyDeviceToHost, copy_st_));
739762
}
740-
cudaEventRecord(d2h_copy_extra_prev_tokens_evt_, copy_st_);
763+
CU_SAFE_CALL(cudaEventRecord(d2h_copy_extra_prev_tokens_evt_, copy_st_));
741764
}
742765

743766
void CudaDecoder::ConcatenateData() {
@@ -817,12 +840,13 @@ void CudaDecoder::AdvanceDecoding(
817840
}
818841
LoadChannelsStateToLanes(channels);
819842
KALDI_ASSERT(nlanes_used_ > 0);
820-
cudaMemcpyAsync(d_lanes_counters_.MutableData(), h_lanes_counters_.lane(0),
821-
nlanes_used_ * sizeof(*h_lanes_counters_.lane(0)),
822-
cudaMemcpyHostToDevice, compute_st_);
843+
CU_SAFE_CALL(cudaMemcpyAsync(d_lanes_counters_.MutableData(),
844+
h_lanes_counters_.lane(0),
845+
nlanes_used_ * sizeof(*h_lanes_counters_.lane(0)),
846+
cudaMemcpyHostToDevice, compute_st_));
823847
// compute_st_ will wait for nnet3 to complete
824-
cudaEventRecord(nnet3_done_evt_, cudaStreamPerThread);
825-
cudaStreamWaitEvent(compute_st_, nnet3_done_evt_, 0);
848+
CU_SAFE_CALL(cudaEventRecord(nnet3_done_evt_, cudaStreamPerThread));
849+
CU_SAFE_CALL(cudaStreamWaitEvent(compute_st_, nnet3_done_evt_, 0));
826850

827851
// Estimating cutoff using argmin from last frame
828852
ResetForFrameAndEstimateCutoffKernel(
@@ -874,7 +898,8 @@ void CudaDecoder::AdvanceDecoding(
874898
PostProcessingMainQueue();
875899

876900
// Waiting on previous d2h before writing on same device memory
877-
cudaStreamWaitEvent(compute_st_, d2h_copy_extra_prev_tokens_evt_, 0);
901+
CU_SAFE_CALL(cudaStreamWaitEvent(compute_st_,
902+
d2h_copy_extra_prev_tokens_evt_, 0));
878903
// Concatenating the data that will be moved to host into large arrays
879904
ConcatenateData();
880905
// Copying the final lane counters for that frame
@@ -902,9 +927,10 @@ void CudaDecoder::AdvanceDecoding(
902927

903928
void CudaDecoder::WaitForPartialHypotheses() {
904929
if (!generate_partial_hypotheses_) return;
905-
while (n_partial_traceback_threads_not_done_.load(std::memory_order_acquire) >
906-
0)
930+
while (n_partial_traceback_threads_not_done_
931+
.load(std::memory_order_acquire) > 0) {
907932
Sleep(200e-6);
933+
}
908934
}
909935

910936
void CudaDecoder::CheckOverflow() {
@@ -918,27 +944,15 @@ void CudaDecoder::CheckOverflow() {
918944

919945
if ((q_overflow & OVERFLOW_MAIN_Q) == OVERFLOW_MAIN_Q) {
920946
// overflowed main_q
921-
KALDI_WARN << "Preventing overflow of main_q. "
922-
"Continuing "
923-
<< "execution but the quality of "
924-
"the output may be decreased. "
925-
<< "To prevent this from happening, "
926-
"please increase the "
927-
"parameter "
928-
"--main-q-capacity"
929-
<< " and/or decrease --max-active";
947+
KALDI_WARN << ("Preventing overflow of main_q. The quality of the"
948+
" output may be reduced. Increase --main-q-capacity"
949+
" and/or decrease --max-active");
930950
}
931951
if ((q_overflow & OVERFLOW_AUX_Q) == OVERFLOW_AUX_Q) {
932952
// overflowed aux_q
933-
KALDI_WARN << "Preventing overflow of aux_q. "
934-
"Continuing "
935-
<< "execution but the quality of "
936-
"the output may be decreased. "
937-
<< "To prevent this from happening, "
938-
"please increase the "
939-
"parameter "
940-
"--aux-q-capacity"
941-
<< " and/or decrease --beam";
953+
KALDI_WARN << ("Preventing overflow of aux_q. The quality of the output"
954+
" may be reduced. Increase --aux-q-capacity and/or"
955+
" decrease --beam");
942956
}
943957

944958
KALDI_ASSERT(lane_counters->main_q_narcs_and_end.y < main_q_capacity_);
@@ -967,9 +981,10 @@ void CudaDecoder::GetBestCost(const std::vector<ChannelId> &channels,
967981
if (channels.size() == 0) return;
968982
// Getting the lanes ready to be used with those channels
969983
LoadChannelsStateToLanes(channels);
970-
cudaMemcpyAsync(d_lanes_counters_.MutableData(), h_lanes_counters_.lane(0),
971-
nlanes_used_ * sizeof(*h_lanes_counters_.lane(0)),
972-
cudaMemcpyHostToDevice, compute_st_);
984+
CU_SAFE_CALL(cudaMemcpyAsync(d_lanes_counters_.MutableData(),
985+
h_lanes_counters_.lane(0),
986+
nlanes_used_ * sizeof(*h_lanes_counters_.lane(0)),
987+
cudaMemcpyHostToDevice, compute_st_));
973988

974989
auto func_main_q_end = [](const LaneCounters &c) {
975990
return c.main_q_narcs_and_end.y;
@@ -1010,7 +1025,7 @@ void CudaDecoder::GetBestCost(const std::vector<ChannelId> &channels,
10101025
// best+lattice_beam]
10111026
list_finals_token_idx_and_cost->resize(nlanes_used_);
10121027
// Waiting for the copy
1013-
cudaStreamSynchronize(compute_st_);
1028+
CU_SAFE_CALL(cudaStreamSynchronize(compute_st_));
10141029
for (int32 ilane = 0; ilane < nlanes_used_; ++ilane) {
10151030
int ichannel = channels[ilane];
10161031
KALDI_ASSERT(NumFramesDecoded(ichannel) > 0);
@@ -1076,8 +1091,7 @@ void CudaDecoder::GetBestPredecessor(int32 ichannel, int32 curr_token_idx,
10761091
.size());
10771092
CostType arc_extra_cost =
10781093
h_all_tokens_extra_prev_tokens_extra_and_acoustic_cost_[ichannel]
1079-
[offset + i]
1080-
.x;
1094+
[offset + i].x;
10811095
// Picking one arc on the best path
10821096
// (extra_cost == 0)
10831097
if (arc_extra_cost == 0.0f) {
@@ -1280,14 +1294,17 @@ void CudaDecoder::AddFinalTokensToLattice(
12801294
// next_state[arc_idx] we just need a valid arc_idx
12811295
int32 arc_idx;
12821296
if (final_token.IsUniqueTokenForStateAndFrame()) {
1283-
// If unique, we can directly use this arc_idx
1297+
// If unique, we can directly use this arc_idx.
12841298
arc_idx = final_token.arc_idx;
12851299
} else {
1286-
// If we have multiple tokens associated to that
1287-
// fst state, just pick the first one from the
1288-
// list
1300+
// If we have multiple tokens associated to that FST state, just pick
1301+
// the first one from the list.
12891302
int32 offset, size;
12901303
std::tie(offset, size) = final_token.GetSameFSTStateTokensList();
1304+
KALDI_ASSERT(!h_all_tokens_extra_prev_tokens_.empty());
1305+
KALDI_ASSERT(ichannel < h_all_tokens_extra_prev_tokens_.size());
1306+
KALDI_ASSERT(!h_all_tokens_extra_prev_tokens_[ichannel].empty());
1307+
KALDI_ASSERT(offset < h_all_tokens_extra_prev_tokens_[ichannel].size());
12911308
InfoToken prev_token =
12921309
h_all_tokens_extra_prev_tokens_[ichannel][offset];
12931310
arc_idx = prev_token.arc_idx;
@@ -1797,8 +1814,9 @@ void CudaDecoder::CheckStaticAsserts() {
17971814

17981815
// We need that because we need to be able to do the scan in one pass in
17991816
// the kernel update_beam_using_histogram_kernel
1800-
KALDI_ASSERT(KALDI_CUDA_DECODER_HISTO_NBINS < KALDI_CUDA_DECODER_1D_BLOCK);
1801-
KALDI_ASSERT(KALDI_CUDA_DECODER_NONEM_LT_MAX_NARCS > 0);
1817+
KALDI_COMPILE_TIME_ASSERT(
1818+
KALDI_CUDA_DECODER_HISTO_NBINS < KALDI_CUDA_DECODER_1D_BLOCK);
1819+
KALDI_COMPILE_TIME_ASSERT(KALDI_CUDA_DECODER_NONEM_LT_MAX_NARCS > 0);
18021820
}
18031821

18041822
void CudaDecoder::LaunchH2HCopies() {
@@ -2006,7 +2024,7 @@ void CudaDecoder::ComputeH2HCopies() {
20062024
}
20072025
// Waiting for the D2H copies. This is threadsafe
20082026
// Step 1: acoustic costs
2009-
cudaEventSynchronize(d2h_copy_acoustic_evt_);
2027+
CU_SAFE_CALL(cudaEventSynchronize(d2h_copy_acoustic_evt_));
20102028
while ((ilane = n_acoustic_h2h_copies_todo_.fetch_sub(1)) >= 0) {
20112029
int32 ichannel = lanes2channels_todo_[ilane];
20122030
// Lock Channel
@@ -2025,7 +2043,7 @@ void CudaDecoder::ComputeH2HCopies() {
20252043
}
20262044

20272045
// Step 2: infotoken
2028-
cudaEventSynchronize(d2h_copy_infotoken_evt_);
2046+
CU_SAFE_CALL(cudaEventSynchronize(d2h_copy_infotoken_evt_));
20292047
while ((ilane = n_infotoken_h2h_copies_todo_.fetch_sub(1)) >= 0) {
20302048
int32 ichannel = lanes2channels_todo_[ilane];
20312049
// Lock Channel
@@ -2037,7 +2055,7 @@ void CudaDecoder::ComputeH2HCopies() {
20372055
// Step 3:
20382056
// - extra prev tokens
20392057
// - partial path and endpointing
2040-
cudaEventSynchronize(d2h_copy_extra_prev_tokens_evt_);
2058+
CU_SAFE_CALL(cudaEventSynchronize(d2h_copy_extra_prev_tokens_evt_));
20412059
while ((ilane = n_extra_prev_tokens_h2h_copies_todo_.fetch_sub(1)) >= 0) {
20422060
int32 ichannel = lanes2channels_todo_[ilane];
20432061
// Lock Channel

0 commit comments

Comments (0)