Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@
# add container sources and headers to common library target
#
set(containers_headers
ring_buffer.hpp c_array.hpp operators.hpp record_header_buffer.hpp ring_buffer.hpp
small_vector.hpp stable_vector.hpp static_vector.hpp)
ring_buffer.hpp
c_array.hpp
operators.hpp
pool.hpp
pool_object.hpp
record_header_buffer.hpp
ring_buffer.hpp
small_vector.hpp
stable_vector.hpp
static_vector.hpp)
set(containers_sources ring_buffer.cpp record_header_buffer.cpp ring_buffer.cpp
small_vector.cpp)

Expand Down
204 changes: 204 additions & 0 deletions projects/rocprofiler-sdk/source/lib/common/container/pool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// MIT License
//
// Copyright (c) 2026 Advanced Micro Devices, Inc. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

#pragma once

#include "lib/common/container/pool_object.hpp"
#include "lib/common/container/stable_vector.hpp"
#include "lib/common/defines.hpp"
#include "lib/common/demangle.hpp"
#include "lib/common/logging.hpp"

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <optional>
#include <queue>
#include <shared_mutex>
#include <stdexcept>
#include <utility>

namespace rocprofiler
{
namespace common
{
namespace container
{
/// Pool of reusable `Tp` instances, each stored in a pool_object<Tp> slot.
///
/// Slots live in `m_pool`; the indices of slots that may be handed out are
/// queued in `m_available`. When the queue runs dry, `acquire()` invokes
/// `m_function` to append another batch of `m_count` slots.
template <typename Tp>
struct pool
{
    using size_type = size_t;

    // template <typename... Args>
    // explicit pool(Args&&... args)
    // : m_pool{std::forward<Args>(args)...}
    // {}

    /// Build a pool holding `count` objects and remember how to build more.
    ///
    /// @param count number of objects created now and in every later batch
    /// @param ctor  callable invoked as `ctor(Tp&, args...)` on each new object
    /// @param args  extra arguments copied/moved into the pool and passed to
    ///              `ctor` (as const lvalues) for every object ever created
    template <typename FuncT, typename... Args>
    explicit pool(std::piecewise_construct_t, size_type count, FuncT&& ctor, Args&&... args)
    : m_count{count}
    {
        // The batch generator captures `this`, which is one reason the pool
        // must never be relocated (see the deleted move operations below).
        m_function = [this,
                      _ctor       = std::forward<FuncT>(ctor),
                      _args_tuple = std::make_tuple(std::forward<Args>(args)...)]() {
            for(size_type i = 0; i < m_count; ++i)
            {
                auto idx = m_pool.size();
                // each slot records its own index and a back-pointer to this pool
                m_pool.emplace_back(idx, false, this);
                std::apply(
                    [&](auto&&... unpacked_args) {
                        _ctor(m_pool[idx].get(),
                              std::forward<decltype(unpacked_args)>(unpacked_args)...);
                    },
                    _args_tuple);
                m_available.push(idx);
            }
        };

        // create the initial batch
        m_function();
    }

    // NOTE: a default-constructed pool has an empty m_function, so the first
    // acquire() on it (which needs a new batch) throws std::bad_function_call.
    pool()  = default;
    ~pool() = default;

    pool(const pool&) = delete;
    pool& operator=(const pool&) = delete;

    // Moving was previously declared `= default`, but std::shared_mutex and
    // std::atomic are not movable, so those defaults were already defined as
    // deleted. Spell that out: m_function and every pool_object hold a pointer
    // to this pool, so relocating it would leave them dangling.
    pool(pool&&) noexcept = delete;
    pool& operator=(pool&&) noexcept = delete;

    // get an object from the pool. if all objects are in use, a new batch of objects will be
    // created and added to the pool
    pool_object<Tp>& acquire();

    // put the object at index `idx` back into the queue of available objects
    void release(size_type idx);

