Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions csrc/dispatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class Val;
f(ShareMemHandles); \
f(HirAliasSelect); \
f(ShardByStream); \
f(Allocate); \
f(Deallocate); \
f(ForLoop); \
f(SymmetricContiguousView);
Expand Down
13 changes: 2 additions & 11 deletions csrc/host_ir/allocate_and_deallocate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <unordered_set>
#include <vector>

#include "host_ir/ir.h"
#include "ir/builder.h"
#include "ir/utils.h"

Expand Down Expand Up @@ -215,7 +216,7 @@ void insertAllocations(hir::HostIrContainer& hic) {

if (needsOutputPreallocation(e)) {
auto* allocate =
IrBuilder::create<kir::Allocate>(out, out->getMemoryType());
IrBuilder::create<hir::Allocate>(out, out->getMemoryType());
node->scope()->insert(node->iterator(), allocate);
}

Expand Down Expand Up @@ -255,13 +256,6 @@ class LowestCommonAncestor {
NVF_ERROR(depth_.insert({node, current_depth}).second);
Expr* e = node->getExpr();

// Temporary special-case for kir::Allocate. We will switch
// inserting a new `hir::Allocate` in host IR lowering where
// the allocated `tv` will be the expr input.
if (auto* alloc = dynamic_cast<kir::Allocate*>(e)) {
auto* tv = alloc->buffer()->as<TensorView>();
lca_[tv] = findLca(lca_[tv], node);
}
for (auto* tv : ir_utils::filterByType<TensorView>(e->inputs())) {
lca_[tv] = findLca(lca_[tv], node);
}
Expand Down Expand Up @@ -337,9 +331,6 @@ void checkMemoryLeak(hir::HostIrContainer& hic) {
/*pre_fn=*/
[&](const Node* node) {
Expr* e = node->getExpr();
if (auto* alloc = dynamic_cast<kir::Allocate*>(e)) {
allocated.insert(alloc->buffer()->as<TensorView>());
}
for (auto* tv : ir_utils::filterByType<TensorView>(e->inputs())) {
allocated.insert(tv);
}
Expand Down
56 changes: 42 additions & 14 deletions csrc/host_ir/evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ KernelArgumentHolder HostIrEvaluator::runWithInputs(
expr_evaluator_ = ExpressionEvaluator();
expr_evaluator_.bind("numberOfStreams", params_.number_of_streams);
NVF_ERROR(args.getCacheId().has_value());
expr_evaluator_.bind("cacheId", static_cast<int64_t>(*args.getCacheId()));
expr_evaluator_.bind(
"cacheId", static_cast<int64_t>(args.getCacheId().value()));

NVF_ERROR_EQ(std::ssize(container_->inputs()), args.size());
for (auto&& [in_val, arg] : zip(container_->inputs(), args)) {
Expand Down Expand Up @@ -329,16 +330,18 @@ void HostIrEvaluator::handle(Communication* communication) {
CommunicatorBackend backend_type = communication->backend();
if (backend_type == CommunicatorBackend::kCuda) {
const auto current_stream = static_cast<CUstream>(
c10::cuda::getCurrentCUDAStream(my_local_device_index_).stream());
c10::cuda::getCurrentCUDAStream(
static_cast<c10::DeviceIndex>(my_local_device_index_))
.stream());
NVF_ERROR(
communication->type() == CommunicationType::Broadcast ||
communication->type() == CommunicationType::Allgather,
"Invalid communication type, expected Broadcast or Allgather, got: ",
communication->type());
int64_t root_val =
expr_evaluator_.evaluate(communication->root()).as<int64_t>();
SymmetricMemoryHandle* multicast_handle =
multicast_handle_cache_.get({output_tensor, communication, root_val});
SymmetricMemoryHandle* multicast_handle = multicast_handle_cache_.get(
{.buffer = output_tensor, .expr = communication, .root = root_val});
postWithCudaBackend(
communication,
input_tensor,
Expand Down Expand Up @@ -369,7 +372,9 @@ void HostIrEvaluator::handle(P2PCommunication* communication) {
if (backend_type == CommunicatorBackend::kCuda) {
const P2pIpcHandle& p2p_ipc_handle = ipc_handle_cache_.get(communication);
const auto current_stream = static_cast<CUstream>(
c10::cuda::getCurrentCUDAStream(my_local_device_index_).stream());
c10::cuda::getCurrentCUDAStream(
static_cast<c10::DeviceIndex>(my_local_device_index_))
.stream());
auto count = buffer.numel() * buffer.element_size();
if (communication->type() == P2PCommunicationType::RECV) {
recvPost(p2p_ipc_handle, count, current_stream);
Expand Down Expand Up @@ -444,7 +449,9 @@ void HostIrEvaluator::handle(Wait* wait) {
auto* p2p_comm = dynamic_cast<P2PCommunication*>(expr);
auto* communication = dynamic_cast<Communication*>(expr);
const auto current_stream = static_cast<CUstream>(
c10::cuda::getCurrentCUDAStream(my_local_device_index_).stream());
c10::cuda::getCurrentCUDAStream(
static_cast<c10::DeviceIndex>(my_local_device_index_))
.stream());
if (p2p_comm && p2p_comm->backend() == CommunicatorBackend::kCuda) {
const P2pIpcHandle& ipc_handles = ipc_handle_cache_.get(p2p_comm);
if (p2p_comm->type() == P2PCommunicationType::SEND) {
Expand All @@ -463,8 +470,8 @@ void HostIrEvaluator::handle(Wait* wait) {
at::Tensor output_tensor = getKnownTensorOrUndefined(communication->out());
int64_t root_val =
expr_evaluator_.evaluate(communication->root()).as<int64_t>();
SymmetricMemoryHandle* multicast_handle =
multicast_handle_cache_.get({output_tensor, communication, root_val});
SymmetricMemoryHandle* multicast_handle = multicast_handle_cache_.get(
{.buffer = output_tensor, .expr = communication, .root = root_val});
waitWithCudaBackend(
communication, multicast_handle, current_stream, root_val);
} else {
Expand Down Expand Up @@ -714,6 +721,28 @@ void HostIrEvaluator::handle(kir::Allocate* allocate) {
expr_evaluator_.bind(tv, tensor);
}

// Materializes the buffer requested by a hir::Allocate node: evaluates the
// concrete sizes/strides of the TensorView under the current bindings,
// allocates a strided CUDA tensor, optionally zero-fills it, and binds the
// result into the expression evaluator so downstream exprs can consume it.
void HostIrEvaluator::handle(hir::Allocate* allocate) {
  FUSER_PERF_SCOPE("HostIrEvaluator::handle(Allocate)");
  TensorView* tv = allocate->in();

  // NOTE(review): index type is hard-coded to PrimDataType::Int here —
  // presumably safe for host-side allocation; confirm it matches the index
  // type of any kernel this buffer is later passed to.
  GlobalBufferInfo info =
      getBufferInfos(expr_evaluator_, PrimDataType::Int, {tv}).at(0);
  // Allocate on the communicator's device in multi-device runs; fall back
  // to cuda:0 when no communicator is present.
  c10::Device device =
      communicator_ ? communicator_->device() : at::Device("cuda:0");
  at::Tensor tensor = at::native::empty_strided_cuda(
      info.shape_info.logical_sizes,
      info.shape_info.logical_strides,
      info.type,
      c10::nullopt,
      device,
      c10::nullopt);

  if (allocate->zeroInit()) {
    tensor.zero_();
  }
  // NOTE(review): allocate->memoryType() is not consulted — every
  // allocation ends up as global CUDA memory; confirm this is intended.
  expr_evaluator_.bind(tv, tensor);
}

void HostIrEvaluator::handle(HirAliasSelect* hir_alias_select) {
auto indexed_id =
hir_alias_select->in()->getLogicalDomain().at(hir_alias_select->axis());
Expand Down Expand Up @@ -848,10 +877,8 @@ void HostIrEvaluator::handle(ShardByStream* shard) {

const std::vector<IterDomain*>& allocation_domain =
out_tv->getMaybeAllocationDomain();
auto i = std::find_if(
allocation_domain.begin(),
allocation_domain.end(),
std::mem_fn(&IterDomain::isStream));
auto i = std::ranges::find_if(
allocation_domain, std::mem_fn(&IterDomain::isStream));
NVF_ERROR(
i != allocation_domain.end(),
"Stream axis not found in allocation domain: ",
Expand Down Expand Up @@ -915,8 +942,9 @@ void HostIrEvaluator::handle(
at::Tensor in_tensor = getKnownConcreteValue(in_tv).as<at::Tensor>();

// Get or create SymMemForContiguousView from the cache
SymMemForContiguousView* handle = static_cast<SymMemForContiguousView*>(
multicast_handle_cache_.get({in_tensor, symmetric_contiguous_view}));
SymMemForContiguousView* handle =
static_cast<SymMemForContiguousView*>(multicast_handle_cache_.get(
{.buffer = in_tensor, .expr = symmetric_contiguous_view}));

// Bind the symmetric_contiguous_viewed tensor to the output
expr_evaluator_.bind(out_tv, handle->tensor());
Expand Down
3 changes: 2 additions & 1 deletion csrc/host_ir/evaluator.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ class NVF_API HostIrEvaluator final : public OptOutDispatch {
void handle(MatmulOp*) override;
void handle(LinearOp*) override;
void handle(kir::Allocate*) override;
void handle(hir::Allocate*) override;
void handle(LoadStoreOp*) override;
void handle(BinaryOp*) override;
void handle(ReductionOp*) override;
Expand Down Expand Up @@ -138,7 +139,7 @@ class NVF_API HostIrEvaluator final : public OptOutDispatch {
using StreamKey = std::variant<int64_t, Stream*>;
std::unordered_map<StreamKey, c10::cuda::CUDAStream> streams_;
std::unordered_map<Expr*, c10::intrusive_ptr<c10d::Work>> works_;
const int64_t my_local_device_index_;
int64_t my_local_device_index_;
IpcHandleCache ipc_handle_cache_;
SymmetricMemoryHandleCache multicast_handle_cache_;
// Allocation cache
Expand Down
70 changes: 52 additions & 18 deletions csrc/host_ir/ir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ std::string PostOnStream::toString(int indent_size) const {
std::for_each(outputs().begin(), outputs().end(), [&ss](auto output) {
ss << output->toString(0) << ", ";
});
ss << "})" << std::endl;
ss << "})\n";
return ss.str();
}

Expand Down Expand Up @@ -149,13 +149,13 @@ NVFUSER_DEFINE_CLONE_AND_CREATE(LaunchKernel)

std::string LaunchKernel::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << "LaunchKernel(" << std::endl;
indent(ss, indent_size + 1) << "Group ID: " << groupId() << "," << std::endl;
indent(ss, indent_size) << "LaunchKernel(\n";
indent(ss, indent_size + 1) << "Group ID: " << groupId() << ",\n";
indent(ss, indent_size + 1)
<< "Inputs: {" << toDelimitedString(inputs()) << "}," << std::endl;
<< "Inputs: {" << toDelimitedString(inputs()) << "},\n";
indent(ss, indent_size + 1)
<< "Outputs: {" << toDelimitedString(outputs()) << "}," << std::endl;
indent(ss, indent_size) << ")" << std::endl;
<< "Outputs: {" << toDelimitedString(outputs()) << "},\n";
indent(ss, indent_size) << ")\n";
return ss.str();
}

Expand All @@ -172,9 +172,9 @@ TensorView* Deallocate::buffer() const {

std::string Deallocate::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << "Deallocate {" << std::endl;
ss << buffer()->toString(indent_size + 1) << std::endl;
indent(ss, indent_size) << "}" << std::endl;
indent(ss, indent_size) << "Deallocate {\n";
ss << buffer()->toString(indent_size + 1) << '\n';
indent(ss, indent_size) << "}\n";
return ss.str();
}

Expand Down Expand Up @@ -230,8 +230,8 @@ NVFUSER_DEFINE_CLONE_AND_CREATE(SetCurrentStream)

std::string SetCurrentStream::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << "SetCurrentStream(" << stream()->toString() << ")"
<< std::endl;
indent(ss, indent_size) << "SetCurrentStream(" << stream()->toString()
<< ")\n";
return ss.str();
}

Expand All @@ -246,7 +246,7 @@ NVFUSER_DEFINE_CLONE_AND_CREATE(GetCurrentStream)
std::string GetCurrentStream::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << stream()->toInlineString()
<< " = GetCurrentStream()" << std::endl;
<< " = GetCurrentStream()\n";
return ss.str();
}

Expand Down Expand Up @@ -319,7 +319,7 @@ NVFUSER_DEFINE_CLONE_AND_CREATE(StartCoalescing)

std::string StartCoalescing::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << "StartCoalescing" << std::endl;
indent(ss, indent_size) << "StartCoalescing\n";
return ss.str();
}

Expand All @@ -339,7 +339,7 @@ NVFUSER_DEFINE_CLONE_AND_CREATE(EndCoalescing)

std::string EndCoalescing::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << "EndCoalescing " << name() << std::endl;
indent(ss, indent_size) << "EndCoalescing " << name() << '\n';
return ss.str();
}

Expand Down Expand Up @@ -367,7 +367,7 @@ std::string ShareMemHandles::toString(int indent_size) const {
for (auto communication : communications()) {
ss << communication->toInlineString() << ", ";
}
ss << std::endl;
ss << '\n';
return ss.str();
}

Expand Down Expand Up @@ -440,7 +440,7 @@ std::string ShardByStream::toString(int indent_size) const {
indent(ss, indent_size) << out()->toString() << " = ShardByStream("
<< in()->toString()
<< ", stream_index=" << stream_index()->toString()
<< ")" << std::endl;
<< ")\n";
return ss.str();
}

Expand All @@ -462,7 +462,7 @@ NVFUSER_DEFINE_CLONE_AND_CREATE(SymmetricContiguousView)
std::string SymmetricContiguousView::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << out()->toString() << " = SymmetricContiguousView("
<< in()->toString() << ")" << std::endl;
<< in()->toString() << ")\n";
return ss.str();
}

Expand All @@ -484,7 +484,7 @@ std::string ForLoop::toString(int indent_size) const {
std::stringstream ss;
indent(ss, indent_size) << "FOR " << index()->toString() << " from "
<< start()->toInlineString() << " to "
<< stop()->toInlineString() << ":" << std::endl
<< stop()->toInlineString() << ":\n"
<< body().toString(indent_size + 1);
return ss.str();
}
Expand All @@ -504,4 +504,38 @@ std::string ForLoop::toInlineString(int indent_size) const {
index, iter_domain->start(), iter_domain->stop());
}

// hir::Allocate constructor. The TensorView to be allocated is recorded as
// the node's *input* (the buffer it materializes); the node has no outputs.
// `memory_type` and `zero_init` are stored as data attributes.
Allocate::Allocate(
    IrBuilderPasskey passkey,
    TensorView* in,
    MemoryType memory_type,
    bool zero_init)
    : Expr(passkey) {
  // hir::Allocate is a host-IR node and only makes sense inside a
  // HostIrContainer; check the container before checking its type.
  NVF_ERROR(passkey.ir_container_ != nullptr);
  NVF_ERROR(passkey.ir_container_->isA<HostIrContainer>());

  addInput(in);
  // Attribute order is load-bearing: memoryType() reads attribute 0 and
  // zeroInit() reads attribute 1.
  addDataAttribute(memory_type);
  addDataAttribute(zero_init);
}

NVFUSER_DEFINE_CLONE_AND_CREATE(Allocate)

// Statement-style printer for hir::Allocate (ends with a newline).
std::string Allocate::toString(int indent_size) const {
  std::stringstream buffer;
  auto& out = indent(buffer, indent_size);
  out << in()->toString();
  out << " = ALLOCATE(mem_type=" << memoryType();
  out << ", zero_init=" << std::boolalpha << zeroInit() << ")\n";
  return buffer.str();
}

// Inline (single-expression) printer for hir::Allocate; unlike toString,
// no trailing newline is emitted.
std::string Allocate::toInlineString(int indent_size) const {
  std::stringstream buffer;
  auto& out = indent(buffer, indent_size);
  out << in()->toInlineString() << " = ALLOCATE(mem_type=" << memoryType()
      << ", zero_init=" << std::boolalpha << zeroInit() << ")";
  return buffer.str();
}

} // namespace nvfuser::hir
36 changes: 36 additions & 0 deletions csrc/host_ir/ir.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,42 @@ class LaunchKernel : public Expr {
CompiledKernel* compiled_kernel_ = nullptr;
};

// Host-IR node that allocates the buffer backing a TensorView.
// Unlike kir::Allocate (where the buffer is an attribute), the allocated
// TensorView is this node's input and the node produces no outputs, so
// generic input-based analyses (e.g. liveness/LCA passes) see the buffer
// without special-casing. Evaluated by HostIrEvaluator::handle(hir::Allocate*).
class Allocate : public Expr {
 public:
  using Expr::Expr;

  // `memory_type` records where the buffer should live; `zero_init`
  // requests that the freshly allocated memory be zero-filled.
  explicit Allocate(
      IrBuilderPasskey passkey,
      TensorView* in,
      MemoryType memory_type,
      bool zero_init = false);

  // IR nodes are owned by their container and cloned via the
  // CLONE_AND_CREATE machinery; direct copy/move is disallowed.
  Allocate(const Allocate& other) = delete;
  Allocate& operator=(const Allocate& other) = delete;
  Allocate(Allocate&& other) = delete;
  Allocate& operator=(Allocate&& other) = delete;

  NVFUSER_DECLARE_CLONE_AND_CREATE

  std::string toString(int indent_size = 0) const override;
  std::string toInlineString(int indent_size = 0) const override;
  const char* getOpString() const override {
    return "hir::Allocate";
  }

  // The TensorView being allocated (the node's sole input).
  TensorView* in() const {
    return inputs().at(0)->as<TensorView>();
  }

  // Data attribute 0: requested memory space for the buffer.
  MemoryType memoryType() const {
    return attribute<MemoryType>(0);
  }

  // Data attribute 1: whether the buffer is zero-filled after allocation.
  bool zeroInit() const {
    return attribute<bool>(1);
  }
};

class Deallocate : public Expr {
public:
using Expr::Expr;
Expand Down
Loading
Loading