@@ -2650,57 +2650,67 @@ def restore(self, executor):
2650
2650
class PipelineOptimizer (object ):
2651
2651
"""
2652
2652
Pipeline Optimizer
2653
- Train with pipeline mode. The program will be splited by cut_list.
2654
- If the len of cut_list is k, then the whole program (including
2655
- backward part) will be splited to 2*k-1 sections. So the length of place_list
2656
- and concurrency_list must be also 2*k-1.
2657
- Note: Though the asynchronous mode is applied in pipeline training to speed up,
2653
+
2654
+ Train with pipeline mode. The program will be split by cut_list.
2655
+
2656
+ If the length of cut_list is k, then the whole program (including \
2657
+ backward part) will be split to 2*k-1 sections.
2658
+
2659
+ So the length of place_list and concurrency_list must also be 2*k-1.
2660
+
2661
+ Note: Though the asynchronous mode is applied in pipeline training to speed up, \
2658
2662
the final performance depends on the training progress of each pipeline heavily.
2659
- And we will try the synchronous mode in the future
2663
+
2664
+ And we will try the synchronous mode in the future.
2665
+
2660
2666
Args:
2661
- optimizer (Optimizer): The based optimizer, such as SGD
2662
- cut_list (list of Variable list): The cut variable of the main_program
2663
- place_list (list of Place): The place where the section will run on
2664
- concurrency_list (list of int): The concurrency degree
2667
+ optimizer (Optimizer): The base optimizer, such as SGD.
2668
+ cut_list (list of Variable list): The cut variable of the main_program.
2669
+ place_list (list of Place): The place where the section will run on.
2670
+ concurrency_list (list of int): The concurrency degree.
2665
2671
queue_size (int): Each section will consume scopes from its in-scope queue
2666
2672
and produce scopes to out-scope queue. And this parameter
2667
- specify the scope queue size. [Optional. Default: 30]
2668
- sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1]
2669
- start_cpu_core_id (int): specify the first cpu core id. [Optional. Default:0]
2673
+ specifies the scope queue size. [Optional. Default: 30].
2674
+ sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1].
2675
+ start_cpu_core_id (int): specifies the first CPU core id. [Optional. Default: 0].
2676
+
2670
2677
Examples:
2671
2678
.. code-block:: python
2672
- x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
2673
- y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
2674
- emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
2675
- emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
2676
- concat = layers.concat([emb_x, emb_y], axis=1)
2677
- fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
2678
- loss = layers.reduce_mean(fc)
2679
- optimizer = fluid.optimizer.SGD(learning_rate=0.5)
2680
- optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
2681
- cut_list=[[emb_x, emb_y], [loss]],
2682
- place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
2683
- concurrency_list=[1, 1, 4],
2684
- queue_size=2,
2685
- sync_steps=1,
2686
- )
2687
- optimizer.minimize(loss)
2688
- place = fluid.CPUPlace()
2689
- exe = fluid.Executor(place)
2690
- exe.run(fluid.default_startup_program())
2691
- filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
2692
- dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
2693
- dataset.set_use_var([x,y])
2694
- dataset.set_batch_size(batch_size)
2695
- dataset.set_filelist(filelist)
2696
- exe.train_from_dataset(
2697
- fluid.default_main_program(),
2698
- dataset,
2699
- thread=2,
2700
- debug=False,
2701
- fetch_list=[],
2702
- fetch_info=[],
2703
- print_period=1)
2679
+
2680
+ import paddle.fluid.layers as layers
2681
+
2682
+ x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
2683
+ y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
2684
+ emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
2685
+ emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
2686
+ concat = layers.concat([emb_x, emb_y], axis=1)
2687
+ fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
2688
+ loss = layers.reduce_mean(fc)
2689
+ optimizer = fluid.optimizer.SGD(learning_rate=0.5)
2690
+ optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
2691
+ cut_list=[[emb_x, emb_y], [loss]],
2692
+ place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
2693
+ concurrency_list=[1, 1, 4],
2694
+ queue_size=2,
2695
+ sync_steps=1,
2696
+ )
2697
+ optimizer.minimize(loss)
2698
+ place = fluid.CPUPlace()
2699
+ exe = fluid.Executor(place)
2700
+ exe.run(fluid.default_startup_program())
2701
+ filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
2702
+ dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
2703
+ dataset.set_use_var([x,y])
2704
+ dataset.set_batch_size(batch_size)
2705
+ dataset.set_filelist(filelist)
2706
+ exe.train_from_dataset(
2707
+ fluid.default_main_program(),
2708
+ dataset,
2709
+ thread=2,
2710
+ debug=False,
2711
+ fetch_list=[],
2712
+ fetch_info=[],
2713
+ print_period=1)
2704
2714
"""
2705
2715
2706
2716
def __init__ (self ,
@@ -2720,7 +2730,7 @@ def __init__(self,
2720
2730
self ._sync_steps = sync_steps
2721
2731
self ._start_cpu_core_id = start_cpu_core_id
2722
2732
2723
- def create_vars (self , block , main_program ):
2733
+ def _create_vars (self , block , main_program ):
2724
2734
used_var_set = set ()
2725
2735
for op_idx in range (block .desc .op_size ()):
2726
2736
op_desc = block .desc .op (op_idx )
@@ -2732,7 +2742,7 @@ def create_vars(self, block, main_program):
2732
2742
source_var = main_program .block (0 ).var (str (var ))
2733
2743
block ._clone_variable (source_var , False )
2734
2744
2735
- def extract_section_opt_ops (self , ops , cut_point_name ):
2745
+ def _extract_section_opt_ops (self , ops , cut_point_name ):
2736
2746
"""
2737
2747
Extract opt ops in the given section
2738
2748
"""
@@ -2748,7 +2758,7 @@ def extract_section_opt_ops(self, ops, cut_point_name):
2748
2758
op_path = [ops [i ] for i in range (len (ops )) if relevant_op_flags [i ]]
2749
2759
return op_path
2750
2760
2751
- def find_input_output (self , ops , name , is_forward = True ):
2761
+ def _find_input_output (self , ops , name , is_forward = True ):
2752
2762
"""
2753
2763
Find the inputs or outputs of a section
2754
2764
"""
@@ -2763,7 +2773,7 @@ def find_input_output(self, ops, name, is_forward=True):
2763
2773
all_set .update (op .desc .input_arg_names ())
2764
2774
return all_set - part_set
2765
2775
2766
- def find_persistable_vars (self , ops , whole_parameters ):
2776
+ def _find_persistable_vars (self , ops , whole_parameters ):
2767
2777
"""
2768
2778
find the persistable input vars in current section
2769
2779
"""
@@ -2791,7 +2801,7 @@ def _is_lr_role_op(self, op):
2791
2801
return True
2792
2802
return False
2793
2803
2794
- def extract_section_ops (self , ops , cut_point_name ):
2804
+ def _extract_section_ops (self , ops , cut_point_name ):
2795
2805
"""
2796
2806
Extract ops in the given section
2797
2807
"""
@@ -2811,11 +2821,11 @@ def extract_section_ops(self, ops, cut_point_name):
2811
2821
op_path = [ops [i ] for i in range (len (ops )) if relevant_op_flags [i ]]
2812
2822
return op_path
2813
2823
2814
- def find_section_opt (self , ops , params ):
2815
- res = self .extract_section_opt_ops (ops , params )
2824
+ def _find_section_opt (self , ops , params ):
2825
+ res = self ._extract_section_opt_ops (ops , params )
2816
2826
return res
2817
2827
2818
- def split_program (self , main_program , cut_list ):
2828
+ def _split_program (self , main_program , cut_list ):
2819
2829
programs = []
2820
2830
block = main_program .block (0 )
2821
2831
whole_parameters = [e .name for e in block .all_parameters ()]
@@ -2836,24 +2846,24 @@ def split_program(self, main_program, cut_list):
2836
2846
"input_set" : set (),
2837
2847
"output_set" : set ()
2838
2848
}
2839
- cur_ops = self .extract_section_ops (ops , cut_vars )
2849
+ cur_ops = self ._extract_section_ops (ops , cut_vars )
2840
2850
if i == 0 :
2841
2851
for op in ops :
2842
2852
if self ._is_lr_role_op (op ):
2843
2853
cur_ops .append (op )
2844
2854
#prevent inplace in/out
2845
2855
program ["input_set" ].update (
2846
- self .find_input_output (
2856
+ self ._find_input_output (
2847
2857
cur_ops , [], is_forward = True ))
2848
2858
for e in cur_ops :
2849
2859
ops .remove (e )
2850
2860
2851
2861
if i < cut_len :
2852
2862
sec_params .append (
2853
- self .find_persistable_vars (cur_ops , whole_parameters ))
2863
+ self ._find_persistable_vars (cur_ops , whole_parameters ))
2854
2864
if i >= cut_len - 1 :
2855
- opt_ops = self .find_section_opt ( ops ,
2856
- sec_params [2 * cut_len - 2 - i ])
2865
+ opt_ops = self ._find_section_opt (
2866
+ ops , sec_params [2 * cut_len - 2 - i ])
2857
2867
2858
2868
for e in opt_ops :
2859
2869
ops .remove (e )
@@ -2864,11 +2874,11 @@ def split_program(self, main_program, cut_list):
2864
2874
ap_op = program ["program" ].block (0 ).desc .append_op ()
2865
2875
ap_op .copy_from (op_desc )
2866
2876
program ["input_set" ].update (
2867
- self .find_input_output (
2877
+ self ._find_input_output (
2868
2878
cur_ops , cut_vars , is_forward = True ))
2869
2879
program ["input_set" ].update (sec_params [min (i , 2 * cut_len - 2 - i )])
2870
2880
program ["output_set" ].update (
2871
- self .find_input_output (
2881
+ self ._find_input_output (
2872
2882
cur_ops , cut_vars , is_forward = False ))
2873
2883
programs .append (program )
2874
2884
program = {
@@ -2883,7 +2893,7 @@ def split_program(self, main_program, cut_list):
2883
2893
program ["input_set" ].update (
2884
2894
[cut_var .name + "@GRAD" for cut_var in cut_list [0 ]])
2885
2895
program ["input_set" ].update (
2886
- self .find_input_output (
2896
+ self ._find_input_output (
2887
2897
ops , [], is_forward = True ))
2888
2898
program ["input_set" ].update (sec_params [0 ])
2889
2899
programs .append (program )
@@ -2904,9 +2914,9 @@ def minimize(self,
2904
2914
self ._optimizer .minimize (loss , startup_program , parameter_list ,
2905
2915
no_grad_set )
2906
2916
program = loss .block .program
2907
- program_list = self .split_program (program , self ._cut_list )
2917
+ program_list = self ._split_program (program , self ._cut_list )
2908
2918
for p in program_list :
2909
- self .create_vars (p ["program" ].block (0 ), program )
2919
+ self ._create_vars (p ["program" ].block (0 ), program )
2910
2920
whole_parameters = [e .name for e in program .block (0 ).all_parameters ()]
2911
2921
param_need_sync = []
2912
2922
for i , section_p in enumerate (program_list ):
0 commit comments