Skip to content

Commit da62cc0

Browse files
author
Matthew Sackman
committed
merging bug 22298 into default
2 parents b390a9c + bb0dd19 commit da62cc0

File tree

4 files changed

+410
-46
lines changed

4 files changed

+410
-46
lines changed

projects/client/RabbitMQ.Client/src/client/impl/ConnectionBase.cs

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ public abstract class ConnectionBase : IConnection
9898
public MainSession m_session0;
9999
public ModelBase m_model0;
100100

101-
public readonly SessionManager m_sessionManager;
101+
public SessionManager m_sessionManager;
102102

103103
public volatile bool m_running = true;
104104

@@ -126,7 +126,7 @@ public ConnectionBase(ConnectionParameters parameters,
126126
m_parameters = parameters;
127127
m_frameHandler = frameHandler;
128128

129-
m_sessionManager = new SessionManager(this);
129+
m_sessionManager = new SessionManager(this, 0);
130130
m_session0 = new MainSession(this);
131131
m_session0.Handler = new MainSession.SessionCloseDelegate(NotifyReceivedCloseOk);
132132
m_model0 = (ModelBase)Protocol.CreateModel(m_session0);
@@ -228,10 +228,6 @@ public ushort ChannelMax
228228
{
229229
return m_sessionManager.ChannelMax;
230230
}
231-
set
232-
{
233-
m_sessionManager.ChannelMax = value;
234-
}
235231
}
236232

237233
public uint FrameMax
@@ -976,7 +972,7 @@ public void Open(bool insist)
976972

977973
ushort channelMax = (ushort) NegotiatedMaxValue(m_parameters.RequestedChannelMax,
978974
connectionTune.m_channelMax);
979-
ChannelMax = channelMax;
975+
m_sessionManager = new SessionManager(this, channelMax);
980976

981977
uint frameMax = NegotiatedMaxValue(m_parameters.RequestedFrameMax,
982978
connectionTune.m_frameMax);

projects/client/RabbitMQ.Client/src/client/impl/SessionManager.cs

Lines changed: 19 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -68,24 +68,15 @@ public class SessionManager
6868
{
6969
private readonly Hashtable m_sessionMap = new Hashtable();
7070
private readonly ConnectionBase m_connection;
71-
private ushort m_channelMax = 0;
71+
private readonly IntAllocator Ints;
72+
public readonly ushort ChannelMax;
7273
private bool m_autoClose = false;
7374

74-
public SessionManager(ConnectionBase connection)
75+
public SessionManager(ConnectionBase connection, ushort channelMax)
7576
{
7677
m_connection = connection;
77-
}
78-
79-
public ushort ChannelMax
80-
{
81-
get
82-
{
83-
return m_channelMax;
84-
}
85-
set
86-
{
87-
m_channelMax = value;
88-
}
78+
ChannelMax = (channelMax == 0) ? ushort.MaxValue : channelMax;
79+
Ints = new IntAllocator(1, ChannelMax);
8980
}
9081

9182
public bool AutoClose
@@ -121,30 +112,36 @@ public ISession Create()
121112
{
122113
lock (m_sessionMap)
123114
{
124-
int channelNumber = Allocate();
115+
int channelNumber = Ints.Allocate();
125116
if (channelNumber == -1)
126117
{
127118
throw new ChannelAllocationException();
128119
}
129-
return Create(channelNumber);
120+
return CreateInternal(channelNumber);
130121
}
131122
}
132123

133124
public ISession Create(int channelNumber)
134125
{
135-
ISession session;
136126
lock (m_sessionMap)
137127
{
138-
if (m_sessionMap.ContainsKey(channelNumber))
128+
if (!Ints.Reserve(channelNumber))
139129
{
140130
throw new ChannelAllocationException(channelNumber);
141131
}
142-
session = new Session(m_connection, channelNumber);
132+
return CreateInternal(channelNumber);
133+
}
134+
}
135+
136+
public ISession CreateInternal(int channelNumber)
137+
{
138+
lock(m_sessionMap)
139+
{
140+
ISession session = new Session(m_connection, channelNumber);
143141
session.SessionShutdown += new SessionShutdownEventHandler(HandleSessionShutdown);
144-
//Console.WriteLine("SessionManager adding session "+session);
145142
m_sessionMap[channelNumber] = session;
143+
return session;
146144
}
147-
return session;
148145
}
149146

150147
///<summary>Replace an active session slot with a new ISession
@@ -165,30 +162,13 @@ public ISession Swap(int channelNumber, ISession replacement) {
165162
}
166163
}
167164

