Skip to content

Commit d87d536

Browse files
committed
issue/513: add memoryusage stats from allocator && mem ref count, mem pool implementation
Signed-off-by: Ceng23333 <[email protected]>
1 parent e6dae2e commit d87d536

File tree

23 files changed

+775
-57
lines changed

23 files changed

+775
-57
lines changed

include/infinicore/memory.hpp

Lines changed: 3 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,5 @@
11
#pragma once
22

3-
#include "device.hpp"
4-
5-
#include <cstddef>
6-
#include <functional>
7-
8-
namespace infinicore {
9-
10-
class Memory {
11-
public:
12-
using Deleter = std::function<void(std::byte *)>;
13-
14-
Memory(std::byte *data, size_t size, Device device, Deleter deleter, bool pin_memory = false);
15-
~Memory();
16-
17-
std::byte *data();
18-
Device device() const;
19-
size_t size() const;
20-
bool is_pinned() const;
21-
22-
private:
23-
std::byte *data_;
24-
size_t size_;
25-
Device device_;
26-
Deleter deleter_;
27-
bool is_pinned_;
28-
};
29-
30-
} // namespace infinicore
3+
#include "memory/memory_block.hpp"
4+
#include "memory/memory_pool.hpp"
5+
#include "memory/stats.hpp"
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#pragma once
2+
3+
#include "../device.hpp"
4+
5+
#include <cstddef>
6+
#include <functional>
7+
#include <memory>
8+
9+
namespace infinicore {
10+
11+
class Memory {
12+
public:
13+
using Deleter = std::function<void(std::byte *)>;
14+
15+
Memory(std::byte *data, size_t size, Device device, Deleter deleter, bool pin_memory = false);
16+
~Memory();
17+
18+
// Copy constructor and copy assignment with reference counting
19+
Memory(const Memory& other);
20+
Memory& operator=(const Memory& other);
21+
22+
// Move constructor and move assignment
23+
Memory(Memory&& other) noexcept;
24+
Memory& operator=(Memory&& other) noexcept;
25+
26+
std::byte *data() const;
27+
Device device() const;
28+
size_t size() const;
29+
bool is_pinned() const;
30+
31+
private:
32+
std::byte *data_;
33+
size_t size_;
34+
Device device_;
35+
Deleter deleter_;
36+
bool is_pinned_;
37+
};
38+
39+
} // namespace infinicore
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
#pragma once
2+
3+
#include <memory>
4+
#include <unordered_map>
5+
#include <mutex>
6+
#include <atomic>
7+
#include <cstddef>
8+
#include <functional>
9+
10+
namespace infinicore {
11+
12+
struct MemoryInfo {
13+
std::byte* ptr;
14+
size_t size;
15+
std::atomic<int> ref_count;
16+
bool is_freed;
17+
18+
MemoryInfo(std::byte* p, size_t s)
19+
: ptr(p), size(s), ref_count(1), is_freed(false) {}
20+
};
21+
22+
class MemoryPool {
23+
public:
24+
static MemoryPool& instance();
25+
26+
// Register a memory allocation
27+
void registerMemory(std::byte* ptr, size_t size);
28+
29+
// Increment reference count
30+
void addRef(std::byte* ptr);
31+
32+
// Decrement reference count and potentially free memory
33+
void releaseMemory(std::byte* ptr, std::function<void(std::byte*)> actual_deleter);
34+
35+
// Get reference count
36+
int getRefCount(std::byte* ptr) const;
37+
38+
// Check if memory is registered
39+
bool isRegistered(std::byte* ptr) const;
40+
41+
// Check if memory is already freed
42+
bool isFreed(std::byte* ptr) const;
43+
44+
private:
45+
MemoryPool() = default;
46+
~MemoryPool() = default;
47+
48+
mutable std::mutex mutex_;
49+
std::unordered_map<std::byte*, std::shared_ptr<MemoryInfo>> memory_map_;
50+
};
51+
52+
} // namespace infinicore

include/infinicore/memory/stats.hpp

Whitespace-only changes.

