Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 3 additions & 28 deletions include/infinicore/memory.hpp
Original file line number Diff line number Diff line change
@@ -1,30 +1,5 @@
#pragma once

#include "device.hpp"

#include <cstddef>
#include <functional>

namespace infinicore {

class Memory {
public:
using Deleter = std::function<void(std::byte *)>;

Memory(std::byte *data, size_t size, Device device, Deleter deleter, bool pin_memory = false);
~Memory();

std::byte *data();
Device device() const;
size_t size() const;
bool is_pinned() const;

private:
std::byte *data_;
size_t size_;
Device device_;
Deleter deleter_;
bool is_pinned_;
};

} // namespace infinicore
#include "memory/memory_block.hpp"
#include "memory/memory_pool.hpp"
#include "memory/stats.hpp"
39 changes: 39 additions & 0 deletions include/infinicore/memory/memory_block.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
#pragma once

#include "../device.hpp"

#include <cstddef>
#include <functional>
#include <memory>

namespace infinicore {

class Memory {
public:
using Deleter = std::function<void(std::byte *)>;

Memory(std::byte *data, size_t size, Device device, Deleter deleter, bool pin_memory = false);
~Memory();

// Copy constructor and copy assignment with reference counting
Memory(const Memory& other);
Memory& operator=(const Memory& other);

// Move constructor and move assignment
Memory(Memory&& other) noexcept;
Memory& operator=(Memory&& other) noexcept;

std::byte *data() const;
Device device() const;
size_t size() const;
bool is_pinned() const;

private:
std::byte *data_;
size_t size_;
Device device_;
Deleter deleter_;
bool is_pinned_;
};

} // namespace infinicore
52 changes: 52 additions & 0 deletions include/infinicore/memory/memory_pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#pragma once

#include <memory>
#include <unordered_map>
#include <mutex>
#include <atomic>
#include <cstddef>
#include <functional>

namespace infinicore {

struct MemoryInfo {
std::byte* ptr;
size_t size;
std::atomic<int> ref_count;
bool is_freed;

MemoryInfo(std::byte* p, size_t s)
: ptr(p), size(s), ref_count(1), is_freed(false) {}
};

class MemoryPool {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

include目录下都是会暴露给外部的接口,这个设计是要把memory pool的接口暴露给外部吗?什么情况下用户或者上层框架需要直接和memory pool交互?

public:
static MemoryPool& instance();

// Register a memory allocation
void registerMemory(std::byte* ptr, size_t size);

// Increment reference count
void addRef(std::byte* ptr);

// Decrement reference count and potentially free memory
void releaseMemory(std::byte* ptr, std::function<void(std::byte*)> actual_deleter);

// Get reference count
int getRefCount(std::byte* ptr) const;

// Check if memory is registered
bool isRegistered(std::byte* ptr) const;

// Check if memory is already freed
bool isFreed(std::byte* ptr) const;

private:
MemoryPool() = default;
~MemoryPool() = default;

mutable std::mutex mutex_;
std::unordered_map<std::byte*, std::shared_ptr<MemoryInfo>> memory_map_;
};

} // namespace infinicore
Empty file.
18 changes: 18 additions & 0 deletions python/infinicore/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@
zeros,
)

from .lib._infinicore import (
get_device_memory_stats,
get_pinned_host_memory_stats,
get_device_memory_stats_by_device,
get_pinned_host_memory_stats_by_device,
Stat,
StatType,
DeviceStats,
)

__all__ = [
# Classes.
"device",
Expand Down Expand Up @@ -71,4 +81,12 @@
"strided_empty",
"strided_from_blob",
"zeros",
# Memory Statistics.
"get_device_memory_stats",
"get_pinned_host_memory_stats",
"get_device_memory_stats_by_device",
"get_pinned_host_memory_stats_by_device",
"Stat",
"StatType",
"DeviceStats",
]
4 changes: 2 additions & 2 deletions src/infinicore-test/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ int main(int argc, char *argv[]) {
}

if (args.run_concurrency) {
runner.addTest(std::make_unique<infinicore::test::ConcurrencyTest>());
runner.addTest(std::make_unique<infinicore::test::ConcurrencyTest>(args.num_threads));
}

