Skip to content

Commit 740f7f2

Browse files
authored
Make ObjectPoolAllocator from ObjectPool (#8)
* Make ObjectPoolAllocator from ObjectPool
1 parent 217ceba commit 740f7f2

File tree

10 files changed

+139
-108
lines changed

10 files changed

+139
-108
lines changed

worker/include/RTC/RTCP/CompoundPacket.hpp

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
#define MS_RTC_RTCP_COMPOUND_PACKET_HPP
33

44
#include "common.hpp"
5+
#include "Utils.hpp"
56
#include "RTC/RTCP/ReceiverReport.hpp"
67
#include "RTC/RTCP/Sdes.hpp"
78
#include "RTC/RTCP/SenderReport.hpp"
@@ -15,7 +16,14 @@ namespace RTC
1516
class CompoundPacket
1617
{
1718
public:
18-
using UniquePtr = std::unique_ptr<CompoundPacket>;
19+
struct CompoundPacketDeleter
20+
{
21+
void operator()(CompoundPacket* packet) const;
22+
};
23+
24+
using UniquePtr = std::unique_ptr<CompoundPacket, CompoundPacketDeleter>;
25+
using Allocator = Utils::ObjectPoolAllocator<CompoundPacket>;
26+
using AllocatorTraits = std::allocator_traits<Allocator>;
1927
static UniquePtr Create();
2028

2129
public:
@@ -57,11 +65,11 @@ namespace RTC
5765
private:
5866
// Use `CompoundPacket::Create()` instead
5967
CompoundPacket() = default;
60-
// Use `CompoundPacket::ReturnIntoPool()` instead
68+
// Used by CompoundPacketDeleter
6169
~CompoundPacket() = default;
6270

63-
friend struct std::default_delete<RTC::RTCP::CompoundPacket>;
64-
static void ReturnIntoPool(CompoundPacket* packet);
71+
friend struct CompoundPacketDeleter;
72+
friend AllocatorTraits;
6573

6674
private:
6775
uint8_t* header{ nullptr };
@@ -77,11 +85,18 @@ namespace RTC
7785
namespace std
7886
{
7987
template<>
80-
struct default_delete<RTC::RTCP::CompoundPacket>
88+
struct allocator_traits<RTC::RTCP::CompoundPacket::Allocator>
8189
{
82-
void operator()(RTC::RTCP::CompoundPacket* ptr) const
90+
template<typename... Args>
91+
static void construct(
92+
RTC::RTCP::CompoundPacket::Allocator& a, RTC::RTCP::CompoundPacket* p, Args&&... args)
93+
{
94+
new (p) RTC::RTCP::CompoundPacket(forward<Args>(args)...);
95+
}
96+
97+
static void destroy(RTC::RTCP::CompoundPacket::Allocator& a, RTC::RTCP::CompoundPacket* p)
8398
{
84-
RTC::RTCP::CompoundPacket::ReturnIntoPool(ptr);
99+
p->~CompoundPacket();
85100
}
86101
};
87102
}; // namespace std

worker/include/RTC/RtpPacket.hpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@ namespace RTC
2424
public:
2525
using RtpPacketBuffer = std::array<uint8_t, MtuSize + 100>;
2626
using SharedPtr = std::shared_ptr<RtpPacket>;
27+
using Allocator = Utils::ObjectPoolAllocator<RtpPacket>;
28+
using AllocatorTraits = std::allocator_traits<Allocator>;
29+
// Memory to hold the cloned packet (with extra space for RTX encoding).
30+
using BufferAllocator = Utils::ObjectPoolAllocator<RtpPacket::RtpPacketBuffer>;
31+
using BufferAllocatorTraits = std::allocator_traits<BufferAllocator>;
2732

2833
/* Struct for RTP header. */
2934
struct Header
@@ -135,7 +140,7 @@ namespace RTC
135140

136141
static SharedPtr Parse(const uint8_t* data, size_t len);
137142

138-
private:
143+
public:
139144
RtpPacket(
140145
Header* header,
141146
HeaderExtension* headerExtension,
@@ -609,7 +614,7 @@ namespace RTC
609614
void ShiftPayload(size_t payloadOffset, size_t shift, bool expand = true);
610615

611616
private:
612-
friend SharedPtr;
617+
friend AllocatorTraits;
613618

614619
void ParseExtensions();
615620

@@ -643,5 +648,4 @@ namespace RTC
643648
RtpPacketBuffer* buffer{ nullptr };
644649
};
645650
} // namespace RTC
646-
647651
#endif

worker/include/RTC/RtpStreamSend.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ namespace RTC
2020
public:
2121
struct StorageItem
2222
{
23+
using Allocator = Utils::ObjectPoolAllocator<RtpStreamSend::StorageItem>;
24+
using AllocatorTraits = std::allocator_traits<Allocator>;
25+
2326
void Dump() const;
2427

2528
// Original packet.

worker/include/RTC/Transport.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,9 @@ namespace RTC
6161
#else
6262
struct OnSendCallbackCtx
6363
{
64+
using Allocator = Utils::ObjectPoolAllocator<Transport::OnSendCallbackCtx>;
65+
using AllocatorTraits = std::allocator_traits<Allocator>;
66+
6467
RTC::TransportCongestionControlClient* tccClient;
6568
webrtc::RtpPacketSendInfo packetInfo;
6669
};

worker/include/Utils.hpp

Lines changed: 53 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -351,51 +351,82 @@ namespace Utils
351351
}
352352
};
353353