168-
///<summary>Find an unused channel number. Must be called
169-
///while holding m_sessionMap lock!</summary>
170-
///<remarks>
171-
/// Returns -1 if no unused channel numbers are available.
172-
///</remarks>
173-
public int Allocate()
174-
{
175-
ushort maxChannels = (m_channelMax == 0) ? ushort.MaxValue : m_channelMax;
176-
for (int candidate = 1; candidate <= maxChannels; candidate++)
177-
{
178-
if (!m_sessionMap.ContainsKey(candidate))
179-
{
180-
return candidate;
181-
}
182-
}
183-
return -1;
184-
}
185-
186165
public void HandleSessionShutdown(ISession session, ShutdownEventArgs reason)
187166
{
188167
//Console.WriteLine("SessionManager removing session "+session);
189168
lock (m_sessionMap)
190169
{
191170
m_sessionMap.Remove(session.ChannelNumber);
171+
Ints.Free(session.ChannelNumber);
192172
CheckAutoClose();
193173
}
194174
}
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
// This source code is dual-licensed under the Apache License, version
2+
// 2.0, and the Mozilla Public License, version 1.1.
3+
//
4+
// The APL v2.0:
5+
//
6+
//---------------------------------------------------------------------------
7+
// Copyright (C) 2007-2009 LShift Ltd., Cohesive Financial
8+
// Technologies LLC., and Rabbit Technologies Ltd.
9+
//
10+
// Licensed under the Apache License, Version 2.0 (the "License");
11+
// you may not use this file except in compliance with the License.
12+
// You may obtain a copy of the License at
13+
//
14+
// http://www.Apache.Org/licenses/LICENSE-2.0
15+
//
16+
// Unless required by applicable law or agreed to in writing, software
17+
// distributed under the License is distributed on an "AS IS" BASIS,
18+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
// See the License for the specific language governing permissions and
20+
// limitations under the License.
21+
//---------------------------------------------------------------------------
22+
//
23+
// The MPL v1.1:
24+
//
25+
//---------------------------------------------------------------------------
26+
// The contents of this file are subject to the Mozilla Public License
27+
// Version 1.1 (the "License"); you may not use this file except in
28+
// compliance with the License. You may obtain a copy of the License at
29+
// http://www.Rabbitmq.Com/mpl.Html
30+
//
31+
// Software distributed under the License is distributed on an "AS IS"
32+
// basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
33+
// License for the specific language governing rights and limitations
34+
// under the License.
35+
//
36+
// The Original Code is The RabbitMQ .NET Client.
37+
//
38+
// The Initial Developers of the Original Code are LShift Ltd,
39+
// Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
40+
//
41+
// Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
42+
// Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
43+
// are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
44+
// Technologies LLC, and Rabbit Technologies Ltd.
45+
//
46+
// Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
47+
// Ltd. Portions created by Cohesive Financial Technologies LLC are
48+
// Copyright (C) 2007-2009 Cohesive Financial Technologies
49+
// LLC. Portions created by Rabbit Technologies Ltd are Copyright
50+
// (C) 2007-2009 Rabbit Technologies Ltd.
51+
//
52+
// All Rights Reserved.
53+
//
54+
// Contributor(s): ______________________________________.
55+
//
56+
//---------------------------------------------------------------------------
57+
58+
using System;
59+
using System.Collections.Generic;
60+
using System.Text;
61+
using System.Diagnostics;
62+
63+
namespace RabbitMQ.Util
64+
{
65+
66+
67+
/**
68+
* A class for allocating integer IDs in a given range.
69+
*/
70+
public class IntAllocator{
71+
72+
private IntervalList Base;
73+
74+
private readonly int[] unsorted;
75+
private int unsortedCount = 0;
76+
77+
/**
78+
* A class representing a list of inclusive intervals
79+
*/
80+
public class IntervalList{
81+
public IntervalList(int start, int end){
82+
this.Start = start;
83+
this.End = end;
84+
}
85+
86+
public int Start;
87+
public int End;
88+
89+
// Invariant: If Next != Null then Next.Start > this.End + 1
90+
public IntervalList Next;
91+
92+
// Destructively merge two IntervalLists.
93+
// Invariant: None of the Intervals in the two lists may overlap
94+
// intervals in this list.
95+
public static IntervalList Merge(IntervalList x, IntervalList y)
96+
{
97+
if(x == null) return y;
98+
if(y == null) return x;
99+
100+
if(x.End > y.Start) return Merge(y, x);
101+
102+
Debug.Assert(x.End != y.Start);
103+
104+
// We now have x, y non-null and x.End < y.Start.
105+
106+
if(y.Start == x.End + 1)
107+
{
108+
// The two intervals adjoin. Merge them into one and then
109+
// merge the tails.
110+
x.End = y.End;
111+
x.Next = Merge(x.Next, y.Next);
112+
return x;
113+
}
114+
115+
// y belongs in the tail of x.
116+
117+
x.Next = Merge(y, x.Next);
118+
return x;
119+
}
120+
121+
public static IntervalList FromArray(int[] xs, int length)
122+
{
123+
Array.Sort(xs);
124+
125+
IntervalList result = null;
126+
IntervalList current = null;
127+
128+
int i = 0;
129+
while(i < length){
130+
int start = i;
131+
while((i < length - 1) && (xs[i + 1] == xs[i] + 1))
132+
i++;
133+
134+
IntervalList interval = new IntervalList(start, i);
135+
136+
if(result == null)
137+
{
138+
result = interval;
139+
current = interval;
140+
}
141+
else
142+
{
143+
current.Next = interval;
144+
current = interval;
145+
}
146+
i++;
147+
}
148+
return result;
149+
}
150+
}
151+
152+
/**
153+
* Creates an IntAllocator allocating integer IDs within the inclusive range [start, end]
154+
*/
155+
public IntAllocator(int start, int end)
156+
{
157+
if(start > end) throw new ArgumentException("illegal range [" + start +", " + end + "]");
158+
159+
// Fairly arbitrary heuristic for a good size for the unsorted set.
160+
unsorted = new int[Math.Max(32, (int)Math.Sqrt(end - start))];
161+
Base = new IntervalList(start, end);
162+
}
163+
164+
/**
165+
* Allocate a fresh integer from the range, or return -1 if no more integers
166+
* are available. This operation is guaranteed to run in O(1)
167+
*/
168+
public int Allocate()
169+
{
170+
if(unsortedCount > 0){
171+
return unsorted[--unsortedCount];
172+
} else if (Base != null) {
173+
int result = Base.Start++;
174+
if(Base.Start == Base.End) Base = Base.Next;
175+
return result;
176+
} else {
177+
return -1;
178+
}
179+
}
180+
181+
private void Flush()
182+
{
183+
if(unsortedCount > 0)
184+
{
185+
Base = IntervalList.Merge(Base, IntervalList.FromArray(unsorted, unsortedCount));
186+
unsortedCount = 0;
187+
}
188+
}
189+
190+
191+
/**
192+
* Make the provided integer available for allocation again. This operation
193+
* runs in amortized O(sqrt(range size)) time: About every sqrt(range size)
194+
* operations will take O(range_size + number of intervals) to complete and
195+
* the rest run in constant time.
196+
*
197+
* No error checking is performed, so if you double Free or Free an integer
198+
* that was not originally Allocated the results are undefined. Sorry.
199+
*/
200+
public void Free(int id)
201+
{
202+
if(unsortedCount >= unsorted.Length)
203+
{
204+
Flush();
205+
}
206+
unsorted[unsortedCount++] = id;
207+
}
208+
209+
public bool Reserve(int id)
210+
{
211+
// We always flush before reserving because the only way to determine
212+
// if an ID is in the unsorted array is through a linear scan. This leads
213+
// us to the potentially expensive situation where there is a large unsorted
214+
// array and we reserve several IDs, incurring the cost of the scan each time.
215+
// Flushing makes sure the array is always empty and does no additional work if
216+
// reserve is called twice.
217+
Flush();
218+
219+
IntervalList current = Base;
220+
221+
while(current != null)
222+
{
223+
if(current.End < id)
224+
{
225+
current = current.Next;
226+
continue;
227+
}
228+
else if(current.Start > id)
229+
{
230+
return false;
231+
}
232+
else if(current.End == id)
233+
{
234+
current.End--;
235+
}
236+
else if(current.Start == id)
237+
{
238+
current.Start++;
239+
}
240+
else
241+
{
242+
// The ID is in the middle of this interval.
243+
// We need to split the interval into two.
244+
IntervalList rest = new IntervalList(id + 1, current.End);
245+
current.End = id - 1;
246+
rest.Next = current.Next;
247+
current.Next = rest;
248+
}
249+
return true;
250+
}
251+
return false;
252+
}
253+
254+
}
255+
}

0 commit comments

Comments
 (0)