22using System ;
33using System . Collections . Concurrent ;
44using System . Collections . Generic ;
5+ using System . Diagnostics ;
56using System . Drawing ;
67using System . Linq ;
78using System . Runtime . CompilerServices ;
1112namespace NetworkLibrary
1213{
1314 /*
14- * This behaves like ArrayPool<>. The butckets are TLS thx to concurrent bag. ( ThreadLocal<ThreadLocalList> m_locals )
15+ * Concurrent bag has Tls list ( ThreadLocal<ThreadLocalList> m_locals )
16+ * each bucket holds a set of weak references to byte arrays
17+ * this arrays are pooled and resuable and we preserve the peak memory usage by this.
18+ * If application calls the GC gen2 collect some of this weak references are cleared,
19+ * this way we trim the pools automatically if they are not referenced by the application.
20+ *
21+ * you can also configure the pool to auto GC collect(also does gen2) if the application is mostly idle and
22+ * we reached to some threshold on workingset memory.
1523 */
1624 public class BufferPool
1725 {
1826 public static bool ForceGCOnCleanup = true ;
27+ public static int MaxMemoryBeforeForceGc = 100000000 ;
1928 public const int MaxBufferSize = 1073741824 ;
2029 public const int MinBufferSize = 256 ;
21-
22- private static readonly ConcurrentBag < byte [ ] > [ ] bufferBuckets = new ConcurrentBag < byte [ ] > [ 32 ] ;
30+ private static readonly ConcurrentBag < WeakReference < byte [ ] > > weakReferencePool = new ConcurrentBag < WeakReference < byte [ ] > > ( ) ;
31+ private static readonly ConcurrentBag < WeakReference < byte [ ] > > [ ] bufferBuckets = new ConcurrentBag < WeakReference < byte [ ] > > [ 32 ] ;
2332 private static SortedDictionary < int , int > bucketCapacityLimits = new SortedDictionary < int , int > ( )
2433 {
2534 { 256 , 10000 } ,
@@ -47,6 +56,8 @@ public class BufferPool
4756 { 1073741824 , 0 }
4857
4958 } ;
59+ static readonly Process process = Process . GetCurrentProcess ( ) ;
60+ static ManualResetEvent autoGcHandle = new ManualResetEvent ( false ) ;
5061
5162 static BufferPool ( )
5263 {
@@ -56,59 +67,48 @@ static BufferPool()
5667 thread . Start ( ) ;
5768 }
5869
70+ /// <summary>
71+ /// Starts a task where GC.Collect() is called
72+ /// if application consumed less than %1 proccessor time and memory is above threashold
73+ /// </summary>
74+ public static void StartCollectGcOnIdle ( )
75+ {
76+ autoGcHandle . Set ( ) ;
77+ }
78+
79+ /// <summary>
80+ /// Stops a task where GC.Collect() is called
81+ /// if application consumed less than %1 proccessor time and memory is above threashold
82+ /// </summary>
83+ public static void StopCollectGcOnIdle ( )
84+ {
85+ autoGcHandle . Reset ( ) ;
86+ }
87+
5988 // creates bufferBuckets structure
6089 private static void Init ( )
6190 {
6291 //bufferBuckets = new ConcurrentDictionary<int, ConcurrentBag<byte[]>>();
6392 for ( int i = 8 ; i < 31 ; i ++ )
6493 {
65- bufferBuckets [ i ] = new ConcurrentBag < byte [ ] > ( ) ;
94+ bufferBuckets [ i ] = new ConcurrentBag < WeakReference < byte [ ] > > ( ) ;
6695 }
6796 }
6897
6998 private static void MaintainMemory ( )
7099 {
71- // Check each bucket periodically if the free capacity limit is exceeded
72- // Dump excess amount
100+ var lastTime = process . TotalProcessorTime ;
73101 while ( true )
74102 {
103+ autoGcHandle . WaitOne ( ) ;
75104 Thread . Sleep ( 10000 ) ;
76- try
77- {
78- for ( int k = 0 ; k < bufferBuckets . Length ; k ++ )
79- {
80- var size = GetBucketSize ( k ) ;
81- var bag = bufferBuckets [ k ] ;
82- if ( bag == null ) continue ;
83-
84- if ( bucketCapacityLimits [ size ] < bag . Count )
85- {
86- // check 5 times slowly to make sure buffer bucket is not hot.
87- for ( int i = 0 ; i < 5 ; i ++ )
88- {
89- Thread . Sleep ( 100 ) ;
90- if ( bucketCapacityLimits [ size ] >= bag . Count ) continue ;
91- }
92-
93- // trim slowly
94- while ( bag . Count > bucketCapacityLimits [ size ] )
95- {
96- bag . TryTake ( out var buffer ) ;
97- buffer = null ;
98- Thread . Sleep ( 1 ) ;
99-
100- }
101- Thread . Sleep ( 100 ) ;
102- if ( ForceGCOnCleanup )
103- GC . Collect ( ) ;
104- }
105- }
106- }
107- catch ( Exception e )
108- {
109- MiniLogger . Log ( MiniLogger . LogLevel . Error , "Buffer manager encountered an error: " + e . Message ) ;
110- }
105+ var currentProcTime = process . TotalProcessorTime ;
106+ var deltaT = ( lastTime - currentProcTime ) . TotalMilliseconds ;
107+ lastTime = currentProcTime ;
111108
109+ if ( deltaT < 100 && process . WorkingSet64 < MaxMemoryBeforeForceGc )
110+ GC . Collect ( ) ;
111+ process . Refresh ( ) ;
112112 }
113113
114114 }
@@ -123,18 +123,25 @@ private static void MaintainMemory()
123123 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
124124 public static byte [ ] RentBuffer ( int size )
125125 {
126+ byte [ ] buffer ;
126127 if ( MaxBufferSize < size )
127128 throw new InvalidOperationException (
128129 string . Format ( "Unable to rent buffer bigger than max buffer size: {0}" , MaxBufferSize ) ) ;
129130 if ( size <= MinBufferSize ) return new byte [ size ] ;
130131
131132 int idx = GetBucketIndex ( size ) ;
132- if ( ! bufferBuckets [ idx ] . TryTake ( out byte [ ] buffer ) )
133+
134+ while ( bufferBuckets [ idx ] . TryTake ( out WeakReference < byte [ ] > bufferRef ) )
133135 {
134- buffer = new byte [ GetBucketSize ( idx ) ] ;
136+ if ( bufferRef . TryGetTarget ( out buffer ) )
137+ {
138+ weakReferencePool . Add ( bufferRef ) ;
139+ return buffer ;
140+ }
135141 }
142+ buffer = new byte [ GetBucketSize ( idx ) ] ;
143+ return buffer ;
136144
137- return buffer ;
138145 }
139146
140147 /// <summary>
@@ -147,45 +154,17 @@ public static void ReturnBuffer(byte[] buffer)
147154 if ( buffer . Length <= MinBufferSize ) return ;
148155
149156 int idx = GetBucketIndex ( buffer . Length ) ;
150- bufferBuckets [ idx - 1 ] . Add ( buffer ) ;
151- }
152-
153- /// <summary>
154- /// Sets Bucket size
155- /// </summary>
156- /// <param name="bucketNumber"></param>
157- /// <param name="amount"></param>
158- /// <exception cref="InvalidOperationException"></exception>
159- public static void SetBucketLimit ( int bucketNumber , int amount )
160- {
161- if ( bucketNumber >= 31 || bucketNumber < 8 )
162- throw new InvalidOperationException ( "Bucket number needs to be between 8 and 31 (inclusive)" ) ;
163-
164- int size = GetBucketSize ( bucketNumber ) ;
165- bucketCapacityLimits [ size ] = amount ;
166- }
167-
168- /// <summary>
169- /// Sets Bucket size
170- /// </summary>
171- /// <param name="bucketSize"></param>
172- /// <param name="amount"></param>
173- /// <exception cref="InvalidOperationException"></exception>
174- public static void SetBucketLimitBySize ( int bucketSize , int amount )
175- {
176- if ( bucketSize > 1073741824 || bucketSize < 256 )
177- throw new InvalidOperationException ( "Bucket number needs to be between 8 and 31 (inclusive)" ) ;
178-
179- if ( bucketCapacityLimits . ContainsKey ( bucketSize ) )
180- bucketCapacityLimits [ bucketSize ] = amount ;
157+ if ( weakReferencePool . TryTake ( out var wr ) )
158+ {
159+ wr . SetTarget ( buffer ) ;
160+ bufferBuckets [ idx - 1 ] . Add ( wr ) ;
181161
182- int idx = GetBucketIndex ( bucketSize ) ;
183- int size = GetBucketSize ( idx ) ;
184- bucketCapacityLimits [ size ] = amount ;
162+ }
163+ else
164+ bufferBuckets [ idx - 1 ] . Add ( new WeakReference < byte [ ] > ( buffer ) ) ;
165+ buffer = null ;
185166 }
186167
187-
188-
189168 [ MethodImpl ( MethodImplOptions . AggressiveInlining ) ]
190169 private static int GetBucketSize ( int bucketIndex )
191170 {
0 commit comments