Skip to content

Commit 0910c4e

Browse files
committed
rewrote thrift connection to use its own pooling so it behaves more like the http connection
1 parent 66ac200 commit 0910c4e

File tree

8 files changed

+183
-147
lines changed

8 files changed

+183
-147
lines changed

src/Nest.Connection.Thrift/Nest.Connection.Thrift.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@
1111
<RootNamespace>Nest.Connection.Thrift</RootNamespace>
1212
<AssemblyName>Nest.Connection.Thrift</AssemblyName>
1313
<TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
14-
<TargetFrameworkProfile>Client</TargetFrameworkProfile>
14+
<TargetFrameworkProfile>
15+
</TargetFrameworkProfile>
1516
<FileAlignment>512</FileAlignment>
1617
</PropertyGroup>
1718
<PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|x86' ">

src/Nest.Connection.Thrift/ThriftConnection.cs

Lines changed: 99 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -6,42 +6,36 @@
66
using Thrift.Transport;
77
using Nest;
88
using System.Threading.Tasks;
9+
using System.Collections.Concurrent;
10+
using System.Threading;
911

1012
namespace Nest.Thrift
1113
{
1214
// TODO: Cocowalla
1315
// Changed from internal to public for performance testing
1416
public class ThriftConnection : IConnection, IDisposable
15-
{
16-
private readonly Rest.Client _client;
17-
private readonly TProtocol _protocol;
18-
private readonly TTransport _transport;
17+
{
18+
private ConcurrentQueue<Rest.Client> _clients = new ConcurrentQueue<Rest.Client>();
19+
private Semaphore _resourceLock;
20+
private int _timeout;
21+
private int _poolSize;
1922
private bool _disposed;
2023

21-
/// <summary>
22-
///
23-
/// </summary>
2424
public ThriftConnection(IConnectionSettings connectionSettings)
2525
{
26-
Created = DateTime.Now;
27-
var tsocket = new TSocket(connectionSettings.Host, connectionSettings.Port);
28-
_transport = new TBufferedTransport(tsocket, 1024);
29-
_protocol = new TBinaryProtocol(_transport);
30-
_client = new Rest.Client(_protocol);
31-
}
32-
33-
/// <summary>
34-
///
35-
/// </summary>
36-
public DateTime Created { get; private set; }
37-
38-
39-
/// <summary>
40-
///
41-
/// </summary>
42-
public bool IsOpen
43-
{
44-
get { return _transport.IsOpen; }
26+
this._timeout = connectionSettings.Timeout;
27+
this._poolSize = connectionSettings.MaximumAsyncConnections;
28+
29+
this._resourceLock = new Semaphore(_poolSize, _poolSize);
30+
31+
for (var i = 0; i <= connectionSettings.MaximumAsyncConnections; i++)
32+
{
33+
var tsocket = new TSocket(connectionSettings.Host, connectionSettings.Port);
34+
var transport = new TBufferedTransport(tsocket, 1024);
35+
var protocol = new TBinaryProtocol(transport);
36+
var client = new Rest.Client(protocol);
37+
_clients.Enqueue(client);
38+
}
4539
}
4640

4741
#region IConnection Members
@@ -56,10 +50,10 @@ public Task<ConnectionStatus> Get(string path)
5650
restRequest.Headers.Add("Content-Type", "application/json");
5751
return Task.Factory.StartNew<ConnectionStatus>(() =>
5852
{
59-
var result = GetClient().execute(restRequest);
60-
return new ConnectionStatus(DecodeStr(result.Body));
53+
return this.Execute(restRequest);
6154
});
6255
}
56+
6357
public Task<ConnectionStatus> Head(string path)
6458
{
6559
var restRequest = new RestRequest();
@@ -69,9 +63,8 @@ public Task<ConnectionStatus> Head(string path)
6963
restRequest.Headers = new Dictionary<string, string>();
7064
restRequest.Headers.Add("Content-Type", "application/json");
7165
return Task.Factory.StartNew<ConnectionStatus>(()=>
72-
{
73-
var result = GetClient().execute(restRequest);
74-
return new ConnectionStatus(DecodeStr(result.Body));
66+
{
67+
return this.Execute(restRequest);
7568
});
7669
}
7770

@@ -82,9 +75,8 @@ public ConnectionStatus GetSync(string path)
8275
restRequest.Uri = path;
8376

8477
restRequest.Headers = new Dictionary<string, string>();
85-
restRequest.Headers.Add("Content-Type", "application/json");
86-
RestResponse result = GetClient().execute(restRequest);
87-
return new ConnectionStatus(DecodeStr(result.Body));
78+
restRequest.Headers.Add("Content-Type", "application/json");
79+
return this.Execute(restRequest);
8880
}
8981

9082
public ConnectionStatus HeadSync(string path)
@@ -94,9 +86,8 @@ public ConnectionStatus HeadSync(string path)
9486
restRequest.Uri = path;
9587

9688
restRequest.Headers = new Dictionary<string, string>();
97-
restRequest.Headers.Add("Content-Type", "application/json");
98-
RestResponse result = GetClient().execute(restRequest);
99-
return new ConnectionStatus(DecodeStr(result.Body));
89+
restRequest.Headers.Add("Content-Type", "application/json");
90+
return this.Execute(restRequest);
10091
}
10192

10293
public Task<ConnectionStatus> Post(string path, string data)
@@ -113,8 +104,7 @@ public Task<ConnectionStatus> Post(string path, string data)
113104
restRequest.Headers.Add("Content-Type", "application/json");
114105
return Task.Factory.StartNew<ConnectionStatus>(() =>
115106
{
116-
var result = GetClient().execute(restRequest);
117-
return new ConnectionStatus(DecodeStr(result.Body));
107+
return this.Execute(restRequest);
118108
});
119109
}
120110
public Task<ConnectionStatus> Put(string path, string data)
@@ -131,8 +121,7 @@ public Task<ConnectionStatus> Put(string path, string data)
131121
restRequest.Headers.Add("Content-Type", "application/json");
132122
return Task.Factory.StartNew<ConnectionStatus>(() =>
133123
{
134-
var result = GetClient().execute(restRequest);
135-
return new ConnectionStatus(DecodeStr(result.Body));
124+
return this.Execute(restRequest);
136125
});
137126
}
138127
public Task<ConnectionStatus> Delete(string path, string data)
@@ -149,8 +138,7 @@ public Task<ConnectionStatus> Delete(string path, string data)
149138
restRequest.Headers.Add("Content-Type", "application/json");
150139
return Task.Factory.StartNew<ConnectionStatus>(() =>
151140
{
152-
var result = GetClient().execute(restRequest);
153-
return new ConnectionStatus(DecodeStr(result.Body));
141+
return this.Execute(restRequest);
154142
});
155143
}
156144

