
Commit ff4612a

[Cherry pick] cherry-pick #31102 #30750 #30626 (#31336)
* solve build gpu task core (#30626)
  * build gpu task core
  * format
* dump to cpu (#30750)
  * dump to cpu
  * format
  * format
  * format
* support multi node in heterps (#31102)
  * push multi node
  * multi node
  * MultiThread
  * remove log
  * solve bug in 30829
  * optimizer
1 parent a891032 commit ff4612a

19 files changed: +710, -37 lines

paddle/fluid/framework/fleet/fleet_wrapper.cc

Lines changed: 2 additions & 7 deletions
@@ -27,14 +27,9 @@ See the License for the specific language governing permissions and
 limitations under the License. */

 #include "paddle/fluid/framework/fleet/fleet_wrapper.h"
-#include <algorithm>
-#include <utility>
-#include "paddle/fluid/framework/channel.h"
-#include "paddle/fluid/framework/data_feed.h"
-#include "paddle/fluid/framework/io/fs.h"
+
+#include "glog/logging.h"
 #include "paddle/fluid/framework/op_registry.h"
-#include "paddle/fluid/framework/scope.h"
-#include "paddle/fluid/platform/timer.h"

 namespace paddle {
 namespace framework {

paddle/fluid/framework/fleet/heter_context.h

Lines changed: 53 additions & 1 deletion
@@ -29,17 +29,69 @@ namespace framework {

 class HeterContext {
  public:
+  ~HeterContext() {
+    for (size_t i = 0; i < mutex_.size(); ++i) {
+      delete mutex_[i];
+    }
+    mutex_.clear();
+  }
   Scope* scope_{nullptr};
   std::vector<std::vector<FeatureKey>> feature_keys_;
   std::vector<std::vector<paddle::ps::DownpourFixedFeatureValue*>> value_ptr_;
-  std::vector<std::vector<FeatureValue>> feature_values_;
+  std::vector<std::vector<FeatureValue>> device_values_;
+  std::vector<std::vector<FeatureKey>> device_keys_;
+  std::vector<std::mutex*> mutex_;
+
+  uint32_t shard_num_ = 37;
   uint64_t size() {
     uint64_t total_size = 0;
     for (auto& keys : feature_keys_) {
       total_size += keys.size();
     }
     return total_size;
   }
+  void SetShardNum(uint32_t shard_num) { shard_num_ = shard_num; }
+  uint32_t ShardNum() { return shard_num_; }
+  void init(int shard_num, int device_num) {
+    shard_num_ = shard_num;
+    feature_keys_.resize(shard_num_);
+    value_ptr_.resize(shard_num_);
+
+    device_values_.resize(device_num);
+    device_keys_.resize(device_num);
+    mutex_.resize(device_num);
+    for (size_t i = 0; i < mutex_.size(); ++i) {
+      mutex_[i] = new std::mutex();
+    }
+  }
+  void batch_add_keys(const std::vector<std::vector<uint64_t>>& thread_keys) {
+    assert(thread_keys.size() == feature_keys_.size());
+
+    for (uint32_t i = 0; i < shard_num_; i++) {
+      int idx = 0;
+      idx = feature_keys_[i].size();
+      feature_keys_[i].resize(feature_keys_[i].size() + thread_keys[i].size());
+      for (uint64_t j = 0; j < thread_keys[i].size(); j++) {
+        feature_keys_[i][idx + j] = thread_keys[i][j];
+      }
+    }
+  }
+  void UniqueKeys() {
+    std::vector<std::thread> threads;
+    auto unique_func = [this](int i) {
+      auto& cur_keys = feature_keys_[i];
+      std::sort(cur_keys.begin(), cur_keys.end());
+      std::vector<FeatureKey>::iterator it;
+      it = std::unique(cur_keys.begin(), cur_keys.end());
+      cur_keys.resize(std::distance(cur_keys.begin(), it));
+    };
+    for (uint32_t i = 0; i < shard_num_; i++) {
+      threads.push_back(std::thread(unique_func, i));
+    }
+    for (std::thread& t : threads) {
+      t.join();
+    }
+  }
 };

 }  // end namespace framework
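Taken together, the new HeterContext members form a small pipeline for building a GPU task: init() sizes the shard and device tables, batch_add_keys() appends each reader thread's key list to its shard, and UniqueKeys() sorts and deduplicates every shard on its own thread. A minimal usage sketch (the shard/device counts and key values are illustrative assumptions, not taken from the commit):

    #include "paddle/fluid/framework/fleet/heter_context.h"

    paddle::framework::HeterContext ctx;
    ctx.init(/*shard_num=*/37, /*device_num=*/8);

    // thread_keys[i] holds the uint64_t feature ids that hash to shard i;
    // duplicates are fine here, UniqueKeys() removes them below.
    std::vector<std::vector<uint64_t>> thread_keys(ctx.ShardNum());
    thread_keys[0] = {42, 42, 7};

    ctx.batch_add_keys(thread_keys);  // append per-shard keys
    ctx.UniqueKeys();                 // per-shard sort + unique, one thread each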

paddle/fluid/framework/fleet/heter_ps/feature_value.h

Lines changed: 1 addition & 0 deletions
@@ -33,6 +33,7 @@ struct FeatureValue {
   float lr_g2sum;
   int mf_size;
   float mf[MF_DIM + 1];
+  uint64_t cpu_ptr;

   friend std::ostream& operator<<(std::ostream& out, FeatureValue& val) {
     out << "show: " << val.show << " clk: " << val.clk << " slot: " << val.slot

paddle/fluid/framework/fleet/heter_ps/hashtable.h

Lines changed: 4 additions & 1 deletion
@@ -13,9 +13,11 @@ See the License for the specific language governing permissions and
 limitations under the License. */

 #pragma once
+#include <glog/logging.h>
 #include <limits>
 #include <memory>
 #include <vector>
+#include "common_value.h"  // NOLINT
 #include "thrust/pair.h"
 //#include "cudf/concurrent_unordered_map.cuh.h"
 #include "paddle/fluid/framework/fleet/heter_ps/cudf/concurrent_unordered_map.cuh.h"
@@ -47,6 +49,7 @@ class HashTable {
   void get(const KeyType* d_keys, ValType* d_vals, size_t len,
            cudaStream_t stream);
   void show();
+  void dump_to_cpu(int devid, cudaStream_t stream);

   template <typename GradType, typename Sgd>
   void update(const KeyType* d_keys, const GradType* d_grads, size_t len,
@@ -60,5 +63,5 @@ class HashTable {
 };
 }  // end namespace framework
 }  // end namespace paddle
-#include "hashtable.tpp"
+#include "hashtable_inl.h"
 #endif
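A minimal call sketch for the new entry point (table, dev_id, and the stream setup are assumed here; whether the caller must synchronize afterwards depends on how the surrounding code orders work on that stream):

    cudaStream_t stream;
    cudaStreamCreate(&stream);
    table->dump_to_cpu(dev_id, stream);  // migrate values back to the CPU table
    cudaStreamSynchronize(stream);       // conservative: wait for the prefetch
    cudaStreamDestroy(stream);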

paddle/fluid/framework/fleet/heter_ps/hashtable.tpp renamed to paddle/fluid/framework/fleet/heter_ps/hashtable_inl.h

Lines changed: 35 additions & 0 deletions
@@ -108,6 +108,41 @@ void HashTable<KeyType, ValType>::insert(const KeyType* d_keys,
                                          d_vals, len);
 }

+template <typename KeyType, typename ValType>
+void HashTable<KeyType, ValType>::dump_to_cpu(int devid, cudaStream_t stream) {
+  container_->prefetch(cudaCpuDeviceId, stream);
+  size_t num = container_->size();
+  KeyType unuse_key = std::numeric_limits<KeyType>::max();
+  thrust::pair<KeyType, ValType>* kv = container_->data();
+  for (size_t i = 0; i < num; ++i) {
+    if (kv[i].first == unuse_key) {
+      continue;
+    }
+    ValType& gpu_val = kv[i].second;
+    auto* downpour_value =
+        (paddle::ps::DownpourFixedFeatureValue*)(gpu_val.cpu_ptr);
+    int downpour_value_size = downpour_value->size();
+    if (gpu_val.mf_size > 0 && downpour_value_size == 7) {
+      downpour_value->resize(gpu_val.mf_size + downpour_value_size);
+    }
+    float* cpu_val = downpour_value->data();
+    cpu_val[0] = 0;
+    cpu_val[1] = gpu_val.delta_score;
+    cpu_val[2] = gpu_val.show;
+    cpu_val[3] = gpu_val.clk;
+    cpu_val[4] = gpu_val.lr;
+    cpu_val[5] = gpu_val.lr_g2sum;
+    cpu_val[6] = gpu_val.slot;
+    if (gpu_val.mf_size > 0) {
+      for (int x = 0; x < gpu_val.mf_size; x++) {
+        cpu_val[x + 7] = gpu_val.mf[x];
+      }
+    }
+  }
+
+  container_->prefetch(devid, stream);
+}
+
 template <typename KeyType, typename ValType>
 template <typename GradType, typename Sgd>
 void HashTable<KeyType, ValType>::update(const KeyType* d_keys,
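For reference, the fixed float layout that dump_to_cpu writes into each DownpourFixedFeatureValue is seven scalar slots followed by the optional embedding; the slot names below are descriptive glosses of the assignments above, not identifiers from the commit:

    // cpu_val[0] = 0                        (cleared)
    // cpu_val[1] = delta_score
    // cpu_val[2] = show
    // cpu_val[3] = clk
    // cpu_val[4] = lr
    // cpu_val[5] = lr_g2sum
    // cpu_val[6] = slot
    // cpu_val[7 .. 6 + mf_size] = mf[0 .. mf_size - 1]   (only when mf_size > 0)

Note the resize guard: a CPU value that has never grown its embedding (size exactly 7) is resized to 7 + mf_size before the mf block is copied in, so repeated dumps do not grow it again.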

paddle/fluid/framework/fleet/heter_ps/heter_comm.h

Lines changed: 75 additions & 2 deletions
@@ -13,13 +13,15 @@ See the License for the specific language governing permissions and
 limitations under the License. */

 #pragma once
+#include <thread>
 #include <vector>
 #include "cub/cub.cuh"
 #include "hashtable.h"
 #include "heter_resource.h"
-#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh"
+#include "paddle/fluid/framework/fleet/heter_ps/optimizer.cuh.h"
 #include "paddle/fluid/memory/memory.h"
 #include "paddle/fluid/platform/cuda_device_guard.h"
+#include "paddle/fluid/platform/dynload/nccl.h"
 #include "paddle/fluid/platform/place.h"
 #include "thrust/pair.h"

@@ -67,11 +69,38 @@ class HeterComm {
   void push_sparse(int num, KeyType* d_keys, GradType* d_grads, size_t len,
                    Sgd& sgd);

+  template <typename Sgd>
+  void push_sparse_multi_node(int num, KeyType* d_keys, GradType* d_grads,
+                              size_t len, Sgd& sgd);
+
+  template <typename Sgd>
+  void update_one_table(int num, KeyType* d_keys, GradType* d_grads, size_t len,
+                        Sgd& sgd);
+
+  int gather_one_node_grad(int num, KeyType* d_keys, GradType* d_grads,
+                           int len);
+
+  int gather_multi_node_grad(int num, KeyType* d_keys, GradType* d_grads,
+                             int len);
+
   int log2i(int x);
+
+  void set_nccl_comm_and_size(const std::vector<ncclComm_t>& inner_comms,
+                              const std::vector<ncclComm_t>& inter_comms,
+                              int comm_size) {
+    nccl_inner_comms_ = inner_comms;
+    nccl_inter_comms_ = inter_comms;
+    node_size_ = comm_size;
+  }
+
   bool need_transfer(int send_id, int receive_id) {
     return ((send_id / 4 != receive_id / 4) && (send_id + 4) % 8 != receive_id);
   }

+  // void dump_to_cpu(int index);
+
+  void end_pass();
+
   int get_transfer_devid(int send_id) { return (send_id + 4) % 8; }

   struct Node {
@@ -89,6 +118,44 @@ class HeterComm {
     std::vector<Node> nodes_;
   };

+  struct LocalStorage {
+    LocalStorage() {}
+    void init(int size, int dev_id) {
+      place_ = platform::CUDAPlace(dev_id);
+      alloc(size, true);
+    }
+
+    void alloc(int size, bool force = false) {
+      if (force || size > all_keys_mem->size()) {
+        all_keys_mem.reset();
+        all_grads_mem.reset();
+        all_keys_mem = memory::AllocShared(place_, size * sizeof(KeyType));
+        all_grads_mem = memory::AllocShared(place_, size * sizeof(GradType));
+        all_keys = reinterpret_cast<KeyType*>(all_keys_mem->ptr());
+        all_grads = reinterpret_cast<GradType*>(all_grads_mem->ptr());
+      }
+      if (force || size > local_keys_mem->size()) {
+        local_keys_mem.reset();
+        local_grads_mem.reset();
+        local_keys_mem = memory::AllocShared(place_, size * sizeof(KeyType));
+        local_grads_mem = memory::AllocShared(place_, size * sizeof(GradType));
+        local_keys = reinterpret_cast<KeyType*>(local_keys_mem->ptr());
+        local_grads = reinterpret_cast<GradType*>(local_grads_mem->ptr());
+      }
+    }
+
+    platform::CUDAPlace place_;
+    std::shared_ptr<memory::Allocation> all_keys_mem;
+    std::shared_ptr<memory::Allocation> all_grads_mem;
+    KeyType* all_keys;
+    GradType* all_grads;
+
+    std::shared_ptr<memory::Allocation> local_keys_mem;
+    std::shared_ptr<memory::Allocation> local_grads_mem;
+    KeyType* local_keys;
+    GradType* local_grads;
+  };
+
   void init_path();
   void create_storage(
       int start_index, int end_index, int keylen, int vallen,
@@ -106,9 +173,15 @@ class HeterComm {
   CustomGradMerger merger_;
   int topo_aware_{1};
   std::vector<std::vector<Path>> path_;
+  std::vector<LocalStorage> storage_;
+  int feanum_{1800 * 2048};
+  int multi_node_{1};
+  std::vector<ncclComm_t> nccl_inner_comms_;
+  std::vector<ncclComm_t> nccl_inter_comms_;
+  int node_size_;
 };

 }  // end namespace framework
 }  // end namespace paddle
-#include "paddle/fluid/framework/fleet/heter_ps/heter_comm.tpp"
+#include "paddle/fluid/framework/fleet/heter_ps/heter_comm_inl.h"
 #endif
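The new declarations sketch a two-level aggregation path for multi-node training: gather_one_node_grad() combines gradients across the GPUs of one node over nccl_inner_comms_, gather_multi_node_grad() then combines across nodes over nccl_inter_comms_, and update_one_table() applies the optimizer once to the merged result. A hedged wiring sketch (the comm object, communicator vectors, device pointers, and sgd functor are all assumed to be created elsewhere; nothing below is shown in this commit):

    // inner_comms: one ncclComm_t per local GPU (intra-node group)
    // inter_comms: one ncclComm_t per local GPU (group spanning all nodes)
    comm.set_nccl_comm_and_size(inner_comms, inter_comms, /*comm_size=*/node_count);

    // Per device: gather inside the node, then across nodes, then run the
    // optimizer once on the merged gradients.
    comm.push_sparse_multi_node(dev_id, d_keys, d_grads, len, sgd);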