    // acquire an object and then invoke `ctor(Tp&, args...)` on it
    template <typename FuncT, typename... Args>
    pool_object<Tp>& acquire(FuncT&& ctor, Args&&... args);

    /// Log the pool size and the reuse/release/new-batch counters at warning level.
    void report_reuse()
    {
        // m_pool may be growing concurrently inside acquire(), which mutates it
        // under an exclusive m_pool_mtx lock; read the size under a shared lock
        // and drop the lock before formatting the message.
        size_type _pool_size = 0;
        {
            auto _read_lk = std::shared_lock<std::shared_mutex>{m_pool_mtx};
            _pool_size    = m_pool.size();
        }

        ROCP_WARNING << fmt::format("Pool of type {}: Total pool size: {}. Reused objects: {}. "
                                    "Released objects: {}. New batches: {}.",
                                    cxx_demangle(typeid(Tp).name()),
                                    _pool_size,
                                    m_reused.load(),
                                    m_released.load(),
                                    m_new_batch.load());
    }

private:
    size_type                          m_count         = 256;      // objects per batch
    std::function<void()>              m_function      = nullptr;  // appends one batch
    mutable std::shared_mutex          m_pool_mtx      = {};       // taken when m_pool grows
    stable_vector<pool_object<Tp>, 32> m_pool          = {};       // every slot ever created
    mutable std::shared_mutex          m_available_mtx = {};       // taken around m_available
    std::queue<size_type>              m_available     = {};       // indices ready for acquire()
    std::atomic<size_type>             m_released      = 0;        // releases not yet reused
    std::atomic<size_type>             m_reused        = 0;        // acquires served by a release
    std::atomic<size_type>             m_new_batch     = 0;        // batches added after creation
};

// Hand out a free object from the pool. When no index is available, one more
// batch of m_count objects is created through m_function and the lookup is
// retried.
//
// Returns a reference to the pool_object, already flagged as in use.
//
// Locking:
//   - m_available is checked and popped under ONE exclusive m_available_mtx
//     lock. Checking under a shared lock and popping under a later exclusive
//     lock lets another thread drain the queue in between, after which
//     front()/pop() run on an empty queue.
//   - m_pool is read under a shared m_pool_mtx lock because the batch path
//     mutates m_pool under an exclusive m_pool_mtx lock.
//   - The batch path locks m_pool_mtx first, then m_available_mtx.
//
// NOTE(review): the reference is returned after the shared lock is dropped;
// this presumes stable_vector keeps element addresses stable while the pool
// grows -- confirm against stable_vector.hpp.
// NOTE: if m_function is empty (default-constructed pool) the batch path
// throws std::bad_function_call.
template <typename Tp>
pool_object<Tp>&
pool<Tp>::acquire()
{
    for(;;)
    {
        auto _idx = std::optional<size_type>{};
        {
            auto _avail_lk = std::unique_lock<std::shared_mutex>{m_available_mtx};
            if(!m_available.empty())
            {
                _idx = m_available.front();
                m_available.pop();
                // an index that came back through release() counts as a reuse
                if(m_released > 0)
                {
                    m_reused++;
                    m_released--;
                }
            }
        }

        if(_idx.has_value())
        {
            // Protect read access to m_pool with a shared lock on m_pool_mtx
            auto  _pool_read_lk = std::shared_lock<std::shared_mutex>{m_pool_mtx};
            auto& _obj          = m_pool.at(_idx.value());
            ROCP_FATAL_IF(!_obj.acquire()) << fmt::format(
                "Pool object at index {} was expected to be available but was not", _idx.value());
            return _obj;
        }

        // add a new batch
        {
            auto _write_pool_lk  = std::unique_lock<std::shared_mutex>{m_pool_mtx};
            auto _write_avail_lk = std::unique_lock<std::shared_mutex>{m_available_mtx};
            // another thread may have refilled the queue while this one was
            // waiting for the locks; only grow if it is still exhausted
            if(m_available.empty())
            {
                ROCP_WARNING << fmt::format(
                    "Pool of type {} exhausted. Creating new batch of {} objects. New pool size: "
                    "{}",
                    cxx_demangle(typeid(Tp).name()),
                    m_count,
                    m_pool.size() + m_count);
                m_new_batch++;
                m_function();
            }
        }
        // loop around and retry the fast path with the freshly added indices
    }
}

// Return the object at index `idx` to the queue of available objects.
//
// An out-of-range `idx` is silently ignored. The object must already have been
// flagged as not in use (pool_object::release() clears the flag before calling
// this); otherwise this is reported through ROCP_FATAL_IF.
//
// Locking: m_pool is read under a shared m_pool_mtx lock because acquire()
// grows m_pool under an exclusive m_pool_mtx lock. The lock order (m_pool_mtx
// first, then m_available_mtx) matches the batch path in acquire().
template <typename Tp>
void
pool<Tp>::release(size_type idx)
{
    auto _pool_read_lk = std::shared_lock<std::shared_mutex>{m_pool_mtx};
    if(idx >= m_pool.size()) return;

    auto _write_lk = std::unique_lock<std::shared_mutex>{m_available_mtx};
    ROCP_FATAL_IF(m_pool.at(idx).in_use())
        << fmt::format("Pool object at index {} was expected to be not in use", idx);
    // ROCP_WARNING << fmt::format(
    // "Releasing object at index {} back to pool of type {}", idx, typeid(Tp).name());
    m_available.push(idx);
    m_released++;
}

// Acquire an object through acquire() and then initialise it in place:
// `ctor` is invoked with the wrapped Tp as its first argument, followed by the
// forwarded `args`. The pool_object obtained from acquire() is returned.
template <typename Tp>
template <typename FuncT, typename... Args>
pool_object<Tp>&
pool<Tp>::acquire(FuncT&& ctor, Args&&... args)
{
    pool_object<Tp>& _acquired = acquire();
    Tp&              _value    = _acquired.get();
    ctor(_value, std::forward<Args>(args)...);
    return _acquired;
}
} // namespace container
} // namespace common
} // namespace rocprofiler
121 changes: 121 additions & 0 deletions projects/rocprofiler-sdk/source/lib/common/container/pool_object.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// MIT License
//
// Copyright (c) 2026 Advanced Micro Devices, Inc. All Rights Reserved.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

