Skip to content

Commit 750c6f4

Browse files
authored
multi-loss optimization by adding a DownpourOpt worker (#22025) (#22638)
* update * update test=develop * update compile set test=develop * update compile set test=develop * update test=develop * update test=develop * update test=develop * update compile setting test=develop * update compile setting test=develop * update run demo test=develop * update test=develop * update test=develop * fix test=develop * update test=develop * update test=develop * update test=develop * update test=develop * update test=develop * update test=develop * update test=develop * update test=develop * update test=develop * update format test=develop * update format test=develop * update style test=develop * update style test=develop * change style test=develop * change style test=develop * change style test=develop * add dataset unittest test=develop * update test=develop * update for record test=develop * update style for record test=develop * update for record test=develop * update for record test=develop * update for record test=develop * fix format test=develop * update test=develop * update test=develop * update test=develop * update test=develop * update test=develop
1 parent c35413b commit 750c6f4

23 files changed

+1284
-142
lines changed

cmake/inference_lib.cmake

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,11 @@ copy(inference_lib_dist
156156
SRCS ${ZLIB_INCLUDE_DIR} ${ZLIB_LIBRARIES}
157157
DSTS ${dst_dir} ${dst_dir}/lib)
158158

159+
set(dst_dir "${FLUID_INFERENCE_INSTALL_DIR}/third_party/threadpool")
160+
copy(inference_lib_dist
161+
SRCS ${THREADPOOL_INCLUDE_DIR}/ThreadPool.h
162+
DSTS ${dst_dir})
163+
159164
copy(inference_lib_dist
160165
SRCS ${CMAKE_CURRENT_BINARY_DIR}/CMakeCache.txt
161166
DSTS ${FLUID_INFERENCE_INSTALL_DIR})

paddle/fluid/framework/CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ cc_library(executor_gc_helper SRCS executor_gc_helper.cc DEPS scope proto_desc o
189189
if(WITH_DISTRIBUTE)
190190
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
191191
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
192-
data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc
192+
data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc downpour_worker_opt.cc
193193
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
194194
device_context scope framework_proto trainer_desc_proto glog fs shell fleet_wrapper lodtensor_printer
195195
lod_rank_table feed_fetch_method sendrecvop_rpc communicator collective_helper ${GLOB_DISTRIBUTE_DEPS}
@@ -199,7 +199,7 @@ set_source_files_properties(executor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_CO
199199
else()
200200
cc_library(executor SRCS executor.cc multi_trainer.cc pipeline_trainer.cc dataset_factory.cc
201201
dist_multi_trainer.cc trainer_factory.cc trainer.cc data_feed_factory.cc
202-
data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc
202+
data_feed.cc device_worker.cc hogwild_worker.cc downpour_worker.cc downpour_worker_opt.cc
203203
pull_dense_worker.cc section_worker.cc device_worker_factory.cc data_set.cc DEPS op_registry
204204
device_context scope framework_proto data_feed_proto trainer_desc_proto glog
205205
lod_rank_table fs shell fleet_wrapper lodtensor_printer feed_fetch_method

paddle/fluid/framework/data_set.cc

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,12 @@ void DatasetImpl<T>::SetMergeByInsId(int merge_size) {
123123
merge_size_ = merge_size;
124124
}
125125

126+
template <typename T>
127+
void DatasetImpl<T>::SetGenerateUniqueFeasign(bool gen_uni_feasigns) {
128+
gen_uni_feasigns_ = gen_uni_feasigns;
129+
VLOG(3) << "Set generate unique feasigns: " << gen_uni_feasigns;
130+
}
131+
126132
template <typename T>
127133
void DatasetImpl<T>::SetFeaEval(bool fea_eval, int record_candidate_size) {
128134
slots_shuffle_fea_eval_ = fea_eval;
@@ -640,6 +646,85 @@ int DatasetImpl<T>::ReceiveFromClient(int msg_type, int client_id,
640646
// explicit instantiation
641647
template class DatasetImpl<Record>;
642648

649+
void MultiSlotDataset::GenerateLocalTablesUnlock(int table_id, int feadim,
650+
int read_thread_num,
651+
int consume_thread_num,
652+
int shard_num) {
653+
VLOG(3) << "MultiSlotDataset::GenerateUniqueFeasign begin";
654+
if (!gen_uni_feasigns_) {
655+
VLOG(3) << "generate_unique_feasign_=false, will not GenerateUniqueFeasign";
656+
return;
657+
}
658+
659+
CHECK(multi_output_channel_.size() != 0); // NOLINT
660+
auto fleet_ptr_ = FleetWrapper::GetInstance();
661+
std::vector<std::unordered_map<uint64_t, std::vector<float>>>&
662+
local_map_tables = fleet_ptr_->GetLocalTable();
663+
local_map_tables.resize(shard_num);
664+
// read thread
665+
int channel_num = multi_output_channel_.size();
666+
if (read_thread_num < channel_num) {
667+
read_thread_num = channel_num;
668+
}
669+
std::vector<std::thread> threads(read_thread_num);
670+
consume_task_pool_.resize(consume_thread_num);
671+
for (size_t i = 0; i < consume_task_pool_.size(); i++) {
672+
consume_task_pool_[i].reset(new ::ThreadPool(1));
673+
}
674+
auto consume_func = [&local_map_tables](int shard_id, int feadim,
675+
std::vector<uint64_t>& keys) {
676+
for (auto k : keys) {
677+
if (local_map_tables[shard_id].find(k) ==
678+
local_map_tables[shard_id].end()) {
679+
local_map_tables[shard_id][k] = std::vector<float>(feadim, 0);
680+
}
681+
}
682+
};
683+
auto gen_func = [this, &shard_num, &feadim, &local_map_tables,
684+
&consume_func](int i) {
685+
std::vector<Record> vec_data;
686+
std::vector<std::vector<uint64_t>> task_keys(shard_num);
687+
std::vector<std::future<void>> task_futures;
688+
this->multi_output_channel_[i]->Close();
689+
this->multi_output_channel_[i]->ReadAll(vec_data);
690+
for (size_t j = 0; j < vec_data.size(); j++) {
691+
for (auto& feature : vec_data[j].uint64_feasigns_) {
692+
int shard = feature.sign().uint64_feasign_ % shard_num;
693+
task_keys[shard].push_back(feature.sign().uint64_feasign_);
694+
}
695+
}
696+
697+
for (int shard_id = 0; shard_id < shard_num; shard_id++) {
698+
task_futures.emplace_back(consume_task_pool_[shard_id]->enqueue(
699+
consume_func, shard_id, feadim, task_keys[shard_id]));
700+
}
701+
702+
multi_output_channel_[i]->Open();
703+
multi_output_channel_[i]->Write(std::move(vec_data));
704+
vec_data.clear();
705+
vec_data.shrink_to_fit();
706+
for (auto& tk : task_keys) {
707+
tk.clear();
708+
std::vector<uint64_t>().swap(tk);
709+
}
710+
task_keys.clear();
711+
std::vector<std::vector<uint64_t>>().swap(task_keys);
712+
for (auto& tf : task_futures) {
713+
tf.wait();
714+
}
715+
};
716+
for (size_t i = 0; i < threads.size(); i++) {
717+
threads[i] = std::thread(gen_func, i);
718+
}
719+
for (std::thread& t : threads) {
720+
t.join();
721+
}
722+
for (size_t i = 0; i < consume_task_pool_.size(); i++) {
723+
consume_task_pool_[i].reset();
724+
}
725+
consume_task_pool_.clear();
726+
fleet_ptr_->PullSparseToLocal(table_id, feadim);
727+
}
643728
void MultiSlotDataset::MergeByInsId() {
644729
VLOG(3) << "MultiSlotDataset::MergeByInsId begin";
645730
if (!merge_by_insid_) {

paddle/fluid/framework/data_set.h

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,14 @@
1414

1515
#pragma once
1616

17+
#include <ThreadPool.h>
1718
#include <fstream>
1819
#include <memory>
1920
#include <mutex> // NOLINT
2021
#include <set>
2122
#include <string>
2223
#include <thread> // NOLINT
24+
#include <unordered_set>
2325
#include <utility>
2426
#include <vector>
2527

@@ -63,6 +65,7 @@ class Dataset {
6365
virtual void SetParseContent(bool parse_content) = 0;
6466
// set merge by ins id
6567
virtual void SetMergeByInsId(int merge_size) = 0;
68+
virtual void SetGenerateUniqueFeasign(bool gen_uni_feasigns) = 0;
6669
// set fea eval mode
6770
virtual void SetFeaEval(bool fea_eval, int record_candidate_size) = 0;
6871
// get file list
@@ -112,6 +115,11 @@ class Dataset {
112115
virtual int64_t GetShuffleDataSize() = 0;
113116
// merge by ins id
114117
virtual void MergeByInsId() = 0;
118+
virtual void GenerateLocalTablesUnlock(int table_id, int feadim,
119+
int read_thread_num,
120+
int consume_thread_num,
121+
int shard_num) = 0;
122+
virtual void ClearLocalTables() = 0;
115123
// create preload readers
116124
virtual void CreatePreLoadReaders() = 0;
117125
// destroy preload readers after preload done
@@ -148,7 +156,7 @@ class DatasetImpl : public Dataset {
148156
virtual void SetParseInsId(bool parse_ins_id);
149157
virtual void SetParseContent(bool parse_content);
150158
virtual void SetMergeByInsId(int merge_size);
151-
159+
virtual void SetGenerateUniqueFeasign(bool gen_uni_feasigns);
152160
virtual void SetFeaEval(bool fea_eval, int record_candidate_size);
153161
virtual const std::vector<std::string>& GetFileList() { return filelist_; }
154162
virtual int GetThreadNum() { return thread_num_; }
@@ -179,6 +187,11 @@ class DatasetImpl : public Dataset {
179187
virtual int64_t GetMemoryDataSize();
180188
virtual int64_t GetShuffleDataSize();
181189
virtual void MergeByInsId() {}
190+
virtual void GenerateLocalTablesUnlock(int table_id, int feadim,
191+
int read_thread_num,
192+
int consume_thread_num,
193+
int shard_num) {}
194+
virtual void ClearLocalTables() {}
182195
virtual void CreatePreLoadReaders();
183196
virtual void DestroyPreLoadReaders();
184197
virtual void SetPreLoadThreadNum(int thread_num);
@@ -195,13 +208,15 @@ class DatasetImpl : public Dataset {
195208
int channel_num_;
196209
std::vector<paddle::framework::Channel<T>> multi_output_channel_;
197210
std::vector<paddle::framework::Channel<T>> multi_consume_channel_;
211+
std::vector<std::unordered_set<uint64_t>> local_tables_;
198212
// when read ins, we put ins from one channel to the other,
199213
// and when finish reading, we set cur_channel = 1 - cur_channel,
200214
// so if cur_channel=0, all data are in output_channel, else consume_channel
201215
int cur_channel_;
202216
std::vector<T> slots_shuffle_original_data_;
203217
RecordCandidateList slots_shuffle_rclist_;
204218
int thread_num_;
219+
int pull_sparse_to_local_thread_num_;
205220
paddle::framework::DataFeedDesc data_feed_desc_;
206221
int trainer_num_;
207222
std::vector<std::string> filelist_;
@@ -217,16 +232,28 @@ class DatasetImpl : public Dataset {
217232
bool parse_content_;
218233
size_t merge_size_;
219234
bool slots_shuffle_fea_eval_ = false;
235+
bool gen_uni_feasigns_ = false;
220236
int preload_thread_num_;
221237
std::mutex global_index_mutex_;
222238
int64_t global_index_ = 0;
239+
std::vector<std::shared_ptr<ThreadPool>> consume_task_pool_;
223240
};
224241

225242
// use std::vector<MultiSlotType> or Record as data type
226243
class MultiSlotDataset : public DatasetImpl<Record> {
227244
public:
228245
MultiSlotDataset() {}
229246
virtual void MergeByInsId();
247+
virtual void GenerateLocalTablesUnlock(int table_id, int feadim,
248+
int read_thread_num,
249+
int consume_thread_num, int shard_num);
250+
virtual void ClearLocalTables() {
251+
for (auto& t : local_tables_) {
252+
t.clear();
253+
std::unordered_set<uint64_t>().swap(t);
254+
}
255+
std::vector<std::unordered_set<uint64_t>>().swap(local_tables_);
256+
}
230257
virtual void SlotsShuffle(const std::set<std::string>& slots_to_replace);
231258
virtual void GetRandomData(const std::set<uint16_t>& slots_to_replace,
232259
std::vector<Record>* result);

paddle/fluid/framework/device_worker.h

Lines changed: 59 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -207,54 +207,80 @@ class DownpourWorker : public HogwildWorker {
207207
void CopySparseTable();
208208
void CopyDenseTable();
209209
void CopyDenseVars();
210-
211-
private:
212-
bool need_dump_param_;
213-
std::vector<std::string> dump_param_;
214-
bool need_to_push_dense_;
215-
bool need_dump_field_;
216-
bool dump_slot_;
217-
bool need_to_push_sparse_;
218-
std::vector<std::string> dump_fields_;
219-
ChannelWriter<std::string> writer_;
210+
std::string PrintLodTensor(LoDTensor* tensor, int64_t start, int64_t end);
211+
std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index);
212+
bool CheckValidOutput(LoDTensor* tensor, size_t batch_size);
220213
DownpourWorkerParameter param_;
221-
float scale_datanorm_;
222-
// just save the value in param_ for easy access
223-
std::map<uint64_t, std::string> label_var_name_;
224-
std::map<uint64_t, std::vector<std::string>> sparse_key_names_;
225-
std::map<uint64_t, std::vector<std::string>> sparse_value_names_;
226-
std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
227-
std::map<uint64_t, std::vector<std::string>> dense_value_names_;
228-
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
214+
// copy table
215+
CopyTableConfig copy_table_config_;
216+
std::vector<std::pair<uint64_t, uint64_t>> copy_sparse_tables_;
217+
std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
229218
// actually pushed feasign of each table
230219
std::map<uint64_t, std::vector<uint64_t>> sparse_push_keys_;
231-
220+
std::map<uint64_t, std::vector<std::string>> sparse_key_names_;
232221
// feasign
233222
std::map<uint64_t, std::vector<uint64_t>> features_;
234-
// feasign stats
235-
std::map<uint64_t, std::vector<float>> feature_labels_;
236223
// feasign embedding
237224
std::map<uint64_t, std::vector<std::vector<float>>> feature_values_;
225+
std::map<uint64_t, std::vector<std::string>> sparse_value_names_;
226+
// adjust ins weight
227+
AdjustInsWeightConfig adjust_ins_weight_config_;
228+
// check nan and inf during training
229+
std::vector<std::string> check_nan_var_names_;
230+
bool need_to_push_sparse_;
231+
// feasign stats
232+
std::map<uint64_t, std::vector<float>> feature_labels_;
233+
std::map<uint64_t, std::vector<std::string>> sparse_grad_names_;
238234
// feasign embedding gradient
239235
std::map<uint64_t, std::vector<std::vector<float>>> feature_grads_;
236+
std::vector<::std::future<int32_t>> push_sparse_status_;
237+
bool dump_slot_;
238+
bool need_to_push_dense_;
239+
bool need_dump_field_;
240+
bool need_dump_param_;
241+
std::map<uint64_t, std::vector<std::string>> dense_grad_names_;
242+
float scale_datanorm_;
243+
std::vector<::std::future<int32_t>> push_dense_status_;
244+
std::vector<std::string> dump_fields_;
245+
ChannelWriter<std::string> writer_;
240246
// skipped ops
241247
std::vector<std::string> skip_ops_;
248+
std::vector<std::string> dump_param_;
249+
// just save the value in param_ for easy access
250+
std::map<uint64_t, std::string> label_var_name_;
251+
std::map<uint64_t, std::vector<std::string>> dense_value_names_;
252+
std::map<uint64_t, uint64_t> table_dependency_;
253+
std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
254+
255+
private:
256+
// std::vector<std::string> dump_param_;
257+
// just save the value in param_ for easy access
258+
// std::map<uint64_t, std::string> label_var_name_;
259+
// std::map<uint64_t, std::vector<std::string>> dense_value_names_;
242260

243261
std::shared_ptr<PullDenseWorker> _pull_dense_worker;
244-
std::vector<::std::future<int32_t>> push_sparse_status_;
245-
std::vector<::std::future<int32_t>> push_dense_status_;
246262

247-
// adjust ins weight
248-
AdjustInsWeightConfig adjust_ins_weight_config_;
249263
std::vector<float> nid_show_;
250-
// check nan and inf during training
251-
std::vector<std::string> check_nan_var_names_;
252-
// copy table
253-
CopyTableConfig copy_table_config_;
254-
std::map<uint64_t, uint64_t> table_dependency_;
255-
std::vector<std::pair<uint64_t, uint64_t>> copy_sparse_tables_;
256-
std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
257-
std::unordered_map<uint64_t, std::unordered_set<uint64_t>> feasign_set_;
264+
// std::map<uint64_t, uint64_t> table_dependency_;
265+
// std::vector<std::pair<uint64_t, uint64_t>> copy_dense_tables_;
266+
};
267+
268+
class DownpourWorkerOpt : public DownpourWorker {
269+
public:
270+
DownpourWorkerOpt() {}
271+
virtual ~DownpourWorkerOpt() {}
272+
virtual void CreateDeviceResource(const ProgramDesc& main_prog);
273+
virtual void Initialize(const TrainerDesc& desc);
274+
virtual void TrainFiles();
275+
276+
protected:
277+
void CreateThreadOperatorsWithRerank(const ProgramDesc& program);
278+
std::vector<std::vector<OperatorBase*>> loss_ops_;
279+
std::vector<std::vector<std::string>> loss_op_names_;
280+
std::vector<std::string> loss_names_;
281+
std::string async_wait_name_;
282+
int async_index_ = -1;
283+
uint64_t async_tid_ = 0;
258284
};
259285

260286
#if defined(PADDLE_WITH_NCCL)

paddle/fluid/framework/device_worker_factory.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ std::shared_ptr<DeviceWorker> DeviceWorkerFactory::CreateDeviceWorker(
6161

6262
REGISTER_DEVICE_WORKER_CLASS(HogwildWorker);
6363
REGISTER_DEVICE_WORKER_CLASS(DownpourWorker);
64+
REGISTER_DEVICE_WORKER_CLASS(DownpourWorkerOpt);
6465
#if defined(PADDLE_WITH_NCCL)
6566
REGISTER_DEVICE_WORKER_CLASS(SectionWorker);
6667
#endif

paddle/fluid/framework/downpour_worker.cc

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,8 @@ std::string PrintLodTensorIntType(LoDTensor* tensor, int64_t start,
157157
return os.str();
158158
}
159159

160-
std::string PrintLodTensor(LoDTensor* tensor, int64_t start, int64_t end) {
160+
std::string DownpourWorker::PrintLodTensor(LoDTensor* tensor, int64_t start,
161+
int64_t end) {
161162
std::string out_val;
162163
if (tensor->type() == proto::VarType::FP32) {
163164
out_val = PrintLodTensorType<float>(tensor, start, end);
@@ -171,7 +172,8 @@ std::string PrintLodTensor(LoDTensor* tensor, int64_t start, int64_t end) {
171172
return out_val;
172173
}
173174

174-
std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index) {
175+
std::pair<int64_t, int64_t> DownpourWorker::GetTensorBound(LoDTensor* tensor,
176+
int index) {
175177
auto& dims = tensor->dims();
176178
if (tensor->lod().size() != 0) {
177179
auto& lod = tensor->lod()[0];
@@ -181,7 +183,7 @@ std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index) {
181183
}
182184
}
183185

184-
bool CheckValidOutput(LoDTensor* tensor, size_t batch_size) {
186+
bool DownpourWorker::CheckValidOutput(LoDTensor* tensor, size_t batch_size) {
185187
auto& dims = tensor->dims();
186188
if (dims.size() != 2) return false;
187189
if (tensor->lod().size() != 0) {

0 commit comments

Comments
 (0)