if (args.run_exception_safety) {
Expand All @@ -188,7 +188,7 @@ int main(int argc, char *argv[]) {
}

if (args.run_stress) {
runner.addTest(std::make_unique<infinicore::test::StressTest>());
runner.addTest(std::make_unique<infinicore::test::StressTest>(args.iterations));
}

spdlog::debug("About to run all tests");
Expand Down
33 changes: 19 additions & 14 deletions src/infinicore-test/memory_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,19 +96,20 @@ TestResult ConcurrencyTest::run() {
std::cerr << "Concurrent allocations test failed: " << result1.error_message << std::endl;
return false;
}
std::cout << "Concurrent allocations test passed" << std::endl;

auto result2 = testConcurrentDeviceSwitching();
if (!result2.passed) {
std::cerr << "Concurrent device switching test failed: " << result2.error_message << std::endl;
return false;
}

std::cout << "Concurrent device switching test passed" << std::endl;
auto result3 = testMemoryAllocationRace();
if (!result3.passed) {
std::cerr << "Memory allocation race test failed: " << result3.error_message << std::endl;
return false;
}

std::cout << "Memory allocation race test passed" << std::endl;
return true;
} catch (const std::exception &e) {
std::cerr << "ConcurrencyTest failed with exception: " << e.what() << std::endl;
Expand All @@ -119,19 +120,20 @@ TestResult ConcurrencyTest::run() {

TestResult ConcurrencyTest::testConcurrentAllocations() {
return measureTime("ConcurrentAllocations", [this]() -> bool {
const int num_threads = 8;
const int allocations_per_thread = 100;
std::vector<std::thread> threads;
std::atomic<int> success_count{0};
std::atomic<int> failure_count{0};

for (int i = 0; i < num_threads; ++i) {
for (int i = 0; i < num_threads_; ++i) {
threads.emplace_back([&, i]() {
try {
for (int j = 0; j < allocations_per_thread; ++j) {
// Allocate memory of random size
size_t size = 64 + (j % 1024);
spdlog::debug("Thread {}: ConcurrentAllocations: Allocating memory of size {}", i, size);
auto memory = context::allocateMemory(size);
spdlog::debug("Thread {}: ConcurrentAllocations: Memory allocated successfully", i);
if (memory && memory->size() == size) {
success_count++;
} else {
Expand All @@ -152,7 +154,7 @@ TestResult ConcurrencyTest::testConcurrentAllocations() {
thread.join();
}

int total_expected = num_threads * allocations_per_thread;
int total_expected = num_threads_ * allocations_per_thread;
if (success_count.load() != total_expected) {
std::cerr << "Concurrent allocation test failed: expected " << total_expected
<< " successes, got " << success_count.load()
Expand All @@ -166,7 +168,6 @@ TestResult ConcurrencyTest::testConcurrentAllocations() {

TestResult ConcurrencyTest::testConcurrentDeviceSwitching() {
return measureTime("ConcurrentDeviceSwitching", [this]() -> bool {
const int num_threads = 4;
std::vector<std::thread> threads;
std::atomic<int> success_count{0};
std::atomic<int> failure_count{0};
Expand All @@ -185,7 +186,7 @@ TestResult ConcurrencyTest::testConcurrentDeviceSwitching() {
return true;
}

for (int i = 0; i < num_threads; ++i) {
for (int i = 0; i < num_threads_; ++i) {
threads.emplace_back([&, i, devices]() {
try {
for (int j = 0; j < 50; ++j) {
Expand Down Expand Up @@ -239,22 +240,23 @@ TestResult ConcurrencyTest::testConcurrentDeviceSwitching() {
TestResult ConcurrencyTest::testMemoryAllocationRace() {
return measureTime("MemoryAllocationRace", [this]() -> bool {
const int num_threads = 16;
const int allocations_per_thread = 1000;
const int allocations_per_thread = 1024;
std::vector<std::thread> threads;
std::atomic<int> success_count{0};
std::atomic<int> failure_count{0};
std::vector<std::shared_ptr<Memory>> all_allocations;
std::vector<Memory> all_allocations;
std::mutex allocations_mutex;

for (int i = 0; i < num_threads; ++i) {
threads.emplace_back([&, i]() {
std::vector<std::shared_ptr<Memory>> thread_allocations;
std::vector<Memory> thread_allocations;
try {
for (int j = 0; j < allocations_per_thread; ++j) {
size_t size = 64 + (j % 1024);
auto memory = context::allocateMemory(size);
if (memory) {
thread_allocations.push_back(memory);
// Copy the Memory object - reference counting will handle the lifecycle
thread_allocations.push_back(*memory);
success_count++;
} else {
failure_count++;
Expand All @@ -264,9 +266,11 @@ TestResult ConcurrencyTest::testMemoryAllocationRace() {
if (j % 10 == 0 && !thread_allocations.empty()) {
thread_allocations.pop_back();
}
spdlog::debug("Thread {} iteration {}: Memory allocation race: Allocated memory of size {} done", i, j, size);
}

// Store remaining allocations
// Store remaining allocations - copying Memory objects with reference counting
// prevents double-free while maintaining the original test logic
std::lock_guard<std::mutex> lock(allocations_mutex);
all_allocations.insert(all_allocations.end(),
thread_allocations.begin(),
Expand All @@ -284,7 +288,7 @@ TestResult ConcurrencyTest::testMemoryAllocationRace() {

// Verify all allocations are valid
for (const auto &memory : all_allocations) {
if (!memory || !memory->data()) {
if (!memory.data()) {
std::cerr << "Invalid memory allocation found" << std::endl;
return false;
}
Expand All @@ -297,6 +301,7 @@ TestResult ConcurrencyTest::testMemoryAllocationRace() {
return false;
}

spdlog::debug("Memory allocation race test: All allocations: {}", all_allocations.size());
return true;
});
}
Expand Down Expand Up @@ -778,7 +783,7 @@ TestResult StressTest::run() {
TestResult StressTest::testHighFrequencyAllocations() {
return measureTime("HighFrequencyAllocations", [this]() -> bool {
try {
const int num_allocations = 100000;
const int num_allocations = iterations_;
std::vector<std::shared_ptr<Memory>> memories;
memories.reserve(num_allocations);

Expand Down
4 changes: 4 additions & 0 deletions src/infinicore-test/memory_test.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,10 +157,12 @@ class BasicMemoryTest : public MemoryTestFramework {

class ConcurrencyTest : public MemoryTestFramework {
public:
ConcurrencyTest(int num_threads = 4) : num_threads_(num_threads) {}
TestResult run() override;
std::string getName() const override { return "ConcurrencyTest"; }

private:
int num_threads_;
TestResult testConcurrentAllocations();
TestResult testConcurrentDeviceSwitching();
TestResult testMemoryAllocationRace();
Expand Down Expand Up @@ -201,10 +203,12 @@ class PerformanceTest : public MemoryTestFramework {

class StressTest : public MemoryTestFramework {
public:
StressTest(int iterations = 1000) : iterations_(iterations) {}
TestResult run() override;
std::string getName() const override { return "StressTest"; }

private:
int iterations_;
TestResult testHighFrequencyAllocations();
TestResult testLargeMemoryAllocations();
TestResult testCrossDeviceStress();
Expand Down
29 changes: 29 additions & 0 deletions src/infinicore/context/allocators/device_caching_allocator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,40 @@ DeviceCachingAllocator::DeviceCachingAllocator(Device device) : MemoryAllocator(

std::byte *DeviceCachingAllocator::allocate(size_t size) {
void *ptr = nullptr;
spdlog::debug("DeviceCachingAllocator::allocate() called for size={}", size);

// Update statistics before allocation
stats_.allocation[static_cast<size_t>(StatType::AGGREGATE)].increase(1);
stats_.requested_bytes[static_cast<size_t>(StatType::AGGREGATE)].increase(size);

INFINICORE_CHECK_ERROR(infinirtMallocAsync(&ptr, size, context::getStream()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

你这不还是用的原生的mallocAsync接口,那你的内存池用哪了?


// Update statistics after successful allocation
stats_.segment[static_cast<size_t>(StatType::AGGREGATE)].increase(1);
stats_.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].increase(size);
stats_.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].increase(size);
stats_.active[static_cast<size_t>(StatType::AGGREGATE)].increase(1);
stats_.active_bytes[static_cast<size_t>(StatType::AGGREGATE)].increase(size);
stats_.num_device_alloc++;

spdlog::debug("DeviceCachingAllocator::allocate() returned memory={}", static_cast<void *>(ptr));
return (std::byte *)ptr;
}

void DeviceCachingAllocator::deallocate(std::byte *ptr) {
spdlog::debug("DeviceCachingAllocator::deallocate() called for memory={}", static_cast<void *>(ptr));

// Update statistics before deallocation
stats_.active[static_cast<size_t>(StatType::AGGREGATE)].decrease(1);
// Note: We don't know the exact size being deallocated here, so we can't update
// active_bytes, allocated_bytes, etc. This is a limitation of the current design.
// In a more sophisticated implementation, we would track the size of each allocation.

INFINICORE_CHECK_ERROR(infinirtFreeAsync(ptr, context::getStream()));

// Update statistics after successful deallocation
stats_.num_device_free++;

spdlog::debug("DeviceCachingAllocator::deallocate() returned");
}
} // namespace infinicore
Loading