#pragma once

#include "lib/common/defines.hpp"
#include "lib/common/logging.hpp"
#include "rocprofiler-sdk/cxx/utility.hpp"

#include <fmt/format.h>
#include <fmt/ranges.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <utility>

namespace rocprofiler
{
namespace common
{
namespace container
{
template <typename Tp>
struct pool;

/// One slot of a pool<Tp>: the pooled value plus its bookkeeping
/// (in-use flag, index inside the pool, back-pointer to the owning pool).
template <typename Tp>
struct pool_object
{
    using pool_type = pool<Tp>;

    /// @param idx    index of this slot inside `pool`
    /// @param in_use initial state of the in-use flag
    /// @param pool   owning pool (may be null; release() then skips the pool)
    pool_object(size_t idx, bool in_use, pool_type* pool)
    : m_in_use{in_use}
    , m_index{idx}
    , m_pool{pool}
    {}

    pool_object()  = default;
    ~pool_object() = default;

    // std::atomic<bool> is neither copyable nor movable, so `= default` move
    // operations are defined as deleted and every "move" silently falls back
    // to the (non-noexcept) copy operations below. Provide real moves: the
    // value is moved, the bookkeeping is copied, and the flag is transferred
    // with relaxed loads/stores exactly as the copy operations do.
    pool_object(pool_object&& rhs) noexcept
    : m_object{std::move(rhs.m_object)}
    , m_in_use{rhs.m_in_use.load(std::memory_order_relaxed)}
    , m_index{rhs.m_index}
    , m_pool{rhs.m_pool}
    {}

