Skip to content

Commit 0a3d8a2

Browse files
Merge pull request #1 from guru4elephant/for_pslib
For pslib
2 parents e650b42 + ee4c51a commit 0a3d8a2

File tree

5 files changed

+1616
-0
lines changed

5 files changed

+1616
-0
lines changed

python/paddle/fluid/distributed/__init__.py

Whitespace-only changes.
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
from .node import DownpourServer
2+
from .node import DownpourWorker
3+
from ..backward import append_backward
4+
import ps_pb2 as pslib
5+
from paddle.fluid.distribute_lookup_table import find_distributed_lookup_table
6+
from google.protobuf import text_format
7+
8+
class DownpourSGD(object):
    """Build the pslib parameter-server description for Downpour SGD.

    Args:
        learning_rate: learning rate written into the table descriptions.
        window: value handed to ``DownpourWorker`` when it is constructed.
    """

    def __init__(self, learning_rate=0.001, window=1):
        # TODO(guru4elephant): warn here if an optimizer is already set.
        self.learning_rate_ = learning_rate
        self.window_ = window

    def minimize(self, loss, startup_program=None,
                 parameter_list=None, no_grad_set=None,
                 prefetch_slots=None, prefetch_slots_emb=None,
                 prefetch_slots_emb_grad=None):
        """Append backward ops for ``loss`` and describe the PS tables.

        Table 0 is registered as the sparse table and table 1 as the dense
        table, on both the server and the worker description.

        Args:
            loss: variable to differentiate; ``loss.block.program`` is also
                searched for a distributed lookup table.
            startup_program: accepted for signature compatibility; unused.
            parameter_list: forwarded to ``append_backward``.
            no_grad_set: forwarded to ``append_backward``.
            prefetch_slots: slot keys of the sparse table.
            prefetch_slots_emb: slot value variables of the sparse table;
                ``DownpourServer`` reads ``shape[1]`` of the first one.
            prefetch_slots_emb_grad: optional gradient variables of the
                sparse slots; ``DownpourWorker`` reads ``.name`` of each.
                Treated as an empty list when omitted.

        Returns:
            A two-element list: the text-format serialization of the
            ``PSParameter`` message, and the list of op type names
            ``["lookup_table", "lookup_table_grad"]``.
        """
        params_grads = sorted(
            append_backward(loss, parameter_list, no_grad_set),
            key=lambda pair: pair[0].name)
        # The dense table wants parameters and gradients as parallel lists.
        params = [pair[0] for pair in params_grads]
        grads = [pair[1] for pair in params_grads]

        # The result is not consumed yet; the call is kept so the lookup
        # over the program still runs exactly as before.
        table_name = find_distributed_lookup_table(loss.block.program)

        if prefetch_slots_emb_grad is None:
            prefetch_slots_emb_grad = []

        sparse_table_id = 0
        dense_table_id = 1
        server = DownpourServer()
        worker = DownpourWorker(self.window_)

        # Argument lists follow the signatures declared in node.py: the
        # server methods take a learning rate, the worker methods do not.
        server.add_sparse_table(sparse_table_id, self.learning_rate_,
                                prefetch_slots, prefetch_slots_emb,
                                prefetch_slots_emb_grad)
        server.add_dense_table(dense_table_id, self.learning_rate_,
                               params, grads)
        worker.add_sparse_table(sparse_table_id, prefetch_slots,
                                prefetch_slots_emb, prefetch_slots_emb_grad)
        worker.add_dense_table(dense_table_id, params, grads)

        ps_param = pslib.PSParameter()
        ps_param.server_param.CopyFrom(server.get_desc())
        # NOTE(review): the worker description is built above but is not
        # attached to ps_param; the original had this step disabled:
        #   ps_param.worker_param.CopyFrom(worker.get_desc())
        worker_skipped_ops = ["lookup_table", "lookup_table_grad"]
        ps_param_str = text_format.MessageToString(ps_param)
        return [ps_param_str, worker_skipped_ops]
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from mpi4py import MPI
2+
3+
class MPIHelper(object):
    """Convenience wrapper around ``MPI.COMM_WORLD`` and local host lookups."""

    def __init__(self):
        # Every rank/size query below goes through the world communicator.
        self.comm = MPI.COMM_WORLD

    def get_rank(self):
        """Return ``Get_rank()`` of the wrapped communicator."""
        rank = self.comm.Get_rank()
        return rank

    def get_size(self):
        """Return ``Get_size()`` of the wrapped communicator."""
        size = self.comm.Get_size()
        return size

    def get_ip(self):
        """Return the address the local host name resolves to."""
        from socket import gethostbyname, gethostname
        return gethostbyname(gethostname())

    def get_hostname(self):
        """Return the local host name as reported by ``socket``."""
        from socket import gethostname
        return gethostname()
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
import ps_pb2 as pslib
2+
3+
class Server(object):
    """Base class for parameter-server descriptions; holds no state."""

    def __init__(self):
        """Create an empty server; subclasses set up their own state."""