354+
// Simple implementation of object pool only for single objects
355+
// Arrays are allocated as usual
354356
template<typename T>
355-
class ObjectPool
357+
class ObjectPoolAllocator
356358
{
359+
std::shared_ptr<std::vector<T*>> pool_data;
360+
357361
public:
358-
~ObjectPool()
362+
typedef T value_type;
363+
thread_local static Utils::ObjectPoolAllocator<T> Pool;
364+
365+
ObjectPoolAllocator()
359366
{
360-
for (auto ptr : this->pool)
361-
{
362-
std::free(ptr);
363-
}
367+
pool_data = std::shared_ptr<std::vector<T*>>(
368+
new std::vector<T*>(),
369+
[](std::vector<T*>* pool)
370+
{
371+
for (auto* ptr : *pool)
372+
{
373+
std::free(ptr);
374+
}
375+
delete pool;
376+
});
364377
}
365378

366-
// Get pointer to allocated memory. This can be newly allocated memory or re-use of previously
367-
// returned object. Object is not initialized and shouldn't be considered to be in a valid state.
368-
T* Allocate()
379+
template<typename U>
380+
ObjectPoolAllocator(const ObjectPoolAllocator<U>& other)
381+
: pool_data(ObjectPoolAllocator<T>::Pool.pool_data)
369382
{
370-
if (this->pool.empty())
383+
}
384+
385+
~ObjectPoolAllocator()
386+
{
387+
}
388+
389+
T* allocate(size_t n)
390+
{
391+
if (n > 1)
392+
{
393+
return static_cast<T*>(std::malloc(sizeof(T) * n));
394+
}
395+
396+
if (this->pool_data->empty())
371397
{
372398
return static_cast<T*>(std::malloc(sizeof(T)));
373399
}
374400

375-
T* ptr = this->pool.back();
376-
this->pool.pop_back();
401+
T* ptr = this->pool_data->back();
402+
this->pool_data->pop_back();
377403

378404
return ptr;
379405
}
380406

381-
// Return allocated memory into internal pool for future use, make sure to run destructor before
382-
// returning memory, ObjectPool will only de-allocate memory on exit.
383-
void Return(T* ptr)
407+
void deallocate(T* ptr, size_t n)
384408
{
385-
if (ptr)
409+
if (!ptr)
410+
{
411+
return;
412+
}
413+
414+
if (n > 1)
386415
{
387-
#ifdef MS_MEM_POOL_FREE_ON_RETURN
388416
std::free(ptr);
417+
return;
418+
}
419+
420+
#ifdef MS_MEM_POOL_FREE_ON_RETURN
421+
std::free(ptr);
389422
#else
390-
this->pool.push_back(ptr);
423+
this->pool_data->push_back(ptr);
391424
#endif
392-
}
393425
}
394-
395-
private:
396-
std::vector<T*> pool;
397426
};
398427

428+
template<typename T>
429+
thread_local Utils::ObjectPoolAllocator<T> Utils::ObjectPoolAllocator<T>::Pool;
399430
} // namespace Utils
400431

401432
#endif

worker/src/RTC/RTCP/CompoundPacket.cpp

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -8,24 +8,13 @@ namespace RTC
88
{
99
namespace RTCP
1010
{
11-
thread_local static Utils::ObjectPool<CompoundPacket> CompoundPacketPool;
12-
1311
/* Instance methods. */
1412

1513
CompoundPacket::UniquePtr CompoundPacket::Create()
1614
{
17-
auto* packet = CompoundPacketPool.Allocate();
18-
19-
return UniquePtr(new (packet) CompoundPacket());
20-
}
21-
22-
void CompoundPacket::ReturnIntoPool(CompoundPacket* packet)
23-
{
24-
if (packet)
25-
{
26-
packet->~CompoundPacket();
27-
CompoundPacketPool.Return(packet);
28-
}
15+
auto* packet = CompoundPacket::Allocator::Pool.allocate(1);
16+
CompoundPacket::AllocatorTraits::construct(CompoundPacket::Allocator::Pool, packet);
17+
return UniquePtr(packet);
2918
}
3019

3120
void CompoundPacket::Serialize(uint8_t* data)
@@ -163,5 +152,14 @@ namespace RTC
163152

164153
this->xrPacket.AddReport(report);
165154
}
155+
156+
void CompoundPacket::CompoundPacketDeleter::operator()(CompoundPacket* packet) const
157+
{
158+
if (packet)
159+
{
160+
CompoundPacket::AllocatorTraits::destroy(CompoundPacket::Allocator::Pool, packet);
161+
CompoundPacket::Allocator::Pool.deallocate(packet, 1);
162+
}
163+
}
166164
} // namespace RTCP
167165
} // namespace RTC

worker/src/RTC/RtpPacket.cpp

Lines changed: 24 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,6 @@
99

