Skip to content

Commit 893ea7e

Browse files
authored
[cherry-pick] find lookup table in order & support dump param (#21347)
* support dump param of model into afs (#20302) * support dump param to afs test=develop * code style test=develop * code style test=develop * dump param test=develop * dump param test=develop * dump param test=develop * dump param test=develop * find lookup table in order (#20932) test=develop * cherry-pick test=develop * solve pslib core in stop worker test=develop * print table stat info for pslib test=develop
1 parent 5dbe9e5 commit 893ea7e

File tree

14 files changed

+670
-45
lines changed

14 files changed

+670
-45
lines changed

paddle/fluid/framework/device_worker.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ class PullDenseWorker {
105105
// should incorporate different type of device
106106
class DeviceWorker {
107107
public:
108-
DeviceWorker() { use_cvm_ = false; }
108+
DeviceWorker() {
109+
no_cvm_ = true;
110+
use_cvm_ = false;
111+
}
109112
virtual ~DeviceWorker() {}
110113
virtual void Initialize(const TrainerDesc& desc) = 0;
111114
virtual void SetDeviceIndex(int tid) = 0;
@@ -135,6 +138,7 @@ class DeviceWorker {
135138
int64_t batch_num_;
136139
FetchConfig fetch_config_;
137140
bool use_cvm_;
141+
bool no_cvm_;
138142
};
139143

140144
class CPUWorkerBase : public DeviceWorker {
@@ -203,6 +207,8 @@ class DownpourWorker : public HogwildWorker {
203207
void CopyDenseVars();
204208

205209
private:
210+
bool need_dump_param_;
211+
std::vector<std::string> dump_param_;
206212
bool need_to_push_dense_;
207213
bool need_dump_field_;
208214
bool dump_slot_;

paddle/fluid/framework/downpour_worker.cc

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,23 @@ void DownpourWorker::Initialize(const TrainerDesc& desc) {
7575
fleet_ptr_ = FleetWrapper::GetInstance();
7676
fetch_config_ = desc.fetch_config();
7777
use_cvm_ = desc.use_cvm();
78+
// for sparse value accessor, embedding only
79+
no_cvm_ = desc.no_cvm();
7880
scale_datanorm_ = desc.scale_datanorm();
7981
dump_slot_ = desc.dump_slot();
8082
dump_fields_.resize(desc.dump_fields_size());
8183
for (int i = 0; i < desc.dump_fields_size(); ++i) {
8284
dump_fields_[i] = desc.dump_fields(i);
8385
}
8486
adjust_ins_weight_config_ = desc.adjust_ins_weight_config();
87+
need_dump_param_ = false;
88+
dump_param_.resize(desc.dump_param_size());
89+
for (int i = 0; i < desc.dump_param_size(); ++i) {
90+
dump_param_[i] = desc.dump_param(i);
91+
}
92+
if (desc.dump_param_size() != 0) {
93+
need_dump_param_ = true;
94+
}
8595
for (int i = 0; i < desc.check_nan_var_names_size(); ++i) {
8696
check_nan_var_names_.push_back(desc.check_nan_var_names(i));
8797
}
@@ -186,7 +196,26 @@ bool CheckValidOutput(LoDTensor* tensor, int batch_size) {
186196
return true;
187197
}
188198

199+
void DownpourWorker::DumpParam() {
200+
std::string os;
201+
for (auto& param : dump_param_) {
202+
os.clear();
203+
os = param;
204+
Variable* var = thread_scope_->FindVar(param);
205+
if (var == nullptr) {
206+
continue;
207+
}
208+
LoDTensor* tensor = var->GetMutable<LoDTensor>();
209+
int64_t len = tensor->numel();
210+
os += PrintLodTensor(tensor, 0, len);
211+
writer_ << os;
212+
}
213+
}
214+
189215
void DownpourWorker::CollectLabelInfo(size_t table_idx) {
216+
if (no_cvm_) {
217+
return;
218+
}
190219
uint64_t table_id = static_cast<uint64_t>(
191220
param_.program_config(0).pull_sparse_table_id(table_idx));
192221

@@ -288,7 +317,7 @@ void DownpourWorker::FillSparseValue(size_t table_idx) {
288317
int nid_ins_index = 0;
289318

290319
for (int index = 0; index < len; ++index) {
291-
if (use_cvm_) {
320+
if (use_cvm_ || no_cvm_) {
292321
if (ids[index] == 0u) {
293322
memcpy(ptr + table.emb_dim() * index, init_value.data(),
294323
sizeof(float) * table.emb_dim());
@@ -657,7 +686,7 @@ void DownpourWorker::TrainFilesWithProfiler() {
657686
*thread_scope_, tid, features_[tid], feature_labels_[tid],
658687
sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
659688
&feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
660-
dump_slot_, &sparse_push_keys_[tid]);
689+
dump_slot_, &sparse_push_keys_[tid], no_cvm_);
661690
timeline.Pause();
662691
push_sparse_time += timeline.ElapsedSec();
663692
total_time += timeline.ElapsedSec();
@@ -882,7 +911,7 @@ void DownpourWorker::TrainFiles() {
882911
*thread_scope_, tid, features_[tid], feature_labels_[tid],
883912
sparse_key_names_[tid], sparse_grad_names_[tid], table.emb_dim(),
884913
&feature_grads_[tid], &push_sparse_status_, cur_batch, use_cvm_,
885-
dump_slot_, &sparse_push_keys_[tid]);
914+
dump_slot_, &sparse_push_keys_[tid], no_cvm_);
886915
}
887916
}
888917

@@ -977,6 +1006,9 @@ void DownpourWorker::TrainFiles() {
9771006
}
9781007
writer_ << ars[i];
9791008
}
1009+
if (need_dump_param_ && thread_id_ == 0) {
1010+
DumpParam();
1011+
}
9801012
}
9811013

9821014
PrintFetchVars();

paddle/fluid/framework/fleet/fleet_wrapper.cc

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,13 @@ void FleetWrapper::StopServer() {
9191
#endif
9292
}
9393

94+
void FleetWrapper::FinalizeWorker() {
95+
#ifdef PADDLE_WITH_PSLIB
96+
VLOG(3) << "Going to finalize worker";
97+
pslib_ptr_->finalize_worker();
98+
#endif
99+
}
100+
94101
uint64_t FleetWrapper::RunServer() {
95102
#ifdef PADDLE_WITH_PSLIB
96103
VLOG(3) << "Going to run server";
@@ -303,7 +310,7 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
303310
std::vector<std::vector<float>>* push_values,
304311
std::vector<::std::future<int32_t>>* push_sparse_status,
305312
const int batch_size, const bool use_cvm, const bool dump_slot,
306-
std::vector<uint64_t>* sparse_push_keys) {
313+
std::vector<uint64_t>* sparse_push_keys, const bool no_cvm) {
307314
#ifdef PADDLE_WITH_PSLIB
308315
int offset = 2;
309316
int slot_offset = 0;
@@ -314,6 +321,10 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
314321
offset = 0;
315322
grad_dim = emb_dim - 2;
316323
}
324+
if (no_cvm) {
325+
offset = 0;
326+
grad_dim = emb_dim;
327+
}
317328
if (dump_slot) {
318329
slot_offset = 1;
319330
show_index = 1;
@@ -370,12 +381,12 @@ void FleetWrapper::PushSparseVarsWithLabelAsync(
370381
}
371382
sparse_push_keys->push_back(ids[id_idx]);
372383
CHECK(fea_idx < (*push_values).size());
373-
CHECK(fea_idx < fea_labels.size());
374384

375-
if (use_cvm) {
385+
if (use_cvm || no_cvm) {
376386
memcpy((*push_values)[fea_idx].data() + offset + slot_offset, g,
377387
sizeof(float) * emb_dim);
378388
} else {
389+
CHECK(fea_idx < fea_labels.size());
379390
memcpy((*push_values)[fea_idx].data() + offset + slot_offset, g,
380391
sizeof(float) * emb_dim);
381392
(*push_values)[fea_idx][show_index] = 1.0f;
@@ -549,6 +560,19 @@ void FleetWrapper::SaveModel(const std::string& path, const int mode) {
549560
#endif
550561
}
551562

563+
void FleetWrapper::PrintTableStat(const uint64_t table_id) {
564+
#ifdef PADDLE_WITH_PSLIB
565+
auto ret = pslib_ptr_->_worker_ptr->print_table_stat(table_id);
566+
ret.wait();
567+
int32_t err_code = ret.get();
568+
if (err_code == -1) {
569+
LOG(ERROR) << "print table stat failed";
570+
}
571+
#else
572+
VLOG(0) << "FleetWrapper::PrintTableStat does nothing when no pslib";
573+
#endif
574+
}
575+
552576
double FleetWrapper::GetCacheThreshold(int table_id) {
553577
#ifdef PADDLE_WITH_PSLIB
554578
double cache_threshold = 0.0;

paddle/fluid/framework/fleet/fleet_wrapper.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ class FleetWrapper {
124124
std::vector<std::vector<float>>* push_values,
125125
std::vector<::std::future<int32_t>>* push_sparse_status,
126126
const int batch_size, const bool use_cvm, const bool dump_slot,
127-
std::vector<uint64_t>* sparse_push_keys);
127+
std::vector<uint64_t>* sparse_push_keys, const bool no_cvm);
128128

129129
// Push sparse variables to server in Async mode
130130
// Param<In>: scope, table_id, fea_keys, sparse_grad_names
@@ -147,6 +147,8 @@ class FleetWrapper {
147147
int index);
148148
// stop server
149149
void StopServer();
150+
// finalize worker so that the worker can be stopped
151+
void FinalizeWorker();
150152
// run server
151153
uint64_t RunServer();
152154
// gather server ip
@@ -165,6 +167,8 @@ class FleetWrapper {
165167
std::string model_path, std::string model_proto_file,
166168
std::vector<std::string> table_var_list,
167169
bool load_combine);
170+
171+
void PrintTableStat(const uint64_t table_id);
168172
// mode = 0, load all feature
169173
// mode = 1, load delta feature, which means load diff
170174
void LoadModel(const std::string& path, const int mode);

paddle/fluid/framework/trainer.h

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,6 @@ class DistMultiTrainer : public MultiTrainer {
105105
bool need_dump_field_;
106106
std::string dump_fields_path_;
107107
std::string dump_converter_;
108-
std::vector<std::string> dump_fields_;
109108
int mpi_rank_;
110109
int mpi_size_;
111110
int dump_file_num_;

paddle/fluid/framework/trainer_desc.proto

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,14 @@ message TrainerDesc {
4040
repeated string dump_fields = 13;
4141
optional string dump_converter = 14;
4242
repeated string dump_param = 15;
43+
4344
optional int32 mpi_size = 16 [ default = -1 ];
4445
optional int32 dump_file_num = 17 [ default = 16 ];
4546
repeated string check_nan_var_names = 18;
4647
optional CopyTableConfig copy_table_config = 19;
4748
// adjust ins weight
4849
optional AdjustInsWeightConfig adjust_ins_weight_config = 20;
50+
optional bool no_cvm = 21 [ default = false ];
4951

5052
// device worker parameters
5153
optional HogwildWorkerParameter hogwild_param = 101;

paddle/fluid/pybind/fleet_wrapper_py.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,15 @@ void BindFleetWrapper(py::module* m) {
5555
.def("load_model", &framework::FleetWrapper::LoadModel)
5656
.def("clear_model", &framework::FleetWrapper::ClearModel)
5757
.def("stop_server", &framework::FleetWrapper::StopServer)
58+
.def("finalize_worker", &framework::FleetWrapper::FinalizeWorker)
5859
.def("gather_servers", &framework::FleetWrapper::GatherServers)
5960
.def("gather_clients", &framework::FleetWrapper::GatherClients)
6061
.def("get_clients_info", &framework::FleetWrapper::GetClientsInfo)
6162
.def("create_client2client_connection",
6263
&framework::FleetWrapper::CreateClient2ClientConnection)
6364
.def("shrink_sparse_table", &framework::FleetWrapper::ShrinkSparseTable)
6465
.def("shrink_dense_table", &framework::FleetWrapper::ShrinkDenseTable)
66+
.def("print_table_stat", &framework::FleetWrapper::PrintTableStat)
6567
.def("client_flush", &framework::FleetWrapper::ClientFlush)
6668
.def("load_from_paddle_model",
6769
&framework::FleetWrapper::LoadFromPaddleModel)

python/paddle/fluid/device_worker.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ def _gen_worker_desc(self, trainer_desc):
160160
.sparse_table[i].slot_value)
161161
sparse_table.sparse_grad_name.extend(worker.get_desc().sparse_table[
162162
i].slot_gradient)
163-
if opt_info["use_cvm"]:
163+
if opt_info["use_cvm"] or "no_cvm" in opt_info and opt_info[
164+
"no_cvm"] == True:
164165
sparse_table.emb_dim = \
165166
self._fleet_desc.server_param.downpour_server_param.downpour_table_param[
166167
i].accessor.fea_dim

python/paddle/fluid/incubate/fleet/parameter_server/pslib/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ def stop_worker(self):
182182
destroyed when stop() is called.
183183
"""
184184
self._role_maker._barrier_worker()
185+
# all workers should be finalized first
186+
if self._role_maker.is_worker():
187+
self._fleet_ptr.finalize_worker()
188+
self._role_maker._barrier_worker()
185189
if self._role_maker.is_first_worker():
186190
self._fleet_ptr.stop_server()
187191
self._role_maker._barrier_worker()
@@ -234,6 +238,25 @@ def save_inference_model(self,
234238
"""
235239
self._fleet_ptr.save_model(dirname)
236240

241+
def print_table_stat(self, table_id):
242+
"""
243+
print stat info of table_id,
244+
format: tableid, feasign size, mf size
245+
246+
Args:
247+
table_id(int): the id of table
248+
249+
Example:
250+
.. code-block:: python
251+
252+
fleet.print_table_stat(0)
253+
254+
"""
255+
self._role_maker._barrier_worker()
256+
if self._role_maker.is_first_worker():
257+
self._fleet_ptr.print_table_stat(table_id)
258+
self._role_maker._barrier_worker()
259+
237260
def save_persistables(self, executor, dirname, main_program=None, **kwargs):
238261
"""
239262
save persistable parameters,

python/paddle/fluid/incubate/fleet/parameter_server/pslib/node.py

Lines changed: 70 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,8 @@ def add_sparse_table(self, table_id, strategy):
8080
'sparse_click_coeff', 'sparse_base_threshold', 'sparse_delta_threshold', 'sparse_delta_keep_days', \
8181
'sparse_delete_after_unseen_days', 'sparse_show_click_decay_rate', 'sparse_delete_threshold', \
8282
'sparse_converter', 'sparse_deconverter', 'sparse_enable_cache', 'sparse_cache_rate', \
83-
'sparse_cache_file_num']
83+
'sparse_cache_file_num', 'sparse_beta1_decay_rate', 'sparse_beta2_decay_rate', \
84+
'sparse_ada_epsilon', 'sparse_optimizer']
8485

8586
for key in strategy:
8687
if key not in support_sparse_key_list:
@@ -108,9 +109,13 @@ def add_sparse_table(self, table_id, strategy):
108109
table.compress_in_save = strategy.get('sparse_compress_in_save',
109110
True)
110111
table.shard_num = strategy.get('sparse_shard_num', 1000)
112+
# DownpourFeatureValueAccessor: for ctr task, has cvm, embedding and sgd info
113+
# DownpourCtrAccessor : for ctr task, has cvm, slot, embedding and sgd info
114+
# DownpourSparseValueAccessor : for general task, has embedding and sgd info
111115

112116
support_accessor_class = [
113-
'DownpourFeatureValueAccessor', 'DownpourCtrAccessor'
117+
'DownpourFeatureValueAccessor', 'DownpourCtrAccessor',
118+
'DownpourSparseValueAccessor'
114119
]
115120
if strategy.get('sparse_accessor_class') is not None:
116121
accessor_class = strategy.get('sparse_accessor_class')
@@ -169,6 +174,69 @@ def add_sparse_table(self, table_id, strategy):
169174
table1.converter = converter
170175
table1.deconverter = deconverter
171176

177+
table2 = table.accessor.table_accessor_save_param.add()
178+
table2.param = 2
179+
table2.converter = converter
180+
table2.deconverter = deconverter
181+
elif accessor_class == 'DownpourSparseValueAccessor':
182+
optimizer_name = strategy.get("sparse_optimizer", "adam")
183+
table.accessor.sparse_commonsgd_param.name = optimizer_name
184+
table.accessor.embedx_dim = strategy.get('sparse_embedx_dim', 8)
185+
table.accessor.fea_dim = int(table.accessor.embedx_dim)
186+
if optimizer_name == "naive":
187+
table.accessor.sparse_commonsgd_param.naive.learning_rate = \
188+
strategy.get('sparse_learning_rate', 0.05)
189+
table.accessor.sparse_commonsgd_param.naive.initial_range = \
190+
strategy.get('sparse_initial_range', 1e-4)
191+
if strategy.get('sparse_weight_bounds') is None:
192+
table.accessor.sparse_commonsgd_param.naive.weight_bounds.extend(
193+
[-10, 10])
194+
else:
195+
table.accessor.sparse_commonsgd_param.naive.weight_bounds.extend(
196+
strategy.get('sparse_weight_bounds'))
197+
elif optimizer_name == "adagrad":
198+
table.accessor.sparse_commonsgd_param.adagrad.learning_rate = \
199+
strategy.get('sparse_learning_rate', 0.05)
200+
table.accessor.sparse_commonsgd_param.adagrad.initial_range = \
201+
strategy.get('sparse_initial_range', 1e-4)
202+
table.accessor.sparse_commonsgd_param.adagrad.initial_g2sum = strategy.get(
203+
'sparse_initial_g2sum', 3)
204+
if strategy.get('sparse_weight_bounds') is None:
205+
table.accessor.sparse_commonsgd_param.adagrad.weight_bounds.extend(
206+
[-10, 10])
207+
else:
208+
table.accessor.sparse_commonsgd_param.adagrad.weight_bounds.extend(
209+
strategy.get('sparse_weight_bounds'))
210+
elif optimizer_name == "adam":
211+
table.accessor.sparse_commonsgd_param.adam.learning_rate = \
212+
strategy.get('sparse_learning_rate', 0.001)
213+
table.accessor.sparse_commonsgd_param.adam.initial_range = \
214+
strategy.get('sparse_initial_range', 1e-4)
215+
table.accessor.sparse_commonsgd_param.adam.beta1_decay_rate = strategy.get(
216+
'sparse_beta1_decay_rate', 0.9)
217+
table.accessor.sparse_commonsgd_param.adam.beta2_decay_rate = strategy.get(
218+
'sparse_beta2_decay_rate', 0.999)
219+
table.accessor.sparse_commonsgd_param.adam.ada_epsilon = strategy.get(
220+
'sparse_ada_epsilon', 1e-8)
221+
if strategy.get('sparse_weight_bounds') is None:
222+
table.accessor.sparse_commonsgd_param.adam.weight_bounds.extend(
223+
[-10, 10])
224+
else:
225+
table.accessor.sparse_commonsgd_param.adam.weight_bounds.extend(
226+
strategy.get('sparse_weight_bounds'))
227+
converter = strategy.get(
228+
'sparse_converter',
229+
"(scripts/xbox_compressor_mf.py | bin/xbox_pb_converter)")
230+
deconverter = strategy.get(
231+
'sparse_deconverter',
232+
"(bin/xbox_pb_deconverter | scripts/xbox_decompressor_mf.awk)"
233+
)
234+
235+
table1 = table.accessor.table_accessor_save_param.add()
236+
table1.param = 1
237+
table1.converter = converter
238+
table1.deconverter = deconverter
239+
172240
table2 = table.accessor.table_accessor_save_param.add()
173241
table2.param = 2
174242
table2.converter = converter

0 commit comments

Comments
 (0)