Skip to content

Commit 5a30516

Browse files
committed
Merge branch 'feature/reindex-rethrottle'
2 parents eedd56d + 2e29305 commit 5a30516

File tree

8 files changed

+347
-1
lines changed

8 files changed

+347
-1
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Elasticsearch.Net;
4+
5+
namespace Nest
6+
{
7+
public partial interface IElasticClient
8+
{
9+
/// <summary>
10+
/// Rethrottle an existing reindex or update by query task
11+
/// </summary>
12+
IReindexRethrottleResponse Rethrottle(Func<ReindexRethrottleDescriptor, IReindexRethrottleRequest> selector);
13+
14+
/// <summary>
15+
/// Rethrottle an existing reindex or update by query task
16+
/// </summary>
17+
IReindexRethrottleResponse Rethrottle(IReindexRethrottleRequest request);
18+
19+
/// <summary>
20+
/// Rethrottle an existing reindex or update by query task
21+
/// </summary>
22+
Task<IReindexRethrottleResponse> RethrottleAsync(Func<ReindexRethrottleDescriptor, IReindexRethrottleRequest> selector);
23+
24+
/// <summary>
25+
/// Rethrottle an existing reindex or update by query task
26+
/// </summary>
27+
Task<IReindexRethrottleResponse> RethrottleAsync(IReindexRethrottleRequest request);
28+
}
29+
30+
public partial class ElasticClient
31+
{
32+
/// <summary>
33+
/// Rethrottle an existing reindex or update by query task
34+
/// </summary>
35+
public IReindexRethrottleResponse Rethrottle(Func<ReindexRethrottleDescriptor, IReindexRethrottleRequest> selector) =>
36+
this.Rethrottle(selector.InvokeOrDefault(new ReindexRethrottleDescriptor()));
37+
38+
/// <summary>
39+
/// Rethrottle an existing reindex or update by query task
40+
/// </summary>
41+
public IReindexRethrottleResponse Rethrottle(IReindexRethrottleRequest request) =>
42+
this.Dispatcher.Dispatch<IReindexRethrottleRequest, ReindexRethrottleRequestParameters, ReindexRethrottleResponse>(
43+
request,
44+
(p, d) => this.LowLevelDispatch.ReindexRethrottleDispatch<ReindexRethrottleResponse>(p)
45+
);
46+
47+
/// <summary>
48+
/// Rethrottle an existing reindex or update by query task
49+
/// </summary>
50+
public Task<IReindexRethrottleResponse> RethrottleAsync(Func<ReindexRethrottleDescriptor, IReindexRethrottleRequest> selector) =>
51+
this.RethrottleAsync(selector.InvokeOrDefault(new ReindexRethrottleDescriptor()));
52+
53+
/// <summary>
54+
/// Rethrottle an existing reindex or update by query task
55+
/// </summary>
56+
public Task<IReindexRethrottleResponse> RethrottleAsync(IReindexRethrottleRequest request) =>
57+
this.Dispatcher.DispatchAsync<IReindexRethrottleRequest, ReindexRethrottleRequestParameters, ReindexRethrottleResponse, IReindexRethrottleResponse>(
58+
request,
59+
(p, d) => this.LowLevelDispatch.ReindexRethrottleDispatchAsync<ReindexRethrottleResponse>(p)
60+
);
61+
}
62+
}
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
using System.Collections.Generic;
2+
using Newtonsoft.Json;
3+
4+
namespace Nest
5+
{
6+
public class ReindexNode
7+
{
8+
[JsonProperty("name")]
9+
public string Name { get; internal set; }
10+
11+
[JsonProperty("transport_address")]
12+
public string TransportAddress { get; internal set; }
13+
14+
[JsonProperty("host")]
15+
public string Host { get; internal set; }
16+
17+
[JsonProperty("ip")]
18+
public string Ip { get; internal set; }
19+
20+
[JsonProperty("roles")]
21+
public IEnumerable<string> Roles { get; internal set; }
22+
23+
[JsonProperty("attributes")]
24+
[JsonConverter(typeof(VerbatimDictionaryKeysJsonConverter))]
25+
public Dictionary<string, string> Attributes { get; internal set; }
26+
27+
[JsonProperty("tasks")]
28+
[JsonConverter(typeof(VerbatimDictionaryKeysJsonConverter))]
29+
public Dictionary<TaskId, ReindexTask> Tasks { get; internal set; }
30+
}
31+
32+
33+
public class ReindexTask
34+
{
35+
[JsonProperty("node")]
36+
public string Node { get; internal set; }
37+
38+
[JsonProperty("id")]
39+
public long Id { get; internal set; }
40+
41+
[JsonProperty("type")]
42+
public string Type { get; internal set; }
43+
44+
[JsonProperty("action")]
45+
public string Action { get; internal set; }
46+
47+
[JsonProperty("status")]
48+
public ReindexStatus Status { get; internal set; }
49+
50+
[JsonProperty("description")]
51+
public string Description { get; internal set; }
52+
53+
[JsonProperty("start_time_in_millis")]
54+
public long StartTimeInMilliseconds { get; internal set; }
55+
56+
[JsonProperty("running_time_in_nanos")]
57+
public long RunningTimeInNanoseconds { get; internal set; }
58+
}
59+
60+
public class ReindexStatus
61+
{
62+
[JsonProperty("total")]
63+
public long Total { get; internal set; }
64+
65+
[JsonProperty("updated")]
66+
public long Updated { get; internal set; }
67+
68+
[JsonProperty("created")]
69+
public long Created { get; internal set; }
70+
71+
[JsonProperty("deleted")]
72+
public long Deleted { get; internal set; }
73+
74+
[JsonProperty("batches")]
75+
public long Batches { get; internal set; }
76+
77+
[JsonProperty("version_conflicts")]
78+
public long VersionConflicts { get; internal set; }
79+
80+
[JsonProperty("noops")]
81+
public long Noops { get; internal set; }
82+
83+
[JsonProperty("retries")]
84+
public long Retries { get; internal set; }
85+
86+
[JsonProperty("throttled_millis")]
87+
public long ThrottledInMilliseconds { get; internal set; }
88+
89+
[JsonProperty("requests_per_second")]
90+
public Union<string,long> RequestsPerSecond { get; internal set; }
91+
92+
[JsonProperty("throttled_until_millis")]
93+
public long ThrottledUntilInMilliseconds { get; internal set; }
94+
}
95+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
using System.Collections.Generic;
2+
using Newtonsoft.Json;
3+
4+
namespace Nest
5+
{
6+
[JsonObject(MemberSerialization.OptIn)]
7+
public interface IReindexRethrottleResponse : IResponse
8+
{
9+
[JsonProperty("nodes")]
10+
[JsonConverter(typeof(VerbatimDictionaryKeysJsonConverter))]
11+
Dictionary<string, ReindexNode> Nodes { get; set; }
12+
}
13+
14+
public class ReindexRethrottleResponse : ResponseBase, IReindexRethrottleResponse
15+
{
16+
public Dictionary<string, ReindexNode> Nodes { get; set; }
17+
}
18+
}

src/Nest/Nest.csproj

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,9 @@
537537
<Compile Include="Document\Multiple\ReindexOnServer\ReindexRouting.cs" />
538538
<Compile Include="Document\Multiple\ReindexOnServer\ReindexRoutingJsonConverter.cs" />
539539
<Compile Include="Document\Multiple\ReindexOnServer\ReindexSource.cs" />
540+
<Compile Include="Document\Multiple\ReindexRethrottle\ElasticClient-ReindexRethrottle.cs" />
541+
<Compile Include="Document\Multiple\ReindexRethrottle\ReindexNode.cs" />
542+
<Compile Include="Document\Multiple\ReindexRethrottle\ReindexRethrottleResponse.cs" />
540543
<Compile Include="Document\Multiple\UpdateByQuery\UpdateByQueryRequest.cs" />
541544
<Compile Include="Document\Multiple\UpdateByQuery\UpdateByQueryResponse.cs" />
542545
<Compile Include="Document\Multiple\UpdateByQuery\ElasticClient-UpdateByQuery.cs" />
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using Elasticsearch.Net;
5+
using FluentAssertions;
6+
using Nest;
7+
using Tests.Document.Multiple.Reindex;
8+
using Tests.Document.Multiple.ReindexOnServer;
9+
using Tests.Framework;
10+
using Tests.Framework.Integration;
11+
using Tests.Framework.MockData;
12+
using Xunit;
13+
using static Nest.Infer;
14+
15+
namespace Tests.Document.Multiple.ReindexRethrottle
16+
{
17+
[Collection(TypeOfCluster.Reindex)]
18+
public class ReindexRethrottleReindexApiTests : ReindexRethrottleApiTests
19+
{
20+
public ReindexRethrottleReindexApiTests(ReindexCluster cluster, EndpointUsage usage) : base(cluster, usage)
21+
{
22+
}
23+
24+
protected override void OnBeforeCall(IElasticClient client)
25+
{
26+
var reindex = client.ReindexOnServer(r => r
27+
.Source(s => s
28+
.Index(Index<Project>())
29+
)
30+
.Destination(s => s
31+
.Index(CallIsolatedValue)
32+
.OpType(OpType.Create)
33+
)
34+
.Refresh()
35+
.RequestsPerSecond(1)
36+
.WaitForCompletion(false)
37+
);
38+
39+
reindex.IsValid.Should().BeTrue();
40+
this.ExtendedValue(TaskIdKey, reindex.Task);
41+
}
42+
}
43+
44+
[Collection(TypeOfCluster.Reindex)]
45+
public class ReindexRethrottleUpdateByQueryTests : ReindexRethrottleApiTests
46+
{
47+
public ReindexRethrottleUpdateByQueryTests(ReindexCluster cluster, EndpointUsage usage) : base(cluster, usage)
48+
{
49+
}
50+
51+
protected override void OnBeforeCall(IElasticClient client)
52+
{
53+
var reindex = client.UpdateByQuery<Project>(Nest.Indices.Index<Project>(), Types.Type<Project>(), u => u
54+
.Conflicts(Conflicts.Proceed)
55+
.Query(q => q.MatchAll())
56+
.Script(s => s.Inline("ctx._source.numberOfCommits+10"))
57+
.Refresh()
58+
.RequestsPerSecond(1)
59+
.WaitForCompletion(false)
60+
);
61+
62+
reindex.IsValid.Should().BeTrue();
63+
this.ExtendedValue(TaskIdKey, reindex.Task);
64+
}
65+
}
66+
67+
public abstract class ReindexRethrottleApiTests
68+
: ApiIntegrationTestBase<IReindexRethrottleResponse, IReindexRethrottleRequest, ReindexRethrottleDescriptor, ReindexRethrottleRequest>
69+
{
70+
protected TaskId TaskId => this.RanIntegrationSetup ? this.ExtendedValue<TaskId>(TaskIdKey) : "foo:1";
71+
72+
protected const string TaskIdKey = "taskId";
73+
74+
protected ReindexRethrottleApiTests(ReindexCluster cluster, EndpointUsage usage) : base(cluster, usage) { }
75+
76+
protected override void IntegrationSetup(IElasticClient client, CallUniqueValues values)
77+
{
78+
client.IndexMany(Project.Projects);
79+
client.Refresh(Index<Project>());
80+
}
81+
82+
protected override LazyResponses ClientUsage() => Calls(
83+
fluent: (client, f) => client.Rethrottle(f),
84+
fluentAsync: (client, f) => client.RethrottleAsync(f),
85+
request: (client, r) => client.Rethrottle(r),
86+
requestAsync: (client, r) => client.RethrottleAsync(r)
87+
);
88+
89+
protected override bool ExpectIsValid => true;
90+
protected override int ExpectStatusCode => 200;
91+
protected override HttpMethod HttpMethod => HttpMethod.POST;
92+
93+
protected override string UrlPath => $"/_reindex/{TaskId.NodeId}%3A{TaskId.TaskNumber}/_rethrottle?requests_per_second=0";
94+
95+
protected override bool SupportsDeserialization => false;
96+
97+
protected override Func<ReindexRethrottleDescriptor, IReindexRethrottleRequest> Fluent => d => d
98+
.TaskId(TaskId)
99+
.RequestsPerSecond(0);
100+
101+
protected override ReindexRethrottleRequest Initializer => new ReindexRethrottleRequest(TaskId)
102+
{
103+
RequestsPerSecond = 0,
104+
};
105+
106+
protected override void ExpectResponse(IReindexRethrottleResponse response)
107+
{
108+
response.IsValid.Should().BeTrue();
109+
110+
response.Nodes.Should().NotBeEmpty().And.HaveCount(1);
111+
var node = response.Nodes.First().Value;
112+
113+
node.Name.Should().NotBeNullOrEmpty();
114+
node.TransportAddress.Should().NotBeNullOrEmpty();
115+
node.Host.Should().NotBeNullOrEmpty();
116+
node.Ip.Should().NotBeNullOrEmpty();
117+
node.Roles.Should().NotBeEmpty();
118+
node.Attributes.Should().NotBeEmpty();
119+
120+
node.Tasks.Should().NotBeEmpty().And.HaveCount(1);
121+
122+
node.Tasks.First().Key.Should().Be(TaskId);
123+
124+
var task = node.Tasks.First().Value;
125+
126+
task.Node.Should().NotBeNullOrEmpty().And.Be(TaskId.NodeId);
127+
task.Id.Should().Be(TaskId.TaskNumber);
128+
task.Type.Should().NotBeNullOrEmpty();
129+
task.Action.Should().NotBeNullOrEmpty();
130+
131+
task.Status.RequestsPerSecond.Match(
132+
s => s.Should().Be("unlimited"),
133+
l => l.Should().Be(0)
134+
);
135+
136+
task.StartTimeInMilliseconds.Should().BeGreaterThan(0);
137+
task.RunningTimeInNanoseconds.Should().BeGreaterThan(0);
138+
}
139+
140+
protected override object ExpectJson => null;
141+
}
142+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
using System.Threading.Tasks;
2+
using Nest;
3+
using Tests.Framework;
4+
using Tests.Framework.MockData;
5+
using static Tests.Framework.UrlTester;
6+
7+
namespace Tests.Document.Multiple.ReindexRethrottle
8+
{
9+
public class ReindexRethrottleUrlTests : IUrlTests
10+
{
11+
private readonly TaskId _taskId = "rhtoNesNR4aXVIY2bRR4GQ:13056";
12+
13+
[U] public async Task Urls()
14+
{
15+
await POST($"/_reindex/rhtoNesNR4aXVIY2bRR4GQ%3A13056/_rethrottle")
16+
.Fluent(c => c.Rethrottle(f=>f.TaskId(_taskId)))
17+
.Request(c => c.Rethrottle(new ReindexRethrottleRequest(_taskId)))
18+
.FluentAsync(c => c.RethrottleAsync(f=>f.TaskId(_taskId)))
19+
.RequestAsync(c => c.RethrottleAsync(new ReindexRethrottleRequest(_taskId)))
20+
;
21+
22+
}
23+
}
24+
}

src/Tests/Framework/EndpointTests/ApiTestBase.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public abstract class ApiTestBase<TResponse, TInterface, TDescriptor, TInitializ
2828

2929
protected string CallIsolatedValue => _uniqueValues.Value;
3030
protected T ExtendedValue<T>(string key) where T : class => this._uniqueValues.ExtendedValue<T>(key);
31-
31+
protected void ExtendedValue<T>(string key, T value) where T : class => this._uniqueValues.ExtendedValue(key, value);
3232
protected virtual void IntegrationSetup(IElasticClient client, CallUniqueValues values) { }
3333
protected virtual void OnBeforeCall(IElasticClient client) { }
3434
protected virtual void OnAfterCall(IElasticClient client) { }

src/Tests/Tests.csproj

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,8 @@
210210
<Compile Include="Cat\CatShards\CatShardsUrlTests.cs" />
211211
<Compile Include="Cat\CatThreadPool\CatThreadpoolApiTests.cs" />
212212
<Compile Include="Cat\CatThreadPool\CatThreadPoolUrlTests.cs" />
213+
<Compile Include="Document\Multiple\ReindexRethrottle\ReindexRethrottleApiTests.cs" />
214+
<Compile Include="Document\Multiple\ReindexRethrottle\ReindexRethrottleUrlTests.cs" />
213215
<Compile Include="Framework\EndpointTests\ApiIntegrationTestBase.cs" />
214216
<Compile Include="Framework\EndpointTests\ApiTestBase.cs" />
215217
<Compile Include="Framework\EndpointTests\TestState\AsyncLazy.cs" />

0 commit comments

Comments
 (0)