@@ -165,9 +153,8 @@ public ConnectionStatus PostSync(string path, string data)
165153
restRequest.Body = Encoding.UTF8.GetBytes(data);
166154
}
167155
restRequest.Headers = new Dictionary<string, string>();
168-
restRequest.Headers.Add("Content-Type", "application/json");
169-
RestResponse result = GetClient().execute(restRequest);
170-
return new ConnectionStatus(DecodeStr(result.Body));
156+
restRequest.Headers.Add("Content-Type", "application/json");
157+
return this.Execute(restRequest);
171158
}
172159
public ConnectionStatus PutSync(string path, string data)
173160
{
@@ -180,9 +167,8 @@ public ConnectionStatus PutSync(string path, string data)
180167
restRequest.Body = Encoding.UTF8.GetBytes(data);
181168
}
182169
restRequest.Headers = new Dictionary<string, string>();
183-
restRequest.Headers.Add("Content-Type", "application/json");
184-
RestResponse result = GetClient().execute(restRequest);
185-
return new ConnectionStatus(DecodeStr(result.Body));
170+
restRequest.Headers.Add("Content-Type", "application/json");
171+
return this.Execute(restRequest);
186172
}
187173
public Task<ConnectionStatus> Delete(string path)
188174
{
@@ -194,8 +180,7 @@ public Task<ConnectionStatus> Delete(string path)
194180
restRequest.Headers.Add("Content-Type", "application/json");
195181
return Task.Factory.StartNew<ConnectionStatus>(() =>
196182
{
197-
var result = GetClient().execute(restRequest);
198-
return new ConnectionStatus(DecodeStr(result.Body));
183+
return this.Execute(restRequest);
199184
});
200185
}
201186

@@ -206,9 +191,8 @@ public ConnectionStatus DeleteSync(string path)
206191
restRequest.Uri = path;
207192

208193
restRequest.Headers = new Dictionary<string, string>();
209-
restRequest.Headers.Add("Content-Type", "application/json");
210-
RestResponse result = GetClient().execute(restRequest);
211-
return new ConnectionStatus(DecodeStr(result.Body));
194+
restRequest.Headers.Add("Content-Type", "application/json");
195+
return this.Execute(restRequest);
212196
}
213197
public ConnectionStatus DeleteSync(string path, string data)
214198
{
@@ -221,9 +205,8 @@ public ConnectionStatus DeleteSync(string path, string data)
221205
restRequest.Body = Encoding.UTF8.GetBytes(data);
222206
}
223207
restRequest.Headers = new Dictionary<string, string>();
224-
restRequest.Headers.Add("Content-Type", "application/json");
225-
RestResponse result = GetClient().execute(restRequest);
226-
return new ConnectionStatus(DecodeStr(result.Body));
208+
restRequest.Headers.Add("Content-Type", "application/json");
209+
return this.Execute(restRequest);
227210
}
228211
#endregion
229212