python/infinicore/__init__.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,16 @@
3535
zeros,
3636
)
3737

38+
from .lib._infinicore import (
39+
get_device_memory_stats,
40+
get_pinned_host_memory_stats,
41+
get_device_memory_stats_by_device,
42+
get_pinned_host_memory_stats_by_device,
43+
Stat,
44+
StatType,
45+
DeviceStats,
46+
)
47+
3848
__all__ = [
3949
# Classes.
4050
"device",
@@ -71,4 +81,12 @@
7181
"strided_empty",
7282
"strided_from_blob",
7383
"zeros",
84+
# Memory Statistics.
85+
"get_device_memory_stats",
86+
"get_pinned_host_memory_stats",
87+
"get_device_memory_stats_by_device",
88+
"get_pinned_host_memory_stats_by_device",
89+
"Stat",
90+
"StatType",
91+
"DeviceStats",
7492
]

src/infinicore-test/main.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ int main(int argc, char *argv[]) {
172172
}
173173

174174
if (args.run_concurrency) {
175-
runner.addTest(std::make_unique<infinicore::test::ConcurrencyTest>());
175+
runner.addTest(std::make_unique<infinicore::test::ConcurrencyTest>(args.num_threads));
176176
}
177177

178178
if (args.run_exception_safety) {
@@ -188,7 +188,7 @@ int main(int argc, char *argv[]) {
188188
}
189189

190190
if (args.run_stress) {
191-
runner.addTest(std::make_unique<infinicore::test::StressTest>());
191+
runner.addTest(std::make_unique<infinicore::test::StressTest>(args.iterations));
192192
}
193193

194194
spdlog::debug("About to run all tests");

src/infinicore-test/memory_test.cc

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -96,19 +96,20 @@ TestResult ConcurrencyTest::run() {
9696
std::cerr << "Concurrent allocations test failed: " << result1.error_message << std::endl;
9797
return false;
9898
}
99+
std::cout << "Concurrent allocations test passed" << std::endl;
99100

100101
auto result2 = testConcurrentDeviceSwitching();
101102
if (!result2.passed) {
102103
std::cerr << "Concurrent device switching test failed: " << result2.error_message << std::endl;
103104
return false;
104105
}
105-
106+
std::cout << "Concurrent device switching test passed" << std::endl;
106107
auto result3 = testMemoryAllocationRace();
107108
if (!result3.passed) {
108109
std::cerr << "Memory allocation race test failed: " << result3.error_message << std::endl;
109110
return false;
110111
}
111-
112+
std::cout << "Memory allocation race test passed" << std::endl;
112113
return true;
113114
} catch (const std::exception &e) {
114115
std::cerr << "ConcurrencyTest failed with exception: " << e.what() << std::endl;
@@ -119,19 +120,20 @@ TestResult ConcurrencyTest::run() {
119120

120121
TestResult ConcurrencyTest::testConcurrentAllocations() {
121122
return measureTime("ConcurrentAllocations", [this]() -> bool {
122-
const int num_threads = 8;
123123
const int allocations_per_thread = 100;
124124
std::vector<std::thread> threads;
125125
std::atomic<int> success_count{0};
126126
std::atomic<int> failure_count{0};
127127

128-
for (int i = 0; i < num_threads; ++i) {
128+
for (int i = 0; i < num_threads_; ++i) {
129129
threads.emplace_back([&, i]() {
130130
try {
131131
for (int j = 0; j < allocations_per_thread; ++j) {
132132
// Allocate memory of random size
133133
size_t size = 64 + (j % 1024);
134+
spdlog::debug("Thread {}: ConcurrentAllocations: Allocating memory of size {}", i, size);
134135
auto memory = context::allocateMemory(size);
136+
spdlog::debug("Thread {}: ConcurrentAllocations: Memory allocated successfully", i);
135137
if (memory && memory->size() == size) {
136138
success_count++;
137139
} else {
@@ -152,7 +154,7 @@ TestResult ConcurrencyTest::testConcurrentAllocations() {
152154
thread.join();
153155
}
154156

155-
int total_expected = num_threads * allocations_per_thread;
157+
int total_expected = num_threads_ * allocations_per_thread;
156158
if (success_count.load() != total_expected) {
157159
std::cerr << "Concurrent allocation test failed: expected " << total_expected
158160
<< " successes, got " << success_count.load()
@@ -166,7 +168,6 @@ TestResult ConcurrencyTest::testConcurrentAllocations() {
166168

167169
TestResult ConcurrencyTest::testConcurrentDeviceSwitching() {
168170
return measureTime("ConcurrentDeviceSwitching", [this]() -> bool {
169-
const int num_threads = 4;
170171
std::vector<std::thread> threads;
171172
std::atomic<int> success_count{0};
172173
std::atomic<int> failure_count{0};
@@ -185,7 +186,7 @@ TestResult ConcurrencyTest::testConcurrentDeviceSwitching() {
185186
return true;
186187
}
187188

188-
for (int i = 0; i < num_threads; ++i) {
189+
for (int i = 0; i < num_threads_; ++i) {
189190
threads.emplace_back([&, i, devices]() {
190191
try {
191192
for (int j = 0; j < 50; ++j) {
@@ -239,22 +240,23 @@ TestResult ConcurrencyTest::testConcurrentDeviceSwitching() {
239240
TestResult ConcurrencyTest::testMemoryAllocationRace() {
240241
return measureTime("MemoryAllocationRace", [this]() -> bool {
241242
const int num_threads = 16;
242-
const int allocations_per_thread = 1000;
243+
const int allocations_per_thread = 1024;
243244
std::vector<std::thread> threads;
244245
std::atomic<int> success_count{0};
245246
std::atomic<int> failure_count{0};
246-
std::vector<std::shared_ptr<Memory>> all_allocations;
247+
std::vector<Memory> all_allocations;
247248
std::mutex allocations_mutex;
248249

249250
for (int i = 0; i < num_threads; ++i) {
250251
threads.emplace_back([&, i]() {
251-
std::vector<std::shared_ptr<Memory>> thread_allocations;
252+
std::vector<Memory> thread_allocations;
252253
try {
253254
for (int j = 0; j < allocations_per_thread; ++j) {
254255
size_t size = 64 + (j % 1024);
255256
auto memory = context::allocateMemory(size);
256257
if (memory) {
257-
thread_allocations.push_back(memory);
258+
// Copy the Memory object - reference counting will handle the lifecycle
259+
thread_allocations.push_back(*memory);
258260
success_count++;
259261
} else {
260262
failure_count++;
@@ -264,9 +266,11 @@ TestResult ConcurrencyTest::testMemoryAllocationRace() {
264266
if (j % 10 == 0 && !thread_allocations.empty()) {
265267
thread_allocations.pop_back();
266268
}
269+
spdlog::debug("Thread {} iteration {}: Memory allocation race: Allocated memory of size {} done", i, j, size);
267270
}
268271

269-
// Store remaining allocations
272+
// Store remaining allocations - copying Memory objects with reference counting
273+
// prevents double-free while maintaining the original test logic
270274
std::lock_guard<std::mutex> lock(allocations_mutex);
271275
all_allocations.insert(all_allocations.end(),
272276
thread_allocations.begin(),
@@ -284,7 +288,7 @@ TestResult ConcurrencyTest::testMemoryAllocationRace() {
284288

285289
// Verify all allocations are valid
286290
for (const auto &memory : all_allocations) {
287-
if (!memory || !memory->data()) {
291+
if (!memory.data()) {
288292
std::cerr << "Invalid memory allocation found" << std::endl;
289293
return false;
290294
}
@@ -297,6 +301,7 @@ TestResult ConcurrencyTest::testMemoryAllocationRace() {
297301
return false;
298302
}
299303

304+
spdlog::debug("Memory allocation race test: All allocations: {}", all_allocations.size());
300305
return true;
301306
});
302307
}
@@ -778,7 +783,7 @@ TestResult StressTest::run() {
778783
TestResult StressTest::testHighFrequencyAllocations() {
779784
return measureTime("HighFrequencyAllocations", [this]() -> bool {
780785
try {
781-
const int num_allocations = 100000;
786+
const int num_allocations = iterations_;
782787
std::vector<std::shared_ptr<Memory>> memories;
783788
memories.reserve(num_allocations);
784789

src/infinicore-test/memory_test.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,12 @@ class BasicMemoryTest : public MemoryTestFramework {
157157

158158
class ConcurrencyTest : public MemoryTestFramework {
159159
public:
160+
ConcurrencyTest(int num_threads = 4) : num_threads_(num_threads) {}
160161
TestResult run() override;
161162
std::string getName() const override { return "ConcurrencyTest"; }
162163

163164
private:
165+
int num_threads_;
164166
TestResult testConcurrentAllocations();
165167
TestResult testConcurrentDeviceSwitching();
166168
TestResult testMemoryAllocationRace();
@@ -201,10 +203,12 @@ class PerformanceTest : public MemoryTestFramework {
201203

202204
class StressTest : public MemoryTestFramework {
203205
public:
206+
StressTest(int iterations = 1000) : iterations_(iterations) {}
204207
TestResult run() override;
205208
std::string getName() const override { return "StressTest"; }
206209

207210
private:
211+
int iterations_;
208212
TestResult testHighFrequencyAllocations();
209213
TestResult testLargeMemoryAllocations();
210214
TestResult testCrossDeviceStress();

src/infinicore/context/allocators/device_caching_allocator.cc

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,40 @@ DeviceCachingAllocator::DeviceCachingAllocator(Device device) : MemoryAllocator(
99

1010
std::byte *DeviceCachingAllocator::allocate(size_t size) {
1111
void *ptr = nullptr;
12+
spdlog::debug("DeviceCachingAllocator::allocate() called for size={}", size);
13+
14+
// Update statistics before allocation
15+
stats_.allocation[static_cast<size_t>(StatType::AGGREGATE)].increase(1);
16+
stats_.requested_bytes[static_cast<size_t>(StatType::AGGREGATE)].increase(size);
17+
1218
INFINICORE_CHECK_ERROR(infinirtMallocAsync(&ptr, size, context::getStream()));
19+
20+
// Update statistics after successful allocation
21+
stats_.segment[static_cast<size_t>(StatType::AGGREGATE)].increase(1);
22+
stats_.allocated_bytes[static_cast<size_t>(StatType::AGGREGATE)].increase(size);
23+
stats_.reserved_bytes[static_cast<size_t>(StatType::AGGREGATE)].increase(size);
24+
stats_.active[static_cast<size_t>(StatType::AGGREGATE)].increase(1);
25+
stats_.active_bytes[static_cast<size_t>(StatType::AGGREGATE)].increase(size);
26+
stats_.num_device_alloc++;
27+
28+
spdlog::debug("DeviceCachingAllocator::allocate() returned memory={}", static_cast<void *>(ptr));
1329
return (std::byte *)ptr;
1430
}
1531

1632
void DeviceCachingAllocator::deallocate(std::byte *ptr) {
33+
spdlog::debug("DeviceCachingAllocator::deallocate() called for memory={}", static_cast<void *>(ptr));
34+
35+
// Update statistics before deallocation
36+
stats_.active[static_cast<size_t>(StatType::AGGREGATE)].decrease(1);
37+
// Note: We don't know the exact size being deallocated here, so we can't update
38+
// active_bytes, allocated_bytes, etc. This is a limitation of the current design.
39+
// In a more sophisticated implementation, we would track the size of each allocation.
40+
1741
INFINICORE_CHECK_ERROR(infinirtFreeAsync(ptr, context::getStream()));
42+
43+
// Update statistics after successful deallocation
44+
stats_.num_device_free++;
45+
46+
spdlog::debug("DeviceCachingAllocator::deallocate() returned");
1847
}
1948
} // namespace infinicore

0 commit comments

Comments
 (0)