6+
7+
8+
class Worker(object):
    """Base class for worker descriptions; holds no state."""

    def __init__(self):
        """Create an empty worker; subclasses set up their own state."""
11+
12+
13+
class DownpourServer(Server):
    """Accumulate a pslib ``ServerParameter`` describing the server tables."""

    def __init__(self):
        # Tables are appended under downpour_server_param.downpour_table_param.
        self.server_ = pslib.ServerParameter()

    def add_sparse_table(self, table_id, learning_rate,
                         slot_key, slot_value_var, slot_grad_var=None):
        """Append a sparse table entry to the server description.

        Args:
            table_id: identifier stored in the table entry.
            learning_rate: learning rate stored in the sparse SGD settings.
            slot_key: accepted for signature compatibility; unused here.
            slot_value_var: sequence of variables; ``shape[1]`` of the first
                one becomes the accessor's ``fea_dim``.
            slot_grad_var: accepted for signature compatibility; unused here,
                so it may be omitted.
        """
        table = self.server_.downpour_server_param.downpour_table_param.add()
        table.table_id = table_id
        # The enum constant lives in the generated module, not in this file.
        table.type = pslib.PS_SPARSE_TABLE
        table.accessor.accessor_class = "DownpourFeatureValueAccessor"
        # A sparse table carries its learning rate in the sparse SGD
        # settings (the original wrote it into dense_sgd_param.adam).
        table.accessor.sparse_sgd_param.learning_rate = learning_rate
        table.accessor.fea_dim = slot_value_var[0].shape[1]

    def add_dense_table(self, table_id, learning_rate,
                        param_var, grad_var):
        """Append a dense table entry to the server description.

        Args:
            table_id: identifier stored in the table entry.
            learning_rate: learning rate stored in the dense (adam) settings.
            param_var: accepted for signature compatibility; unused here.
            grad_var: accepted for signature compatibility; unused here.
        """
        table = self.server_.downpour_server_param.downpour_table_param.add()
        table.table_id = table_id
        table.type = pslib.PS_DENSE_TABLE
        table.accessor.accessor_class = "DownpourDenseValueAccessor"
        # A dense table carries its learning rate in the dense SGD settings
        # (the original wrote it into sparse_sgd_param).
        table.accessor.dense_sgd_param.adam.learning_rate = learning_rate
        # TODO: derive fea_dim from the shapes in param_var instead of
        # hard-coding 1.
        table.accessor.fea_dim = 1

    def get_desc(self):
        """Return the accumulated ``ServerParameter`` message."""
        return self.server_
41+
42+
43+
class DownpourWorker(Worker):
    """Accumulate a pslib ``DownpourTrainerParameter`` for the worker side.

    Args:
        window: stored on the instance and written to both
            ``pull_dense_per_batch`` and ``push_dense_per_batch``.
    """

    def __init__(self, window):
        self.window = window
        self.worker_ = pslib.DownpourTrainerParameter()
        self.worker_.pull_dense_per_batch = window
        self.worker_.push_dense_per_batch = window
        # The debug print of the message that used to run here was dropped:
        # constructing a worker should not write to stdout.

    def add_sparse_table(self, table_id,
                         slot_keys, slot_value_vars, slot_grad_vars):
        """Append a sparse table entry to the worker description.

        Args:
            table_id: identifier stored in the table entry.
            slot_keys: values appended to the entry's ``slot`` field.
            slot_value_vars: accepted for signature compatibility; unused.
            slot_grad_vars: variables whose ``.name`` values are collected.
        """
        table = self.worker_.downpour_worker_param.sparse_table.add()
        table.table_id = table_id
        table.slot.extend(slot_keys)
        # NOTE(review): this calls ``extend`` on the message itself, not on a
        # repeated field; protobuf messages do not normally provide that
        # method, so this line is expected to raise AttributeError. The
        # intended target field for the gradient names is not visible here
        # -- confirm against ps_pb2 and fix.
        self.worker_.extend([grad.name for grad in slot_grad_vars])

    def add_dense_table(self, table_id, param_vars, grad_vars):
        """Append a dense table entry to the worker description.

        Args:
            table_id: identifier stored in the table entry.
            param_vars: variables whose names fill ``dense_variable_name``.
            grad_vars: variables whose names fill
                ``dense_gradient_variable_name``.
        """
        table = self.worker_.downpour_worker_param.dense_table.add()
        table.table_id = table_id
        table.dense_variable_name.extend([p.name for p in param_vars])
        table.dense_gradient_variable_name.extend([g.name for g in grad_vars])

    def get_desc(self):
        """Return the accumulated ``DownpourTrainerParameter`` message."""
        return self.worker_

0 commit comments

Comments
 (0)