Skip to content

Commit ee4c51a

Browse files
committed
refine downpour sgd API with pslib
1 parent c583fd3 commit ee4c51a

File tree

4 files changed

+1526
-25
lines changed

4 files changed

+1526
-25
lines changed

python/paddle/fluid/distributed/__init__.py

Whitespace-only changes.
Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,32 @@
1-
import paddle.fluid as fluid
2-
import pslib_pb2 as pslib
31
from .node import DownpourServer
42
from .node import DownpourWorker
3+
from ..backward import append_backward
4+
import ps_pb2 as pslib
55
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
6+
from google.protobuf import text_format
67

78
class DownpourSGD(object):
    """Distributed (Downpour) SGD optimizer backed by a pslib parameter server.

    Builds the server- and worker-side table configuration for a model and
    returns the text-serialized PSParameter proto together with the op types
    the local trainer must skip (they are executed by the parameter server).

    Args:
        learning_rate (float): learning rate used for both the sparse and the
            dense tables.
        window (int): dense-table pull/push interval, in batches.
    """

    def __init__(self, learning_rate=0.001, window=1):
        # todo(guru4elephant): if optimizer is not None, will warning here
        self.learning_rate_ = learning_rate
        self.window_ = window

    def minimize(self, loss, startup_program=None,
                 parameter_list=None, no_grad_set=None,
                 prefetch_slots=None, prefetch_slots_emb=None):
        """Configure downpour tables for ``loss``.

        Returns:
            list: ``[ps_param_str, worker_skipped_ops]`` -- the text-format
            PSParameter proto and the op types the worker should skip.
        """
        # Sort by parameter name so table registration order is deterministic
        # across all trainers/servers.
        params_grads = sorted(append_backward(loss), key=lambda x: x[0].name)
        # BUG FIX: the original referenced undefined names `params`/`grads`;
        # split the (param, grad) pairs explicitly.
        params = [pair[0] for pair in params_grads]
        grads = [pair[1] for pair in params_grads]
        table_name = find_distributed_lookup_table(loss.block.program)
        server = DownpourServer()
        worker = DownpourWorker(self.window_)
        # BUG FIX: `learning_rate` was an undefined free name here; use the
        # rate stored on the optimizer instance.
        server.add_sparse_table(0, self.learning_rate_,
                                prefetch_slots, prefetch_slots_emb)
        server.add_dense_table(1, self.learning_rate_, params, grads)
        # NOTE(review): DownpourWorker.add_sparse_table takes
        # (table_id, slot_keys, slot_value_vars, slot_grad_vars) -- passing a
        # learning rate as the second argument looks inconsistent; confirm
        # the intended worker-side signature.
        worker.add_sparse_table(0, self.learning_rate_,
                                prefetch_slots, prefetch_slots_emb)
        worker.add_dense_table(1, self.learning_rate_, params, grads)
        ps_param = pslib.PSParameter()
        ps_param.server_param.CopyFrom(server.get_desc())
        # NOTE(review): worker_param is intentionally left unset in this
        # revision.
        #ps_param.worker_param.CopyFrom(worker.get_desc())
        # lookup_table ops are served remotely by pslib, so the local trainer
        # must not execute them.
        worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
        ps_param_str = text_format.MessageToString(ps_param)
        return [ps_param_str, worker_skipped_ops]

python/paddle/fluid/distributed/node.py

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
import paddle.fluid as fluid
2-
import pslib_pb2 as pslib
1+
import ps_pb2 as pslib
32

43
class Server(object):
54
def __init__(self):
@@ -13,11 +12,13 @@ def __init__(self):
1312

1413
class DownpourServer(Server):
1514
    def __init__(self):
        # Root ServerParameter proto message; table configs are added under
        # its downpour_server_param sub-message by the add_*_table methods.
        self.server_ = pslib.ServerParameter()
1717

1818
def add_sparse_table(self, table_id, learning_rate,
1919
slot_key, slot_value_var, slot_grad_var):
20-
table = self.server_.downpour_table_param.add()
20+
#table = self.server_.downpour_table_param.add()
21+
table = self.server_.downpour_server_param.downpour_table_param.add()
2122
table.table_id = table_id
2223
table.type = PS_SPARSE_TABLE
2324
table.accessor.accessor_class = "DownpourFeatureValueAccessor"
@@ -26,12 +27,14 @@ def add_sparse_table(self, table_id, learning_rate,
2627

2728
def add_dense_table(self, table_id, learning_rate,
2829
param_var, grad_var):
29-
table = self.server_.downpour_table_param.add()
30+
#table = self.server_.downpour_table_param.add()
31+
table = self.server_.downpour_server_param.downpour_table_param.add()
3032
table.table_id = table_id
3133
table.type = PS_DENSE_TABLE
3234
table.accessor.accessor_class = "DownpourDenseValueAccessor"
3335
table.accessor.sparse_sgd_param.learning_rate = learning_rate
34-
table.accessor.fea_dim = reduce(lambda x, y: x.shape, 1 for x in param_var)
36+
table.accessor.fea_dim = 1
37+
#table.accessor.fea_dim = reduce(lambda x, y: x.shape, 1 for x in param_var)
3538

3639
    def get_desc(self):
        """Return the assembled ServerParameter proto message."""
        return self.server_
@@ -40,19 +43,28 @@ def get_desc(self):
4043
class DownpourWorker(Worker):
4144
def __init__(self, window):
4245
self.window = window
43-
self.worker_ = pslib.WorkerParameter().downpour_worker_param
46+
#self.worker_ = pslib.WorkerParameter().downpour_worker_param
47+
#self.worker_ = pslib.WorkerParameter()
48+
self.worker_ = pslib.DownpourTrainerParameter()
49+
#self.worker_.pull_dense_per_batch = window
50+
#self.worker_.push_dense_per_batch = window
51+
#self.worker_.downpour_worker_param.pull_dense_per_batch = window
52+
#self.worker_.downpour_worker_param.push_dense_per_batch = window
4453
self.worker_.pull_dense_per_batch = window
4554
self.worker_.push_dense_per_batch = window
55+
print(self.worker_)
4656

4757
    def add_sparse_table(self, table_id,
                         slot_keys, slot_value_vars, slot_grad_vars):
        """Register a sparse (embedding) table in the worker-side config.

        Args:
            table_id (int): id matching the server-side sparse table.
            slot_keys: slot identifiers fed into the table.
            slot_value_vars: variables holding looked-up values.
                NOTE(review): currently unused in this method -- confirm.
            slot_grad_vars: per-slot gradient variables.
        """
        table = self.worker_.downpour_worker_param.sparse_table.add()
        table.table_id = table_id
        table.slot.extend(slot_keys)
        # NOTE(review): calling extend() on the DownpourTrainerParameter
        # message itself looks wrong -- protobuf messages have no extend();
        # this probably should target a repeated field on `table` (e.g. a
        # slot-gradient field). Confirm against the ps proto definition.
        self.worker_.extend([grad.name for grad in slot_grad_vars])
5364

5465
def add_dense_table(self, table_id, param_vars, grad_vars):
55-
table = self.worker_.dense_table.add()
66+
#table = self.worker_.dense_table.add()
67+
table = self.worker_.downpour_worker_param.dense_table.add()
5668
table.table_id = table_id
5769
table.dense_variable_name.extend([p.name for p in param_vars])
5870
table.dense_gradient_variable_name.extend([g.name for g in grad_vars])

0 commit comments

Comments
 (0)