-
Notifications
You must be signed in to change notification settings - Fork 80
Expand file tree
/
Copy pathcommunicator.h
More file actions
171 lines (142 loc) · 5.38 KB
/
communicator.h
File metadata and controls
171 lines (142 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// clang-format off
/*
* SPDX-FileCopyrightText: Copyright (c) 2023-present NVIDIA CORPORATION & AFFILIATES.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
// clang-format on
#pragma once
#include <ATen/core/TensorBody.h>
#include <ATen/core/ivalue.h>
#include <c10/util/intrusive_ptr.h>
#ifdef NVFUSER_DISTRIBUTED
#include <torch/csrc/distributed/c10d/Backend.hpp>
#include <torch/csrc/distributed/c10d/Store.hpp>
#include <torch/csrc/distributed/c10d/TCPStore.hpp>
#include <torch/csrc/distributed/c10d/Work.hpp>
#else
#include "multidevice/c10d_mock.h"
#endif
#include "multidevice/multidevice.h"
#include "visibility.h"
namespace nvfuser {
// This file implements the class Communicator which sets up the inter-process
// Backend. This class contains inter-process information, such as the rank, the
// world size, as well as the Process Group that can be called to perform
// inter-process communications.
//
// Each process is associated with a unique deviceId and device. The actual MPI
// rank remains private to the class and should not be used by the user. The
// communicator class holds privately the mappings ranks <-> device IDs <->
// device.
using RankType = DeviceIdxType;
std::ostream& operator<<(std::ostream& out, const CommunicatorBackend& cb);
#ifdef USE_C10D_NCCL
constexpr CommunicatorBackend comm_backend_default = CommunicatorBackend::kNccl;
#else
constexpr CommunicatorBackend comm_backend_default = CommunicatorBackend::kUcc;
#endif
constexpr int comm_server_local_rank_default = 0;
class NVF_API Communicator {
public:
static Communicator& getInstance();
Communicator(const Communicator&) = delete;
Communicator& operator=(const Communicator&) = delete;
~Communicator() = delete;
// As said in `getInstance`, the user of this class is supposed to call this
// method to clean up the singleton. This obviously can only be called once.
void cleanup();
// returns if distributed config is available
bool is_available() const {
return is_available_;
}
// returns the number of processes in the communicator
int64_t size() const {
return size_;
}
// returns the local number of processes in the communicator (within the node)
int64_t local_size() const {
return local_size_;
}
// sets the communicator's default backend
void setDefaultBackend(CommunicatorBackend backend) {
default_backend_ = backend;
}
// performs a blocking barrier in the communicator
void barrier(std::optional<CommunicatorBackend> backend = std::nullopt);
// returns the backend associated with a team
// the argument "prefix" is prepended to the key used to retrieve preexisting
// backends. Prefix is used to distinguish between different backends with the
// same team
c10d::Backend* getBackendForTeam(
const Team& team,
std::optional<CommunicatorBackend> backend,
const std::string& prefix = "");
// returns the device associated with the current process
at::Device device() const {
return at::Device("cuda:" + std::to_string(local_rank_));
}
// returns the device Id associated with the current process
DeviceIdxType deviceId() const {
return rankToDiD(rank_);
}
// returns local rank associted with the current process,
// i.e. the rank within a machine/node as opposed to the rank within the
// world.
RankType local_rank() const {
return local_rank_;
}
// returns world backend for communicator backend or default backend if not
// specified.
c10d::Backend* getWorld(
std::optional<CommunicatorBackend> backend = std::nullopt);
// returns if a backend is available for creation
bool isBackendAvailable(CommunicatorBackend backend) const {
if (backend == CommunicatorBackend::kUcc) {
return ucc_available_;
} else if (backend == CommunicatorBackend::kNccl) {
return nccl_available_;
}
return false;
}
c10d::TCPStore* getTcpStore() {
return store_.get();
}
#ifdef NVFUSER_DISTRIBUTED
// Returns the store as an intrusive_ptr for use with PyTorch symmetric
// memory (c10d::symmetric_memory::set_group_info).
c10::intrusive_ptr<c10d::Store> getStore() const;
// Returns the world backend as an intrusive_ptr so it can be registered with
// c10d::register_process_group (e.g. for PyTorch symmetric memory NCCL
// rendezvous, which resolves the group by name).
c10::intrusive_ptr<c10d::Backend> getWorldBackendIntrusivePtr(
std::optional<CommunicatorBackend> backend = std::nullopt);
#endif
private:
Communicator(
CommunicatorBackend backend = comm_backend_default,
RankType server_local_rank = comm_server_local_rank_default);
// returns the rank corresponding to a device index
RankType dIdToRank(DeviceIdxType d_id) const {
return static_cast<RankType>(d_id);
}
// returns the device index corresponding to a rank
DeviceIdxType rankToDiD(RankType rank) const {
return static_cast<DeviceIdxType>(rank);
}
bool is_available_;
CommunicatorBackend default_backend_;
RankType rank_;
int64_t size_;
RankType local_rank_;
int64_t local_size_;
std::string master_addr_;
int master_port_;
bool ucc_available_;
bool nccl_available_;
// stores the world's store used for the backend init
c10::intrusive_ptr<c10d::TCPStore> store_;
// cache for the created backends. The keys are strings generated from Teams
std::unordered_map<std::string, c10::intrusive_ptr<c10d::Backend>> backends_;
};
} // namespace nvfuser