
Commit 186bbeb

【Paddle.Fleet】【Cherry-Pick】fix grad_clip & gaussian_random & dataset & profiler (#31945)
* Remove PE special profiler (#30886)
  * remove pe special profiler
  * add profiler info
* add truncated gaussian random (#30922)
  add truncated gaussian random
* 【Paddle.Fleet】fix dataset zip py3 bug (#31441)
  * fix zip py3 bug
* 【Paddle.Fleet】Fix one ps gradient clip (#31664)
  * fix one ps gradient clip
1 parent 8140485 commit 186bbeb

12 files changed, +163 -59 lines

paddle/fluid/distributed/service/communicator.cc

Lines changed: 3 additions & 0 deletions
@@ -385,6 +385,7 @@ void Communicator::SendGlobalStep(const CommContext &ctx, int batches,
   if (batches == 0) {
     return;
   }
+  platform::RecordEvent record_event("Communicator->SendGlobalStep");
   auto &table_id = ctx.table_id;
   size_t request_call_num = _worker_ptr->get_server_nums();

@@ -788,6 +789,7 @@ void SyncCommunicator::BarrierRecv() {

 void GeoCommunicator::Send(const std::vector<std::string> &var_names,
                            const framework::Scope &scope) {
+  platform::RecordEvent record_event("GeoCommunicator->Send");
   waiting_ = false;
   auto before_send = GetCurrentUS();
   auto table_name = var_names[0];

@@ -1024,6 +1026,7 @@ void GeoCommunicator::InitSparse(const std::string &var_name, int table_id) {

 std::vector<int64_t> GeoCommunicator::MergeSparseIds(
     const std::string &send_varname) {
+  platform::RecordEvent record_event("GeoCommunicator->MergeSparseIds");
   size_t merge_num = 0, wait_times = 0;
   std::unordered_set<int64_t> sparse_ids;
   while (merge_num < static_cast<size_t>(max_merge_var_num_)) {

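The RecordEvent scopes added above show up as named rows in Paddle's profiler report. A rough way to surface them (a sketch assuming the paddle.fluid.profiler context manager from Paddle 2.x and a toy static-graph program, not code from this commit; in a real fleet/parameter-server run the communicator events would appear alongside the op events):

import numpy as np
import paddle
import paddle.fluid.profiler as profiler

paddle.enable_static()

# Minimal static-graph program, just to have something to run under the profiler.
main = paddle.static.Program()
startup = paddle.static.Program()
with paddle.static.program_guard(main, startup):
    x = paddle.static.data(name="x", shape=[4, 8], dtype="float32")
    y = paddle.static.nn.fc(x, size=2)

exe = paddle.static.Executor(paddle.CPUPlace())
exe.run(startup)
feed = {"x": np.random.rand(4, 8).astype("float32")}

# The report printed when the context manager exits lists each recorded event by
# name and time; in a parameter-server job the new "Communicator->SendGlobalStep",
# "GeoCommunicator->Send" and "GeoCommunicator->MergeSparseIds" scopes would be
# visible here as well.
with profiler.profiler(state="CPU", sorted_key="total"):
    for _ in range(10):
        exe.run(main, feed=feed)
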
paddle/fluid/distributed/table/common_dense_table.cc

Lines changed: 2 additions & 0 deletions
@@ -28,6 +28,8 @@ void CommonDenseTable::create_initializer(const std::string& attr,
     initializers_[name] = new FillConstantInitializer(slices);
   } else if (slices[0] == "uniform_random") {
     initializers_[name] = new UniformInitializer(slices);
+  } else if (slices[0] == "truncated_gaussian_random") {
+    initializers_[name] = new TruncatedGaussianInitializer(slices);
   } else {
     PADDLE_THROW(
         platform::errors::InvalidArgument("%s can not be supported", name));

paddle/fluid/distributed/table/depends/initializers.h

Lines changed: 37 additions & 0 deletions
@@ -17,12 +17,15 @@
 #include <gflags/gflags.h>
 #include <functional>
 #include <memory>
+#include <random>
 #include <string>
 #include <utility>
 #include <vector>

 #include "paddle/fluid/framework/generator.h"

+#include "paddle/fluid/operators/truncated_gaussian_random_op.h"
+
 namespace paddle {
 namespace distributed {

@@ -108,6 +111,40 @@ class GaussianInitializer : public Initializer {
   std::normal_distribution<float> dist_;
 };

+class TruncatedGaussianInitializer : public Initializer {
+ public:
+  explicit TruncatedGaussianInitializer(const std::vector<std::string> &attrs) {
+    name_ = attrs[0];
+    seed_ = static_cast<unsigned int>(std::stoi(attrs[1]));
+    mean_ = std::stof(attrs[2]);
+    std_ = std::stof(attrs[3]);
+
+    std::uniform_real_distribution<float> dist_(
+        std::numeric_limits<float>::min(), 1.0);
+    random_engine_ = framework::GetCPURandomEngine(seed_);
+  }
+
+  float GetValue() override {
+    paddle::operators::TruncatedNormal<float> truncated_normal(mean_, std_);
+    float value = truncated_normal(dist_(*random_engine_));
+    return value;
+  }
+
+  void GetValue(float *value, int numel) {
+    paddle::operators::TruncatedNormal<float> truncated_normal(mean_, std_);
+    for (int x = 0; x < numel; ++x) {
+      value[x] = truncated_normal(dist_(*random_engine_));
+    }
+  }
+
+ private:
+  float std_;
+  float mean_;
+
+  std::shared_ptr<std::mt19937_64> random_engine_;
+  std::uniform_real_distribution<float> dist_;
+};
+
 class FillConstantInitializer : public Initializer {
  public:
  explicit FillConstantInitializer(const std::vector<std::string> &attrs) {

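The new TruncatedGaussianInitializer draws each parameter value from a normal distribution truncated, as in Paddle's truncated_gaussian_random op, to two standard deviations around the mean; the C++ code does this by pushing a uniform sample through the TruncatedNormal inverse-CDF functor. A minimal Python sketch of an equivalent-in-distribution draw, using rejection sampling instead of the inverse CDF (illustrative assumption only, not the commit's implementation):

import random

def truncated_gaussian(mean, std, seed=0, num=1):
    """Draw `num` samples from N(mean, std^2) truncated to [mean - 2*std, mean + 2*std].

    Rejection sampling is used here purely for clarity; the C++ initializer above
    instead maps a uniform sample through the truncated-normal inverse CDF.
    """
    rng = random.Random(seed)
    values = []
    while len(values) < num:
        v = rng.gauss(mean, std)
        if abs(v - mean) <= 2.0 * std:  # keep only samples inside the truncation window
            values.append(v)
    return values

print(truncated_gaussian(mean=0.0, std=1.0, seed=2021, num=5))
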
paddle/fluid/distributed/table/depends/large_scale_kv.h

Lines changed: 3 additions & 0 deletions
@@ -125,6 +125,9 @@ class ValueBlock {
     } else if (slices[0] == "uniform_random") {
       initializers_.emplace_back(
           std::make_shared<UniformInitializer>(slices));
+    } else if (slices[0] == "truncated_gaussian_random") {
+      initializers_.emplace_back(
+          std::make_shared<TruncatedGaussianInitializer>(slices));
     } else {
       PADDLE_THROW(platform::errors::InvalidArgument(
           "%s can not be supported", attr));

paddle/fluid/framework/parallel_executor.cc

Lines changed: 0 additions & 2 deletions
@@ -1121,8 +1121,6 @@ void ParallelExecutor::BCastParamsToDevices(
 FetchResultType ParallelExecutor::Run(
     const std::vector<std::string> &fetch_tensors, bool return_merged) {
   VLOG(3) << "enter ParallelExecutor Run";
-  platform::RecordEvent parallel_executor_event(
-      "ParallelExecutor::Run", paddle::platform::EventRole::kSpecial);
 #ifdef WITH_GPERFTOOLS
   if (gProfileStarted) {
     ProfilerFlush();

python/paddle/distributed/fleet/data_generator/data_generator.py

Lines changed: 16 additions & 10 deletions
Whitespace-only hunks (@@ -32,11 +32,11 @@, @@ -52,7 +52,7 @@, @@ -63,7 +63,7 @@, @@ -100,9 +100,9 @@, @@ -161,7 +161,7 @@, @@ -174,7 +174,7 @@, @@ -206,7 +206,7 @@, @@ -289,7 +292,7 @@): these only strip trailing whitespace from blank lines inside the docstrings of set_batch, run_from_memory, run_from_stdin, generate_sample, generate_batch and _gen_str; no functional change.

The two functional hunks add Python 3 zip handling in _gen_str:

@@ -259,6 +259,9 @@ def _gen_str(self, line):
         Returns:
             Return a string data that can be read directly by the MultiSlotDataFeed.
         '''
+        if sys.version > '3' and isinstance(line, zip):
+            line = list(line)
+
         if not isinstance(line, list) and not isinstance(line, tuple):
             raise ValueError(
                 "the output of process() must be in list or tuple type"

@@ -304,6 +307,9 @@ def _gen_str(self, line):
         Returns:
             Return a string data that can be read directly by the MultiSlotDataFeed.
         '''
+        if sys.version > '3' and isinstance(line, zip):
+            line = list(line)
+
         if not isinstance(line, list) and not isinstance(line, tuple):
             raise ValueError(
                 "the output of process() must be in list or tuple type"

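The list(line) conversion is needed because on Python 3 zip() returns a one-shot iterator rather than a list, which the length and indexing checks later in _gen_str cannot handle. A few lines of plain Python (standalone, no Paddle involved) show the behavior:

feature_name = ["words", "label"]
data = [["1", "2", "3", "4"], ["0"]]

line = zip(feature_name, data)
print(isinstance(line, zip))  # True on Python 3: zip() returns a lazy iterator
print(len(list(line)))        # 2, but the iterator is now exhausted
print(list(line))             # [] -- a second pass sees nothing

# Materializing once keeps the sample reusable and indexable,
# which is what _gen_str now does before validating it.
line = list(zip(feature_name, data))
print(line[0])                # ('words', ['1', '2', '3', '4'])
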
python/paddle/distributed/fleet/runtime/the_one_ps.py

Lines changed: 2 additions & 1 deletion
@@ -150,7 +150,8 @@ def parse_by_optimizer(self, grad_name, is_sparse, total_dims,
         oop = None

         for op in optimizer_ops:
-            if op.input("Param")[0] == param_name:
+            if ("Param" in op.input_names) and (
+                    op.input("Param")[0] == param_name):
                 oop = op
                 break

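The guard matters because, with gradient clipping enabled, the optimizer-op block also contains ops that have no "Param" input, so the old unconditional op.input("Param")[0] lookup failed. A stand-in sketch of the pattern (FakeOp is hypothetical and only mimics the two accessors used above):

class FakeOp:
    """Hypothetical stand-in mimicking the op accessors used in the_one_ps.py."""

    def __init__(self, inputs):
        self._inputs = inputs  # e.g. {"Param": ["fc_0.w_0"], ...}

    @property
    def input_names(self):
        return list(self._inputs)

    def input(self, name):
        return self._inputs[name]


optimizer_ops = [
    FakeOp({"X": ["global_norm"]}),  # clip helper op: no "Param" input at all
    FakeOp({"Param": ["fc_0.w_0"], "Grad": ["fc_0.w_0@GRAD"]}),
]

param_name = "fc_0.w_0"
oop = None
for op in optimizer_ops:
    # Check membership first, exactly as the fix does, so ops without a
    # "Param" input are skipped instead of blowing up on the lookup.
    if ("Param" in op.input_names) and (op.input("Param")[0] == param_name):
        oop = op
        break
print(oop is optimizer_ops[1])  # True
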
python/paddle/fluid/incubate/fleet/parameter_server/ir/public.py

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@
 from paddle.fluid.transpiler.details.program_utils import delete_ops

 OP_NAME_SCOPE = "op_namescope"
-CLIP_OP_NAME_SCOPE = "@CLIP"
+CLIP_OP_NAME_SCOPE = "gradient_clip"
 STEP_COUNTER = "@PS_STEP_COUNTER@"
 LEARNING_RATE_DECAY_COUNTER = "@LR_DECAY_COUNTER@"

python/paddle/fluid/incubate/fleet/parameter_server/ir/trainer_pass.py

Lines changed: 1 addition & 1 deletion
@@ -32,7 +32,7 @@
 from paddle.fluid.incubate.fleet.parameter_server.mode import DistributedMode

 OP_NAME_SCOPE = "op_namescope"
-CLIP_OP_NAME_SCOPE = "@CLIP"
+CLIP_OP_NAME_SCOPE = "gradient_clip"
 STEP_COUNTER = "@PS_STEP_COUNTER@"
 OP_ROLE_VAR_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleVarAttrName()
 RPC_OP_ROLE_ATTR_NAME = core.op_proto_and_checker_maker.kOpRoleAttrName()

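Both files update CLIP_OP_NAME_SCOPE because the ops appended by Paddle's gradient clippers are tagged with a "gradient_clip" name scope rather than "@CLIP", and the parameter-server passes match that constant against each op's "op_namescope" attribute. A rough way to see this from Python (a sketch assuming a Paddle 2.x static graph where clip ops carry the op_namescope attribute; not code from the commit):

import paddle

paddle.enable_static()

main = paddle.static.Program()
startup = paddle.static.Program()
with paddle.static.program_guard(main, startup):
    x = paddle.static.data(name="x", shape=[None, 4], dtype="float32")
    y = paddle.static.data(name="y", shape=[None, 1], dtype="float32")
    pred = paddle.static.nn.fc(x, size=1)
    loss = paddle.mean(paddle.nn.functional.square_error_cost(pred, y))
    clip = paddle.nn.ClipGradByGlobalNorm(clip_norm=1.0)
    sgd = paddle.optimizer.SGD(learning_rate=0.01, grad_clip=clip)
    sgd.minimize(loss)

# Ops appended by the clipper carry the name scope in their "op_namescope"
# attribute; the constant in public.py / trainer_pass.py has to match this
# string for the passes to recognize clip ops.
clip_ops = [
    op for op in main.global_block().ops
    if op.has_attr("op_namescope") and "gradient_clip" in op.attr("op_namescope")
]
print([op.type for op in clip_ops])
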
python/paddle/fluid/tests/unittests/test_data_generator.py

Lines changed: 40 additions & 0 deletions
@@ -95,6 +95,32 @@ def data_iter():
         return data_iter


+class MyMultiSlotStringDataGenerator_zip(fleet.MultiSlotStringDataGenerator):
+    def generate_sample(self, line):
+        def data_iter():
+            for i in range(40):
+                if i == 1:
+                    yield None
+                feature_name = ["words", "label"]
+                data = [["1", "2", "3", "4"], ["0"]]
+                yield zip(feature_name, data)
+
+        return data_iter
+
+
+class MyMultiSlotDataGenerator_zip(fleet.MultiSlotDataGenerator):
+    def generate_sample(self, line):
+        def data_iter():
+            for i in range(40):
+                if i == 1:
+                    yield None
+                feature_name = ["words", "label"]
+                data = [[1, 2, 3, 4], [0]]
+                yield zip(feature_name, data)
+
+        return data_iter
+
+
 class TestMultiSlotDataGenerator(unittest.TestCase):
     def test_MultiSlotDataGenerator_basic(self):
         my_ms_dg = MyMultiSlotDataGenerator()

@@ -149,5 +175,19 @@ def test_MultiSlotDataGenerator_error(self):
         my_ms_dg.run_from_memory()


+class TestMultiSlotStringDataGeneratorZip(unittest.TestCase):
+    def test_MultiSlotStringDataGenerator_zip(self):
+        my_ms_dg = MyMultiSlotStringDataGenerator_zip()
+        my_ms_dg.set_batch(1)
+        my_ms_dg.run_from_memory()
+
+
+class TestMultiSlotDataGeneratorZip(unittest.TestCase):
+    def test_MultiSlotDataGenerator_zip(self):
+        my_ms_dg = MyMultiSlotDataGenerator_zip()
+        my_ms_dg.set_batch(1)
+        my_ms_dg.run_from_memory()
+
+
 if __name__ == '__main__':
     unittest.main()