    pool_object& operator=(pool_object&& rhs) noexcept
    {
        if(this != &rhs)
        {
            m_object = std::move(rhs.m_object);
            m_in_use.store(rhs.m_in_use.load(std::memory_order_relaxed), std::memory_order_relaxed);
            m_index = rhs.m_index;
            m_pool  = rhs.m_pool;
        }
        return *this;
    }

    // pool_object(const pool_object& rhs) = delete;
    // pool_object& operator=(const pool_object& rhs) = delete;

    // Copies duplicate the value and the bookkeeping, including the in-use flag.
    pool_object(const pool_object& rhs)
    : m_object{rhs.m_object}
    , m_in_use{rhs.m_in_use.load(std::memory_order_relaxed)}
    , m_index{rhs.m_index}
    , m_pool{rhs.m_pool}
    {}

    pool_object& operator=(const pool_object& rhs)
    {
        if(this != &rhs)
        {
            m_object = rhs.m_object;
            m_in_use.store(rhs.m_in_use.load(std::memory_order_relaxed), std::memory_order_relaxed);
            m_index = rhs.m_index;
            m_pool  = rhs.m_pool;
        }
        return *this;
    }

    /// Flip the flag from free to in use; returns false if it was already in use.
    bool acquire();
    /// Flip the flag from in use to free; returns false if it was not in use.
    bool release();
    /// Current value of the in-use flag (relaxed load).
    bool in_use() const { return m_in_use.load(std::memory_order_relaxed); }

    Tp&       get() { return m_object; }
    const Tp& get() const { return m_object; }

    /// Index of this slot inside the owning pool.
    auto index() const { return m_index; }
    /// Overwrite the stored index.
    void index(size_t index) { m_index = index; }

private:
    Tp                m_object = {};
    std::atomic<bool> m_in_use = false;
    size_t            m_index  = 0;
    pool_type*        m_pool   = nullptr;
};

// Try to claim this object: atomically switch the in-use flag from false to
// true. Returns true when the switch happened, false when the flag was
// already set.
template <typename Tp>
bool
pool_object<Tp>::acquire()
{
    auto _must_be_free = false;
    return m_in_use.compare_exchange_strong(_must_be_free, true);
}

// Give this object back: atomically switch the in-use flag from true to false
// and, only if that switch happened, hand the index back to the owning pool.
//
// Returns true when the object was in use and has now been released, false
// when it was not in use (e.g. a second release of the same object).
template <typename Tp>
bool
pool_object<Tp>::release()
{
    bool       expected  = true;
    const bool _released = m_in_use.compare_exchange_strong(expected, false);

    // Only return the index to the pool when the in-use -> free transition
    // succeeded. Pushing it unconditionally would queue the same index more
    // than once on a double release.
    if(_released && m_pool) m_pool->release(m_index);

    return _released;
}
} // namespace container
} // namespace common
} // namespace rocprofiler
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

#include <algorithm>
#include <atomic>
#include <cstring>
#include <new>

namespace rocprofiler::common::container
Expand Down Expand Up @@ -106,12 +107,13 @@ record_header_buffer::clear()
{
auto _sz = m_buffer.capacity();
if(!m_buffer.clear(std::nothrow_t{})) return 0;
std::for_each(m_headers.begin(), m_headers.end(), [](auto& itr) {
rocprofiler_record_header_t record = {};
record.hash = 0;
record.payload = nullptr;
itr = record;
});
// Only clear the used portion of m_headers (first _n elements)
// m_index is atomically incremented during every emplace, so it should
// indicate the number of used elements.
if(_n > 0)
{
std::memset(m_headers.data(), 0, _n * sizeof(rocprofiler_record_header_t));
}
rocprofiler_record_header_t record = {};
record.hash = 0;
record.payload = nullptr;
Expand Down
Loading
Loading