Skip to content

Commit 50fb639

Browse files
committed
Http Sync Connection calls no longer rely on Tasks so they use less resources
1 parent ff66186 commit 50fb639

File tree

7 files changed

+99
-14
lines changed

7 files changed

+99
-14
lines changed

src/Nest.Tests.Integration/ElasticsearchConfiguration.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ public static IConnectionSettings Settings(int? port = null)
1212
{
1313
var host = Test.Default.Host;
1414
if (port == null && Process.GetProcessesByName("fiddler").HasAny())
15-
host = "local.localghost.io";
15+
host = "ipv4.fiddler";
1616

1717
var uri = new UriBuilder("http", host, port.GetValueOrDefault(9200)).Uri;
1818

src/Nest.Tests.Integration/Nest.Tests.Integration.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@
143143
<DesignTimeSharedInput>True</DesignTimeSharedInput>
144144
<DependentUpon>Test.settings</DependentUpon>
145145
</Compile>
146+
<Compile Include="Warmers\IndicesWarmersTests.cs" />
147+
<Compile Include="Warmers\WarmersTests.cs" />
146148
</ItemGroup>
147149
<ItemGroup>
148150
<ProjectReference Include="..\Nest.Connection.Thrift\Nest.Connection.Thrift.csproj">

src/Nest/Domain/Connection/Connection.cs

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -163,14 +163,39 @@ private HttpWebRequest CreateWebRequest(string path, string method)
163163
protected virtual ConnectionStatus DoSynchronousRequest(HttpWebRequest request, string data = null)
164164
{
165165
var timeout = this._ConnectionSettings.Timeout;
166-
var task = this.DoAsyncRequest(request, data);
167-
task.Wait(timeout);
168-
if (task.Result == null && task.IsCanceled)
166+
if (data != null)
167+
{
168+
using (var r = request.GetRequestStream())
169+
{
170+
byte[] buffer = Encoding.UTF8.GetBytes(data);
171+
r.Write(buffer, 0, buffer.Length);
172+
}
173+
}
174+
try
175+
{
176+
using (var response = (HttpWebResponse)request.GetResponse())
177+
using (var responseStream = response.GetResponseStream())
178+
using (var streamReader = new StreamReader(responseStream))
179+
{
180+
string result = streamReader.ReadToEnd();
181+
var cs = new ConnectionStatus(result)
182+
{
183+
Request = data,
184+
RequestUrl = request.RequestUri.ToString(),
185+
RequestMethod = request.Method
186+
};
187+
return cs;
188+
}
189+
}
190+
catch (WebException webException)
191+
{
192+
return new ConnectionStatus(webException) { Request = data, RequestUrl = request.RequestUri.ToString(), RequestMethod = request.Method };
193+
}
194+
catch
169195
{
170-
var m = "Operation did not complete before the set timeout of " + timeout + "ms";
171-
return new ConnectionStatus(new TimeoutException(m));
196+
throw;
172197
}
173-
return task.Result;
198+
174199
}
175200

176201
protected virtual Task<ConnectionStatus> DoAsyncRequest(HttpWebRequest request, string data = null)
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading;
4+
using Nest;
5+
using System.Threading.Tasks;
6+
7+
namespace ProtocolLoadTest
8+
{
9+
internal class HttpManualAsyncTester : Tester, ITester
10+
{
11+
public void Run(string indexName, int port, int numMessages, int bufferSize)
12+
{
13+
var settings = this.CreateSettings(indexName, port);
14+
var client = new ElasticClient(settings);
15+
16+
Connect(client, settings);
17+
18+
HttpManualAsyncTester.GenerateAndIndex(client, indexName, numMessages, bufferSize);
19+
}
20+
protected static void GenerateAndIndex(ElasticClient client, string indexName, int numMessages, int bufferSize)
21+
{
22+
// refresh = false is default on elasticsearch's side.
23+
var bulkParms = new SimpleBulkParameters() { Refresh = false };
24+
25+
var msgGenerator = new MessageGenerator();
26+
var tasks = new List<Task>();
27+
var partitionedMessages = msgGenerator.Generate(numMessages).Partition(bufferSize);
28+
Interlocked.Exchange(ref NumSent, 0);
29+
foreach (var messages in partitionedMessages)
30+
{
31+
var t = Task.Factory.StartNew(()=> client.IndexMany(messages, indexName, bulkParms), TaskCreationOptions.LongRunning);
32+
tasks.Add(t);
33+
34+
Interlocked.Add(ref NumSent, bufferSize);
35+
if (NumSent % 10000 == 0)
36+
{
37+
Console.WriteLine("Sent {0:0,0} messages to {1}", NumSent, indexName);
38+
}
39+
}
40+
Task.WaitAll(tasks.ToArray());
41+
}
42+
}
43+
}

src/ProtocolLoadTest/Program.cs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Diagnostics;
33
using System.Threading;
44
using System.Threading.Tasks;
5+
using System.Linq;
56
using Nest;
67

78
namespace ProtocolLoadTest
@@ -21,10 +22,12 @@ class Program
2122
static void Main(string[] args)
2223
{
2324
double httpRate = RunTest<HttpTester>(HTTP_PORT);
25+
double manualAsyncHttpRate = RunTest<HttpManualAsyncTester>(HTTP_PORT);
2426
//double thriftRate = RunTest<ThriftTester>(THRIFT_PORT);
2527

2628
Console.WriteLine();
27-
Console.WriteLine("HTTP: {0:0,0}/s", httpRate);
29+
Console.WriteLine("HTTP (IndexManyAsync): {0:0,0}/s", httpRate);
30+
Console.WriteLine("HTTP (IndexMany + TaskFactory.StartNew): {0:0,0}/s", manualAsyncHttpRate);
2831
//Console.WriteLine("Thrift: {0:0,0}/s", thriftRate);
2932

3033
Console.ReadLine();
@@ -58,9 +61,12 @@ private static double RunTest<T>(int port) where T : ITester
5861

5962
private static void RecreateIndex(string suffix)
6063
{
64+
var host = "localhost";
65+
if (Process.GetProcessesByName("fiddler").Any())
66+
host = "ipv4.fiddler";
6167
string indexName = INDEX_PREFIX + suffix;
6268

63-
var connSettings = new ConnectionSettings(new Uri("http://ipv4.fiddler:9200"))
69+
var connSettings = new ConnectionSettings(new Uri("http://"+host+":9200"))
6470
.SetDefaultIndex(indexName);
6571

6672
var client = new ElasticClient(connSettings);
@@ -80,7 +86,7 @@ private static void RecreateIndex(string suffix)
8086
var indexSettings = new IndexSettings();
8187
indexSettings.NumberOfReplicas = 1;
8288
indexSettings.NumberOfShards = 5;
83-
indexSettings.Add("index.refresh_interval", "10s");
89+
indexSettings.Add("index.refresh_interval", "-1");
8490

8591
var createResponse = client.CreateIndex(indexName, indexSettings);
8692
client.MapFromAttributes<Message>();
@@ -90,7 +96,11 @@ private static void CloseIndex(string suffix)
9096
{
9197
string indexName = INDEX_PREFIX + suffix;
9298

93-
var connSettings = new ConnectionSettings(new Uri("http://ipv4.fiddler:9200"))
99+
var host = "localhost";
100+
if (Process.GetProcessesByName("fiddler").Any())
101+
host = "ipv4.fiddler";
102+
103+
var connSettings = new ConnectionSettings(new Uri("http://" + host + ":9200"))
94104
.SetDefaultIndex(indexName);
95105

96106
var client = new ElasticClient(connSettings);

src/ProtocolLoadTest/ProtocolLoadTest.csproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
<Reference Include="System.Xml" />
6565
</ItemGroup>
6666
<ItemGroup>
67+
<Compile Include="HttpManualAsyncTester.cs" />
6768
<Compile Include="HttpTester.cs" />
6869
<Compile Include="ITester.cs" />
6970
<Compile Include="MessageGenerator.cs" />

src/ProtocolLoadTest/Tester.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44
using System.Threading;
55
using System.Threading.Tasks;
66
using System.Collections.ObjectModel;
7+
using System.Diagnostics;
8+
using System.Linq;
79

810
namespace ProtocolLoadTest
911
{
1012
internal abstract class Tester
1113
{
1214
// Number of messages sent by all ThriftTester instances
13-
private static int NumSent;
15+
protected static int NumSent;
1416

1517
protected ConnectionSettings CreateSettings(string indexName, int port)
1618
{
17-
var uri = new UriBuilder("http", "ipv4.fiddler", port).Uri;
19+
var host = "localhost";
20+
if (Process.GetProcessesByName("fiddler").Any())
21+
host = "ipv4.fiddler";
22+
var uri = new UriBuilder("http", host, port).Uri;
1823
return new ConnectionSettings(uri)
1924
.SetDefaultIndex(indexName)
2025
.SetMaximumAsyncConnections(2);
@@ -40,7 +45,6 @@ protected static void GenerateAndIndex(ElasticClient client, string indexName, i
4045

4146
var msgGenerator = new MessageGenerator();
4247
var tasks = new List<Task>();
43-
4448
var partitionedMessages = msgGenerator.Generate(numMessages).Partition(bufferSize);
4549
Interlocked.Exchange(ref NumSent, 0);
4650
foreach (var messages in partitionedMessages)

0 commit comments

Comments
 (0)