Skip to content

Commit a391762

Browse files
graph partition (#42472)
* enable graph-engine to return all id (#42319) * enable graph-engine to return all id * change vector's dimension * change vector's dimension * enlarge returned ids dimensions * change sample result's structure to fit training (#42426) * enable graph-engine to return all id * change vector's dimension * change vector's dimension * enlarge returned ids dimensions * add actual_val * change vlog * fix bug * bug fix * bug fix * fix display test * singleton of gpu_graph_wrapper * change sample result's structure to fit training * recover sample code * fix * secondary sample * add graph partition * fix pybind Co-authored-by: DesmonDay <[email protected]> Co-authored-by: DesmonDay <[email protected]>
1 parent a574586 commit a391762

File tree

11 files changed

+819
-96
lines changed

11 files changed

+819
-96
lines changed

paddle/fluid/distributed/ps/table/common_graph_table.cc

Lines changed: 400 additions & 22 deletions
Large diffs are not rendered by default.

paddle/fluid/distributed/ps/table/common_graph_table.h

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,13 @@ class GraphShard {
6363
}
6464
return res;
6565
}
66-
66+
std::vector<int64_t> get_all_id() {
67+
std::vector<int64_t> res;
68+
for (int i = 0; i < (int)bucket.size(); i++) {
69+
res.push_back(bucket[i]->get_id());
70+
}
71+
return res;
72+
}
6773
GraphNode *add_graph_node(int64_t id);
6874
GraphNode *add_graph_node(Node *node);
6975
FeatureNode *add_feature_node(int64_t id);
@@ -420,6 +426,10 @@ class GraphTable : public Table {
420426
use_cache = false;
421427
shard_num = 0;
422428
rw_lock.reset(new pthread_rwlock_t());
429+
#ifdef PADDLE_WITH_HETERPS
430+
next_partition = 0;
431+
total_memory_cost = 0;
432+
#endif
423433
}
424434
virtual ~GraphTable();
425435

@@ -465,6 +475,8 @@ class GraphTable : public Table {
465475
int32_t load_edges(const std::string &path, bool reverse,
466476
const std::string &edge_type);
467477

478+
std::vector<std::vector<int64_t>> get_all_id(int type, int idx,
479+
int slice_num);
468480
int32_t load_nodes(const std::string &path, std::string node_type);
469481

470482
int32_t add_graph_node(int idx, std::vector<int64_t> &id_list,
@@ -513,7 +525,7 @@ class GraphTable : public Table {
513525
const std::vector<std::vector<std::string>> &res);
514526

515527
size_t get_server_num() { return server_num; }
516-
528+
void clear_graph(int idx);
517529
virtual int32_t make_neighbor_sample_cache(size_t size_limit, size_t ttl) {
518530
{
519531
std::unique_lock<std::mutex> lock(mutex_);
@@ -538,15 +550,33 @@ class GraphTable : public Table {
538550
// graph_sampler->set_graph_sample_callback(callback);
539551
// return 0;
540552
// }
553+
virtual void make_partitions(int idx, int64_t gb_size, int device_len);
541554
virtual char *random_sample_neighbor_from_ssd(
542555
int idx, int64_t id, int sample_size,
543556
const std::shared_ptr<std::mt19937_64> rng, int &actual_size);
544557
virtual int32_t add_node_to_ssd(int type_id, int idx, int64_t src_id,
545558
char *data, int len);
546559
virtual paddle::framework::GpuPsCommGraph make_gpu_ps_graph(
547560
int idx, std::vector<int64_t> ids);
561+
int32_t Load_to_ssd(const std::string &path, const std::string &param);
562+
int64_t load_graph_to_memory_from_ssd(int idx, std::vector<int64_t> &ids);
563+
int32_t make_complementary_graph(int idx, int64_t byte_size);
564+
int32_t dump_edges_to_ssd(int idx);
565+
int32_t get_partition_num(int idx) { return partitions[idx].size(); }
566+
std::vector<int64_t> get_partition(int idx, int index) {
567+
if (idx >= partitions.size() || index >= partitions[idx].size())
568+
return std::vector<int64_t>();
569+
return partitions[idx][index];
570+
}
571+
int32_t load_edges_to_ssd(const std::string &path, bool reverse_edge,
572+
const std::string &edge_type);
573+
int32_t load_next_partition(int idx);
574+
void set_search_level(int search_level) { this->search_level = search_level; }
548575
// virtual GraphSampler *get_graph_sampler() { return graph_sampler.get(); }
549576
int search_level;
577+
int64_t total_memory_cost;
578+
std::vector<std::vector<std::vector<int64_t>>> partitions;
579+
int next_partition;
550580
#endif
551581
virtual int32_t add_comm_edge(int idx, int64_t src_id, int64_t dst_id);
552582
virtual int32_t build_sampler(int idx, std::string sample_type = "random");

paddle/fluid/framework/fleet/heter_ps/gpu_graph_node.h

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,36 +24,46 @@ namespace paddle {
2424
namespace framework {
2525
struct GpuPsGraphNode {
2626
int64_t node_id;
27-
int neighbor_size, neighbor_offset;
27+
unsigned int neighbor_size, neighbor_offset;
2828
// this node's neighbor is stored on [neighbor_offset,neighbor_offset +
2929
// neighbor_size) of int64_t *neighbor_list;
3030
};
3131

3232
struct GpuPsCommGraph {
3333
int64_t *neighbor_list;
3434
GpuPsGraphNode *node_list;
35-
int neighbor_size, node_size;
35+
unsigned int neighbor_size, node_size;
3636
// the size of neighbor array and graph_node_list array
3737
GpuPsCommGraph()
3838
: neighbor_list(NULL), node_list(NULL), neighbor_size(0), node_size(0) {}
3939
GpuPsCommGraph(int64_t *neighbor_list_, GpuPsGraphNode *node_list_,
40-
int neighbor_size_, int node_size_)
40+
unsigned int neighbor_size_, unsigned int node_size_)
4141
: neighbor_list(neighbor_list_),
4242
node_list(node_list_),
4343
neighbor_size(neighbor_size_),
4444
node_size(node_size_) {}
45+
void init_on_cpu(unsigned int neighbor_size, unsigned int node_size) {
46+
this->neighbor_size = neighbor_size;
47+
this->node_size = node_size;
48+
this->neighbor_list = new int64_t[neighbor_size];
49+
this->node_list = new paddle::framework::GpuPsGraphNode[node_size];
50+
}
51+
void release_on_cpu() {
52+
delete[] neighbor_list;
53+
delete[] node_list;
54+
}
4555
void display_on_cpu() {
4656
VLOG(0) << "neighbor_size = " << neighbor_size;
4757
VLOG(0) << "node_size = " << node_size;
48-
for (int i = 0; i < neighbor_size; i++) {
58+
for (size_t i = 0; i < neighbor_size; i++) {
4959
VLOG(0) << "neighbor " << i << " " << neighbor_list[i];
5060
}
51-
for (int i = 0; i < node_size; i++) {
61+
for (size_t i = 0; i < node_size; i++) {
5262
VLOG(0) << "node i " << node_list[i].node_id
5363
<< " neighbor_size = " << node_list[i].neighbor_size;
5464
std::string str;
5565
int offset = node_list[i].neighbor_offset;
56-
for (int j = 0; j < node_list[i].neighbor_size; j++) {
66+
for (size_t j = 0; j < node_list[i].neighbor_size; j++) {
5767
if (j > 0) str += ",";
5868
str += std::to_string(neighbor_list[j + offset]);
5969
}
@@ -139,12 +149,18 @@ struct NeighborSampleQuery {
139149
};
140150
struct NeighborSampleResult {
141151
int64_t *val;
152+
int64_t *actual_val;
142153
int *actual_sample_size, sample_size, key_size;
154+
int total_sample_size;
143155
std::shared_ptr<memory::Allocation> val_mem, actual_sample_size_mem;
156+
std::shared_ptr<memory::Allocation> actual_val_mem;
144157
int64_t *get_val() { return val; }
158+
int64_t get_actual_val() { return (int64_t)actual_val; }
145159
int *get_actual_sample_size() { return actual_sample_size; }
146160
int get_sample_size() { return sample_size; }
147161
int get_key_size() { return key_size; }
162+
void set_total_sample_size(int s) { total_sample_size = s; }
163+
int get_len() { return total_sample_size; }
148164
void initialize(int _sample_size, int _key_size, int dev_id) {
149165
sample_size = _sample_size;
150166
key_size = _key_size;
@@ -165,18 +181,30 @@ struct NeighborSampleResult {
165181
int *ac_size = new int[key_size];
166182
cudaMemcpy(ac_size, actual_sample_size, key_size * sizeof(int),
167183
cudaMemcpyDeviceToHost); // 3, 1, 3
184+
int total_sample_size = 0;
185+
for (int i = 0; i < key_size; i++) {
186+
total_sample_size += ac_size[i];
187+
}
188+
int64_t *res2 = new int64_t[total_sample_size]; // r
189+
cudaMemcpy(res2, actual_val, total_sample_size * sizeof(int64_t),
190+
cudaMemcpyDeviceToHost); // r
168191

192+
int start = 0;
169193
for (int i = 0; i < key_size; i++) {
170194
VLOG(0) << "actual sample size for " << i << "th key is " << ac_size[i];
171195
VLOG(0) << "sampled neighbors are ";
172-
std::string neighbor;
196+
std::string neighbor, neighbor2;
173197
for (int j = 0; j < ac_size[i]; j++) {
174-
if (neighbor.size() > 0) neighbor += ";";
175-
neighbor += std::to_string(res[i * sample_size + j]);
198+
// if (neighbor.size() > 0) neighbor += ";";
199+
if (neighbor2.size() > 0) neighbor2 += ";"; // r
200+
// neighbor += std::to_string(res[i * sample_size + j]);
201+
neighbor2 += std::to_string(res2[start + j]); // r
176202
}
177-
VLOG(0) << neighbor;
203+
VLOG(0) << neighbor << " " << neighbor2;
204+
start += ac_size[i]; // r
178205
}
179206
delete[] res;
207+
delete[] res2; // r
180208
delete[] ac_size;
181209
VLOG(0) << " ------------------";
182210
}

paddle/fluid/framework/fleet/heter_ps/graph_gpu_ps_table.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,18 @@
2323
#ifdef PADDLE_WITH_HETERPS
2424
namespace paddle {
2525
namespace framework {
26-
class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
26+
class GpuPsGraphTable : public HeterComm<int64_t, unsigned int, int> {
2727
public:
2828
GpuPsGraphTable(std::shared_ptr<HeterPsResource> resource, int topo_aware)
29-
: HeterComm<int64_t, int, int>(1, resource) {
29+
: HeterComm<int64_t, unsigned int, int>(1, resource) {
3030
load_factor_ = 0.25;
3131
rw_lock.reset(new pthread_rwlock_t());
3232
gpu_num = resource_->total_device();
33+
for (int i = 0; i < gpu_num; i++) {
34+
gpu_graph_list.push_back(GpuPsCommGraph());
35+
sample_status.push_back(NULL);
36+
tables_.push_back(NULL);
37+
}
3338
cpu_table_status = -1;
3439
if (topo_aware) {
3540
int total_gpu = resource_->total_device();
@@ -82,6 +87,8 @@ class GpuPsGraphTable : public HeterComm<int64_t, int, int> {
8287
// end_graph_sampling();
8388
// }
8489
}
90+
void build_graph_on_single_gpu(GpuPsCommGraph &g, int gpu_id);
91+
void clear_graph_info(int gpu_id);
8592
void build_graph_from_cpu(std::vector<GpuPsCommGraph> &cpu_node_list);
8693
NodeQueryResult graph_node_sample(int gpu_id, int sample_size);
8794
NeighborSampleResult graph_neighbor_sample_v3(NeighborSampleQuery q,

0 commit comments

Comments
 (0)