Skip to content

Commit fa927e6

Browse files
authored
Support chunked output buffers (#74)
Support chunked output buffers. Signed-off-by: Rafal <[email protected]>
1 parent 114d7ac commit fa927e6

File tree

5 files changed

+148
-45
lines changed

5 files changed

+148
-45
lines changed

src/dali_executor/dali_executor.cc

Lines changed: 58 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,40 +44,82 @@ void DaliExecutor::SetupInputs(const std::vector<IDescr>& inputs) {
4444
assert(inp_size <= c_inputs.back().buffers[0].size);
4545
}
4646
}
47-
RunInputCopy();
47+
WaitForCopies();
4848
for (auto& inp : c_inputs) {
4949
pipeline_.SetInput(inp);
5050
}
5151
}
5252

5353

54-
IDescr DaliExecutor::ScheduleInputCopy(const IDescr& input) {
55-
assert(input.buffers.size() > 0);
54+
IOBufferI* DaliExecutor::GetInputBuffer(const std::string& name, device_type_t device) {
5655
IOBufferI* buffer;
57-
if (input.buffers[0].device == device_type_t::CPU) {
58-
buffer = &cpu_buffers_[input.meta.name];
56+
if (device == device_type_t::CPU) {
57+
buffer = &cpu_buffers_[name + "_inp"];
5958
} else {
60-
buffer = &gpu_buffers_[input.meta.name];
59+
buffer = &gpu_buffers_[name + "_inp"];
6160
}
61+
return buffer;
62+
}
63+
64+
65+
IOBufferI* DaliExecutor::GetOutputBuffer(const std::string& name, device_type_t device) {
66+
IOBufferI* buffer;
67+
if (device == device_type_t::CPU) {
68+
buffer = &cpu_buffers_[name + "_out"];
69+
} else {
70+
buffer = &gpu_buffers_[name + "_out"];
71+
}
72+
return buffer;
73+
}
74+
75+
76+
IDescr DaliExecutor::ScheduleInputCopy(const IDescr& input) {
77+
assert(input.buffers.size() > 0);
78+
IOBufferI* buffer = GetInputBuffer(input.meta.name, input.buffers[0].device);
6279
size_t size = 0;
6380
for (auto& buf : input.buffers)
6481
size += buf.size;
6582
buffer->resize(size);
6683
auto descriptor = buffer->get_descr();
6784
char* dst = reinterpret_cast<char*>(descriptor.data);
85+
auto stream = pipeline_.CopyStream();
6886
for (auto& buf : input.buffers) {
6987
thread_pool_.AddWork(
70-
[descriptor, dst, buf](int) {
71-
MemCopy(descriptor.device, dst, buf.device, buf.data, buf.size);
88+
[stream, descriptor, dst, buf](int) {
89+
MemCopy(descriptor.device, dst, buf.device, buf.data, buf.size, stream);
7290
},
7391
buf.size, true);
7492
dst += buf.size;
7593
}
7694
return IDescr{input.meta, {descriptor}};
7795
}
7896

79-
void DaliExecutor::RunInputCopy() {
97+
void DaliExecutor::ScheduleOutputCopy(const ODescr& output, int output_idx) {
98+
const auto& name = output.meta.name;
99+
const auto& out_buffers = output.buffers;
100+
size_t size = 0;
101+
for (auto& out_buff : out_buffers) {
102+
size += out_buff.size;
103+
}
104+
IOBufferI* interm_buffer = GetOutputBuffer(name, pipeline_.GetOutputDevice(output_idx));
105+
interm_buffer->resize(size);
106+
auto interm_descr = interm_buffer->get_descr();
107+
pipeline_.PutOutput(interm_descr.data, output_idx, interm_descr.device);
108+
char* src = reinterpret_cast<char*>(interm_descr.data);
109+
auto stream = pipeline_.CopyStream();
110+
for (auto& buf : out_buffers) {
111+
thread_pool_.AddWork(
112+
[stream, src, buf, interm_descr](int) {
113+
MemCopy(buf.device, buf.data, interm_descr.device, src, buf.size, stream);
114+
},
115+
buf.size);
116+
src += buf.size;
117+
}
118+
}
119+
120+
void DaliExecutor::WaitForCopies() {
80121
thread_pool_.RunAll();
122+
pipeline_.SyncStream();
81123
}
82124

83125

@@ -106,14 +148,14 @@ std::vector<OutputInfo> DaliExecutor::Run(const std::vector<IDescr>& inputs) {
106148

107149
void DaliExecutor::PutOutputs(const std::vector<ODescr>& outputs) {
108150
for (uint32_t output_idx = 0; output_idx < outputs.size(); ++output_idx) {
109-
ENFORCE(outputs[output_idx].buffers.size() == 1,
110-
"Ouptut can be copied only to a single buffer");
111-
auto buffer = outputs[output_idx].buffers[0];
112-
auto data = buffer.data;
113-
auto device_type = buffer.device;
114-
pipeline_.PutOutput(data, output_idx, device_type);
151+
if (outputs[output_idx].buffers.size() == 1) {
152+
auto buffer = outputs[output_idx].buffers[0];
153+
pipeline_.PutOutput(buffer.data, output_idx, buffer.device);
154+
} else {
155+
ScheduleOutputCopy(outputs[output_idx], output_idx);
156+
}
115157
}
116-
pipeline_.SyncOutputStream();
158+
WaitForCopies();
117159
}
118160

119161
}}} // namespace triton::backend::dali

src/dali_executor/dali_executor.h

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,16 +61,23 @@ class DaliExecutor {
6161
void SetupInputs(const std::vector<IDescr>& inputs);
6262

6363
/**
64-
* @brief Schedule a copy off all buffers within input IDescr to a continuous buffer.
65-
* The copy will be performed after calling RunInputCopy().
64+
* @brief Schedule a copy of all buffers within input IDescr to a continuous buffer.
65+
* Call WaitForCopies() to wait for the copy to finish.
6666
* @return IDecr to the new, continuous, buffer.
6767
*/
6868
IDescr ScheduleInputCopy(const IDescr& buffers);
6969

7070
/**
71-
* @brief Run copies scheduled by ScheduleInputCopy and wait for them to finish.
71+
* @brief Schedule a copy to a chunked output through an intermediate buffer.
72+
* Call WaitForCopies() to wait for the copy to finish.
7273
*/
73-
void RunInputCopy();
74+
void ScheduleOutputCopy(const ODescr& output, int output_idx);
75+
76+
/**
77+
* @brief Wait for the copies scheduled by ScheduleInputCopy or ScheduleOutputCopy
78+
* to finish.
79+
*/
80+
void WaitForCopies();
7481

7582
/**
7683
* @brief Check if an input can be used without a copy.
@@ -82,6 +89,16 @@ class DaliExecutor {
8289
return (n_threads < 1) ? 1 : n_threads;
8390
}
8491

92+
/**
93+
* @brief Get an intermediate buffer located on the \p device for an input with a given \p name
94+
*/
95+
IOBufferI* GetInputBuffer(const std::string& name, device_type_t device);
96+
97+
/**
98+
* @brief Get an intermediate buffer located on the \p device for an output with a given \p name
99+
*/
100+
IOBufferI* GetOutputBuffer(const std::string& name, device_type_t device);
101+
85102
DaliPipeline pipeline_;
86103
ThreadPool thread_pool_;
87104
std::map<std::string, IOBuffer<CPU>> cpu_buffers_;

src/dali_executor/dali_pipeline.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ void DaliPipeline::SetInput(const IDescr& io_descr) {
7777
SetInput(buffer.data, meta.name.c_str(), buffer.device, meta.type, meta.shape);
7878
}
7979

80-
void DaliPipeline::SyncOutputStream() {
80+
void DaliPipeline::SyncStream() {
8181
if (NoGpu())
8282
return;
8383
DeviceGuard dg(device_id_);

src/dali_executor/dali_pipeline.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,11 +125,15 @@ class DaliPipeline {
125125
void PutOutput(void* destination, int output_idx, device_type_t destination_device);
126126

127127
/**
128-
* @brief Wait for all output copies.
128+
* @brief Wait for the work scheduled on the copy stream.
129129
*
130130
* This should always be called after copying all of the pipeline outputs.
131131
*/
132-
void SyncOutputStream();
132+
void SyncStream();
133+
134+
cudaStream_t CopyStream() {
135+
return output_stream_;
136+
}
133137

134138
void Reset() {
135139
ReleasePipeline();

src/dali_executor/executor.test.cc

Lines changed: 62 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,49 @@
2929

3030
namespace triton { namespace backend { namespace dali { namespace test {
3131

32+
template<typename T, typename Op>
33+
void coalesced_compare(const std::vector<OBufferDescr> &obuffers,
34+
const std::vector<std::vector<T>> &ibuffers, size_t inp_size, const Op &op) {
35+
size_t inp_buff_i = 0;
36+
size_t inp_i = 0;
37+
size_t out_buff_i = 0;
38+
size_t out_i = 0;
39+
std::vector<T> obuffer;
40+
for (size_t i = 0; i < inp_size; ++i) {
41+
if (inp_i == ibuffers[inp_buff_i].size()) {
42+
inp_i = 0;
43+
inp_buff_i++;
44+
}
45+
if (out_i == obuffers[out_buff_i].size / sizeof(T)) {
46+
out_i = 0;
47+
out_buff_i++;
48+
}
49+
if (out_i == 0) {
50+
auto descr = obuffers[out_buff_i];
51+
REQUIRE(descr.size % sizeof(T) == 0);
52+
obuffer.resize(descr.size / sizeof(T));
53+
MemCopy(CPU, obuffer.data(), descr.device, descr.data, descr.size);
54+
}
55+
REQUIRE(obuffer[out_i] == op(ibuffers[inp_buff_i][inp_i]));
56+
out_i++;
57+
inp_i++;
58+
}
59+
}
60+
3261
TEST_CASE("Scaling Pipeline") {
3362
std::string pipeline_s((const char *)pipelines::scale_pipeline_str,
3463
pipelines::scale_pipeline_len);
35-
DaliPipeline pipeline(pipeline_s, 8, 4, 0);
64+
DaliPipeline pipeline(pipeline_s, 256, 4, 0);
3665
DaliExecutor executor(std::move(pipeline));
3766
std::mt19937 rand(1217);
3867
std::uniform_real_distribution<float> dist(-1.f, 1.f);
3968
const std::string inp_name = "INPUT0";
40-
auto scaling_test = [&](const std::vector<int> &batch_sizes) {
69+
auto scaling_test = [&](const std::vector<int> &batch_sizes,
70+
const std::vector<int> &out_batch_sizes,
71+
const std::vector<device_type_t> &out_devs) {
72+
REQUIRE(std::accumulate(batch_sizes.begin(), batch_sizes.end(), 0) ==
73+
std::accumulate(out_batch_sizes.begin(), out_batch_sizes.end(), 0));
74+
REQUIRE(out_devs.size() == out_batch_sizes.size());
4175
std::vector<TensorListShape<>> shapes;
4276
for (auto batch_size : batch_sizes) {
4377
TensorListShape<> shape(batch_size, 2);
@@ -53,33 +87,39 @@ TEST_CASE("Scaling Pipeline") {
5387
size_t inp_size = 0;
5488
for (auto &inp_buffer : input_buffers)
5589
inp_size += inp_buffer.size();
56-
std::vector<float> output_buffer(inp_size);
90+
std::vector<std::unique_ptr<IOBufferI>> output_buffers;
91+
int ti = 0;
92+
for (size_t out_i = 0; out_i < out_batch_sizes.size(); ++out_i) {
93+
int64_t buffer_vol = 0;
94+
for (int i = 0; i < out_batch_sizes[out_i]; ++i) {
95+
buffer_vol += volume(output[0].shape[ti]) * sizeof(float);
96+
ti++;
97+
}
98+
if (out_devs[out_i] == device_type_t::CPU) {
99+
output_buffers.emplace_back(std::make_unique<IOBuffer<CPU>>(buffer_vol));
100+
} else {
101+
output_buffers.emplace_back(std::make_unique<IOBuffer<GPU>>(buffer_vol));
102+
}
103+
}
57104
std::vector<ODescr> output_vec(1);
58105
auto &outdesc = output_vec[0];
59-
OBufferDescr buf_descr;
60-
buf_descr.device = device_type_t::CPU;
61-
buf_descr.data = output_buffer.data();
62-
buf_descr.size = output_buffer.size() * sizeof(decltype(output_buffer)::size_type);
63-
outdesc.buffers = {buf_descr};
64-
executor.PutOutputs(output_vec);
65-
size_t out_i = 0;
66-
int i = 0;
67-
for (auto &inp_buffer : input_buffers) {
68-
for (size_t i = 0; i < inp_buffer.size(); ++i) {
69-
REQUIRE(output_buffer[out_i] == inp_buffer[i] * 2);
70-
++out_i;
71-
}
106+
for (auto &out_buffer : output_buffers) {
107+
outdesc.buffers.push_back(out_buffer->get_descr());
72108
}
109+
executor.PutOutputs(output_vec);
110+
coalesced_compare(outdesc.buffers, input_buffers, inp_size, [](float a) { return a * 2; });
73111
};
74112

75113
SECTION("Simple execute") {
76-
scaling_test({3, 2, 1});
77-
scaling_test({5});
114+
scaling_test({3, 2, 1}, {6}, {CPU});
115+
scaling_test({5}, {5}, {GPU});
78116
}
79117

80-
SECTION("Repeat batch size") {
81-
scaling_test({3, 3});
82-
scaling_test({6});
118+
SECTION("Chunked output") {
119+
scaling_test({3, 3}, {3, 3}, {CPU, CPU});
120+
scaling_test({6}, {2, 4}, {GPU, GPU});
121+
scaling_test({8}, {6, 2}, {CPU, GPU});
122+
scaling_test({64}, {32, 16, 16}, {CPU, GPU, GPU});
83123
}
84124
}
85125

@@ -110,7 +150,7 @@ TEST_CASE("RN50 pipeline") {
110150
obuffer.device = device_type_t::CPU;
111151
obuffer.device_id = 0;
112152
obuffer.data = output_buffer.data();
113-
obuffer.size = output_buffer.size() * sizeof(decltype(output_buffer)::size_type);
153+
obuffer.size = output_buffer.size() * sizeof(decltype(output_buffer)::value_type);
114154
outdesc.buffers = {obuffer};
115155
executor.PutOutputs(output_vec);
116156
for (int c = 0; c < output_c; ++c) {

0 commit comments

Comments
 (0)