@@ -239,28 +222,6 @@ public void Dispose()
239222

240223
#endregion
241224

242-
/// <summary>
243-
///
244-
/// </summary>
245-
public void Open()
246-
{
247-
if (IsOpen)
248-
return;
249-
250-
_transport.Open();
251-
}
252-
253-
/// <summary>
254-
///
255-
/// </summary>
256-
public void Close()
257-
{
258-
if (!IsOpen)
259-
return;
260-
261-
_transport.Close();
262-
}
263-
264225
/// <summary>
265226
/// Releases unmanaged and - optionally - managed resources
266227
/// </summary>
@@ -270,9 +231,16 @@ protected virtual void Dispose(bool disposing)
270231
if (_disposed)
271232
{
272233
return;
234+
}
235+
236+
foreach (var c in this._clients)
237+
{
238+
if (c != null
239+
&& c.InputProtocol != null
240+
&& c.InputProtocol.Transport != null
241+
&& c.InputProtocol.Transport.IsOpen)
242+
c.InputProtocol.Transport.Close();
273243
}
274-
275-
Close();
276244
_disposed = true;
277245
}
278246

@@ -283,12 +251,56 @@ protected virtual void Dispose(bool disposing)
283251
~ThriftConnection()
284252
{
285253
Dispose(false);
286-
}
287-
288-
private Rest.Client GetClient()
289-
{
290-
Open();
291-
return _client;
254+
}
255+
256+
257+
258+
private ConnectionStatus Execute(RestRequest restRequest)
259+
{
260+
//RestResponse result = GetClient().execute(restRequest);
261+
//
262+
263+
if (!this._resourceLock.WaitOne(this._timeout))
264+
{
265+
var m = "Could not start the thrift operation before the timeout of " + this._timeout + "ms completed while waiting for the semaphore";
266+
return new ConnectionStatus(new TimeoutException(m));
267+
}
268+
try
269+
{
270+
Rest.Client client = null;
271+
if (!this._clients.TryDequeue(out client))
272+
{
273+
var m = string.Format("Could dequeue a thrift client from internal socket pool of size {0}", this._poolSize);
274+
var status = new ConnectionStatus(new Exception(m));
275+
return status;
276+
}
277+
try
278+
{
279+
if (!client.InputProtocol.Transport.IsOpen)
280+
client.InputProtocol.Transport.Open();
281+
282+
var result = client.execute(restRequest);
283+
return new ConnectionStatus(DecodeStr(result.Body));
284+
}
285+
catch
286+
{
287+
throw;
288+
}
289+
finally
290+
{
291+
//make sure we make the client available again.
292+
this._clients.Enqueue(client);
293+
}
294+
295+
}
296+
catch (Exception e)
297+
{
298+
return new ConnectionStatus(e);
299+
}
300+
finally
301+
{
302+
this._resourceLock.Release();
303+
}
292304
}
293305

294306
public string DecodeStr(byte[] bytes)

src/ProtocolLoadTest/HttpTester.cs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,12 @@ internal class HttpTester : Tester, ITester
99
{
1010
public void Run(string indexName, int port, int numMessages, int bufferSize)
1111
{
12-
// refresh = false is default on elasticsearch's side.
13-
var bulkParms = new SimpleBulkParameters() { Refresh = false };
14-
15-
var settings = new ConnectionSettings("localhost", port)
16-
.SetDefaultIndex(indexName);
17-
12+
var settings = this.CreateSettings(indexName, port);
1813
var client = new ElasticClient(settings);
1914

2015
Connect(client, settings);
2116

22-
GenerateAndIndex(indexName, numMessages, bufferSize, bulkParms, client);
17+
GenerateAndIndex(client, indexName, numMessages, bufferSize);
2318
}
2419
}
2520
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Collections.ObjectModel;
4+
using System.Linq;
5+
using System.Text;
6+
7+
namespace ProtocolLoadTest
8+
{
9+
public static class PartitionExtension
10+
{
11+
public static IEnumerable<IEnumerable<T>> Partition<T>(this IEnumerable<T> source, int size)
12+
{
13+
T[] array = null;
14+
int count = 0;
15+
foreach (T item in source)
16+
{
17+
if (array == null)
18+
{
19+
array = new T[size];
20+
}
21+
array[count] = item;
22+
count++;
23+
if (count == size)
24+
{
25+
yield return new ReadOnlyCollection<T>(array);
26+
array = null;
27+
count = 0;
28+
}
29+
}
30+
if (array != null)
31+
{
32+
Array.Resize(ref array, count);
33+
yield return new ReadOnlyCollection<T>(array);
34+
}
35+
}
36+
}
37+
}

0 commit comments

Comments
 (0)