@@ -2640,6 +2640,61 @@ def restore(self, executor):
2640
2640
2641
2641
2642
2642
class PipelineOptimizer (object ):
2643
+ """
2644
+ Pipeline Optimizer
2645
+ Train with pipeline mode. The program will be split by cut_list.
2646
+ If the length of cut_list is k, then the whole program (including
2647
+ backward part) will be split into 2*k-1 sections. So the lengths of place_list
2648
+ and concurrency_list must also be 2*k-1.
2649
+ Note: Though the asynchronous mode is applied in pipeline training to speed up,
2650
+ the final performance depends heavily on the training progress of each pipeline.
2651
+ And we will try the synchronous mode in the future.
2652
+ Args:
2653
+ optimizer (Optimizer): The based optimizer, such as SGD
2654
+ cut_list (list of Variable list): The cut variable of the main_program
2655
+ place_list (list of Place): The place where the section will run on
2656
+ concurrency_list (list of int): The concurrency degree
2657
+ queue_size (int): Each section will consume scopes from its in-scope queue
2658
+ and produce scopes to out-scope queue. And this parameter
2659
+ specifies the scope queue size. [Optional. Default: 30]
2660
+ sync_steps (int): The synchronization steps between different cards. [Optional. Default: 1]
2661
+ start_cpu_core_id (int): specifies the first CPU core id. [Optional. Default: 0]
2662
+ Examples:
2663
+ .. code-block:: python
2664
+ x = fluid.layers.data(name='x', shape=[1], dtype='int64', lod_level=0)
2665
+ y = fluid.layers.data(name='y', shape=[1], dtype='int64', lod_level=0)
2666
+ emb_x = layers.embedding(input=x, param_attr=fluid.ParamAttr(name="embx"), size=[10,2], is_sparse=False)
2667
+ emb_y = layers.embedding(input=y, param_attr=fluid.ParamAttr(name="emby",learning_rate=0.9), size=[10,2], is_sparse=False)
2668
+ concat = layers.concat([emb_x, emb_y], axis=1)
2669
+ fc = layers.fc(input=concat, name="fc", size=1, num_flatten_dims=1, bias_attr=False)
2670
+ loss = layers.reduce_mean(fc)
2671
+ optimizer = fluid.optimizer.SGD(learning_rate=0.5)
2672
+ optimizer = fluid.optimizer.PipelineOptimizer(optimizer,
2673
+ cut_list=[[emb_x, emb_y], [loss]],
2674
+ place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
2675
+ concurrency_list=[1, 1, 4],
2676
+ queue_size=2,
2677
+ sync_steps=1,
2678
+ )
2679
+ optimizer.minimize(loss)
2680
+ place = fluid.CPUPlace()
2681
+ exe = fluid.Executor(place)
2682
+ exe.run(fluid.default_startup_program())
2683
+ filelist = [] # you should set your own filelist, e.g. filelist = ["dataA.txt"]
2684
+ dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
2685
+ dataset.set_use_var([x,y])
2686
+ dataset.set_batch_size(batch_size)
2687
+ dataset.set_filelist(filelist)
2688
+ exe.train_from_dataset(
2689
+ fluid.default_main_program(),
2690
+ dataset,
2691
+ thread=2,
2692
+ debug=False,
2693
+ fetch_list=[],
2694
+ fetch_info=[],
2695
+ print_period=1)
2696
+ """
2697
+
2643
2698
def __init__ (self ,
2644
2699
optimizer ,
2645
2700
cut_list = None ,
0 commit comments