1010
namespace RTC
1111
{
12-
thread_local static Utils::ObjectPool<RtpPacket> RtpPacketPool;
13-
// Memory to hold the cloned packet (with extra space for RTX encoding).
14-
thread_local static Utils::ObjectPool<RtpPacket::RtpPacketBuffer> RtpPacketBufferPool;
15-
1612
/* Class methods. */
1713

1814
RtpPacket::SharedPtr RtpPacket::Parse(const uint8_t* data, size_t len)
@@ -121,21 +117,8 @@ namespace RTC
121117
payloadLength + size_t{ payloadPadding },
122118
"packet's computed size does not match received size");
123119

124-
auto* packet = RtpPacketPool.Allocate();
125-
new (packet) RtpPacket(header, headerExtension, payload, payloadLength, payloadPadding, len);
126-
127-
SharedPtr shared(
128-
packet,
129-
/*Deleter*/
130-
[](RtpPacket* packet)
131-
{
132-
// Call destructor manually since memory was pre-allocated upfront.
133-
packet->~RtpPacket();
134-
// Return packet into object pool for future reuse of memory allocation.
135-
RtpPacketPool.Return(packet);
136-
});
137-
138-
return shared;
120+
return std::allocate_shared<RtpPacket, Utils::ObjectPoolAllocator<RtpPacket>>(
121+
RtpPacket::Allocator::Pool, header, headerExtension, payload, payloadLength, payloadPadding, len);
139122
}
140123

141124
/* Instance methods. */
@@ -166,7 +149,7 @@ namespace RTC
166149
if (this->buffer)
167150
{
168151
this->buffer->~array();
169-
RtpPacketBufferPool.Return(this->buffer);
152+
RtpPacket::BufferAllocator::Pool.deallocate(this->buffer, 1);
170153
this->buffer = nullptr;
171154
}
172155
}
@@ -657,8 +640,8 @@ namespace RTC
657640
{
658641
MS_TRACE();
659642

660-
auto* buffer = RtpPacketBufferPool.Allocate();
661-
new (buffer) RtpPacketBuffer();
643+
auto* buffer = RtpPacket::BufferAllocator::Pool.allocate(1);
644+
RtpPacket::BufferAllocatorTraits::construct(RtpPacket::BufferAllocator::Pool, buffer);
662645

663646
auto* ptr = const_cast<uint8_t*>(buffer->data());
664647
size_t numBytes{ 0 };
@@ -714,35 +697,29 @@ namespace RTC
714697
}
715698

716699
// Create the new RtpPacket instance and return it.
717-
auto* packet = RtpPacketPool.Allocate();
718-
new (packet) RtpPacket(
719-
newHeader, newHeaderExtension, newPayload, this->payloadLength, this->payloadPadding, this->size);
720-
721-
SharedPtr shared(
722-
packet,
723-
/*Deleter*/
724-
[](RtpPacket* packet)
725-
{
726-
// Call destructor manually since memory was pre-allocated upfront.
727-
packet->~RtpPacket();
728-
// Return packet into object pool for future reuse of memory allocation.
729-
RtpPacketPool.Return(packet);
730-
});
700+
SharedPtr shared = std::allocate_shared<RtpPacket, RtpPacket::Allocator>(
701+
RtpPacket::Allocator::Pool,
702+
newHeader,
703+
newHeaderExtension,
704+
newPayload,
705+
this->payloadLength,
706+
this->payloadPadding,
707+
this->size);
731708

732709
// Keep already set extension ids.
733-
packet->midExtensionId = this->midExtensionId;
734-
packet->ridExtensionId = this->ridExtensionId;
735-
packet->rridExtensionId = this->rridExtensionId;
736-
packet->absSendTimeExtensionId = this->absSendTimeExtensionId;
737-
packet->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId;
738-
packet->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC.
739-
packet->frameMarkingExtensionId = this->frameMarkingExtensionId;
740-
packet->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId;
741-
packet->videoOrientationExtensionId = this->videoOrientationExtensionId;
710+
shared->midExtensionId = this->midExtensionId;
711+
shared->ridExtensionId = this->ridExtensionId;
712+
shared->rridExtensionId = this->rridExtensionId;
713+
shared->absSendTimeExtensionId = this->absSendTimeExtensionId;
714+
shared->transportWideCc01ExtensionId = this->transportWideCc01ExtensionId;
715+
shared->frameMarking07ExtensionId = this->frameMarking07ExtensionId; // Remove once RFC.
716+
shared->frameMarkingExtensionId = this->frameMarkingExtensionId;
717+
shared->ssrcAudioLevelExtensionId = this->ssrcAudioLevelExtensionId;
718+
shared->videoOrientationExtensionId = this->videoOrientationExtensionId;
742719
// Clone payload descriptor handler.
743-
packet->payloadDescriptorHandler = this->payloadDescriptorHandler;
720+
shared->payloadDescriptorHandler = this->payloadDescriptorHandler;
744721
// Store allocated buffer.
745-
packet->buffer = buffer;
722+
shared->buffer = buffer;
746723

747724
return shared;
748725
}

0 commit comments

Comments
 (0)