Skip to content

Commit 51caf17

Browse files
Remove redudant children of BroadcastResponse (#104410)
A couple of children of `BroadCastResponse` are completely redundant, adding no extra fields or separate serialization. Removed them and replaced their use by the broadcast response itself.
1 parent af50962 commit 51caf17

File tree

96 files changed

+293
-655
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

96 files changed

+293
-655
lines changed

modules/data-streams/src/internalClusterTest/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.IndicesRequest;
1414
import org.elasticsearch.action.admin.cluster.settings.ClusterGetSettingsAction;
1515
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
16-
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
1716
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
1817
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
1918
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
@@ -30,6 +29,7 @@
3029
import org.elasticsearch.action.datastreams.lifecycle.ErrorEntry;
3130
import org.elasticsearch.action.datastreams.lifecycle.ExplainIndexDataStreamLifecycle;
3231
import org.elasticsearch.action.index.IndexRequest;
32+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
3333
import org.elasticsearch.cluster.coordination.StableMasterHealthIndicatorService;
3434
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
3535
import org.elasticsearch.cluster.metadata.DataStream;
@@ -313,7 +313,7 @@ public void testAutomaticForceMerge() throws Exception {
313313
for (int i = 0; i < randomIntBetween(10, 50); i++) {
314314
indexDocs(dataStreamName, randomIntBetween(1, 300));
315315
// Make sure the segments get written:
316-
FlushResponse flushResponse = indicesAdmin().flush(new FlushRequest(toBeRolledOverIndex)).actionGet();
316+
BroadcastResponse flushResponse = indicesAdmin().flush(new FlushRequest(toBeRolledOverIndex)).actionGet();
317317
assertThat(flushResponse.getStatus(), equalTo(RestStatus.OK));
318318
}
319319

modules/data-streams/src/main/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
2020
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeAction;
2121
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
22-
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
2322
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockAction;
2423
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
2524
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
@@ -33,6 +32,7 @@
3332
import org.elasticsearch.action.downsample.DownsampleAction;
3433
import org.elasticsearch.action.downsample.DownsampleConfig;
3534
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
35+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
3636
import org.elasticsearch.action.support.master.AcknowledgedResponse;
3737
import org.elasticsearch.client.internal.Client;
3838
import org.elasticsearch.cluster.ClusterChangedEvent;
@@ -1168,7 +1168,7 @@ private void forceMergeIndex(ForceMergeRequest forceMergeRequest, ActionListener
11681168
logger.info("Data stream lifecycle is issuing a request to force merge index [{}]", targetIndex);
11691169
client.admin().indices().forceMerge(forceMergeRequest, new ActionListener<>() {
11701170
@Override
1171-
public void onResponse(ForceMergeResponse forceMergeResponse) {
1171+
public void onResponse(BroadcastResponse forceMergeResponse) {
11721172
if (forceMergeResponse.getFailedShards() > 0) {
11731173
DefaultShardOperationFailedException[] failures = forceMergeResponse.getShardFailures();
11741174
String message = Strings.format(

modules/data-streams/src/test/java/org/elasticsearch/datastreams/lifecycle/DataStreamLifecycleServiceTests.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
import org.elasticsearch.action.ActionType;
1616
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
1717
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeRequest;
18-
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
1918
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
2019
import org.elasticsearch.action.admin.indices.rollover.MaxAgeCondition;
2120
import org.elasticsearch.action.admin.indices.rollover.RolloverConditions;
@@ -27,6 +26,7 @@
2726
import org.elasticsearch.action.downsample.DownsampleAction;
2827
import org.elasticsearch.action.downsample.DownsampleConfig;
2928
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
29+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
3030
import org.elasticsearch.client.internal.Client;
3131
import org.elasticsearch.cluster.ClusterName;
3232
import org.elasticsearch.cluster.ClusterState;
@@ -578,7 +578,7 @@ public void testForceMerge() throws Exception {
578578
// We want this test method to get fake force merge responses, because this is what triggers a cluster state update
579579
clientDelegate = (action, request, listener) -> {
580580
if (action.name().equals("indices:admin/forcemerge")) {
581-
listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
581+
listener.onResponse(new BroadcastResponse(5, 5, 0, List.of()));
582582
}
583583
};
584584
String dataStreamName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
@@ -748,7 +748,7 @@ public void testForceMergeRetries() throws Exception {
748748
clientDelegate = (action, request, listener) -> {
749749
if (action.name().equals("indices:admin/forcemerge")) {
750750
listener.onResponse(
751-
new ForceMergeResponse(
751+
new BroadcastResponse(
752752
5,
753753
5,
754754
1,
@@ -779,7 +779,7 @@ public void testForceMergeRetries() throws Exception {
779779
AtomicInteger forceMergeFailedCount = new AtomicInteger(0);
780780
clientDelegate = (action, request, listener) -> {
781781
if (action.name().equals("indices:admin/forcemerge")) {
782-
listener.onResponse(new ForceMergeResponse(5, 4, 0, List.of()));
782+
listener.onResponse(new BroadcastResponse(5, 4, 0, List.of()));
783783
forceMergeFailedCount.incrementAndGet();
784784
}
785785
};
@@ -800,7 +800,7 @@ public void testForceMergeRetries() throws Exception {
800800
// For the final data stream lifecycle run, we let forcemerge run normally
801801
clientDelegate = (action, request, listener) -> {
802802
if (action.name().equals("indices:admin/forcemerge")) {
803-
listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
803+
listener.onResponse(new BroadcastResponse(5, 5, 0, List.of()));
804804
}
805805
};
806806
dataStreamLifecycleService.run(clusterService.state());
@@ -900,7 +900,7 @@ public void testForceMergeDedup() throws Exception {
900900
setState(clusterService, state);
901901
clientDelegate = (action, request, listener) -> {
902902
if (action.name().equals("indices:admin/forcemerge")) {
903-
listener.onResponse(new ForceMergeResponse(5, 5, 0, List.of()));
903+
listener.onResponse(new BroadcastResponse(5, 5, 0, List.of()));
904904
}
905905
};
906906
for (int i = 0; i < 100; i++) {

modules/ingest-geoip/src/test/java/org/elasticsearch/ingest/geoip/GeoIpDownloaderTests.java

Lines changed: 28 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
import org.elasticsearch.action.DocWriteResponse;
1818
import org.elasticsearch.action.admin.indices.flush.FlushAction;
1919
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
20-
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
2120
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
2221
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
23-
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
2422
import org.elasticsearch.action.index.IndexRequest;
2523
import org.elasticsearch.action.index.IndexResponse;
2624
import org.elasticsearch.action.index.TransportIndexAction;
25+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2726
import org.elasticsearch.cluster.ClusterState;
2827
import org.elasticsearch.cluster.block.ClusterBlockException;
2928
import org.elasticsearch.cluster.block.ClusterBlocks;
@@ -178,28 +177,34 @@ public int read() throws IOException {
178177
}
179178

180179
public void testIndexChunksNoData() throws IOException {
181-
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<FlushResponse> flushResponseActionListener) -> {
180+
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
182181
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
183-
flushResponseActionListener.onResponse(mock(FlushResponse.class));
184-
});
185-
client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener<RefreshResponse> flushResponseActionListener) -> {
186-
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
187-
flushResponseActionListener.onResponse(mock(RefreshResponse.class));
182+
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
188183
});
184+
client.addHandler(
185+
RefreshAction.INSTANCE,
186+
(RefreshRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
187+
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
188+
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
189+
}
190+
);
189191

190192
InputStream empty = new ByteArrayInputStream(new byte[0]);
191193
assertEquals(0, geoIpDownloader.indexChunks("test", empty, 0, "d41d8cd98f00b204e9800998ecf8427e", 0));
192194
}
193195

194196
public void testIndexChunksMd5Mismatch() {
195-
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<FlushResponse> flushResponseActionListener) -> {
196-
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
197-
flushResponseActionListener.onResponse(mock(FlushResponse.class));
198-
});
199-
client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener<RefreshResponse> flushResponseActionListener) -> {
197+
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
200198
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
201-
flushResponseActionListener.onResponse(mock(RefreshResponse.class));
199+
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
202200
});
201+
client.addHandler(
202+
RefreshAction.INSTANCE,
203+
(RefreshRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
204+
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
205+
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
206+
}
207+
);
203208

204209
IOException exception = expectThrows(
205210
IOException.class,
@@ -232,14 +237,17 @@ public void testIndexChunks() throws IOException {
232237
assertEquals(chunk + 15, source.get("chunk"));
233238
listener.onResponse(mock(IndexResponse.class));
234239
});
235-
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<FlushResponse> flushResponseActionListener) -> {
236-
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
237-
flushResponseActionListener.onResponse(mock(FlushResponse.class));
238-
});
239-
client.addHandler(RefreshAction.INSTANCE, (RefreshRequest request, ActionListener<RefreshResponse> flushResponseActionListener) -> {
240+
client.addHandler(FlushAction.INSTANCE, (FlushRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
240241
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
241-
flushResponseActionListener.onResponse(mock(RefreshResponse.class));
242+
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
242243
});
244+
client.addHandler(
245+
RefreshAction.INSTANCE,
246+
(RefreshRequest request, ActionListener<BroadcastResponse> flushResponseActionListener) -> {
247+
assertArrayEquals(new String[] { GeoIpDownloader.DATABASES_INDEX }, request.indices());
248+
flushResponseActionListener.onResponse(mock(BroadcastResponse.class));
249+
}
250+
);
243251

244252
InputStream big = new ByteArrayInputStream(bigArray);
245253
assertEquals(17, geoIpDownloader.indexChunks("test", big, 15, "a67563dfa8f3cba8b8cff61eb989a749", 0));

modules/reindex/src/main/java/org/elasticsearch/reindex/AbstractAsyncBulkByScrollAction.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import org.elasticsearch.action.DocWriteRequest;
1414
import org.elasticsearch.action.DocWriteResponse;
1515
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
16-
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
1716
import org.elasticsearch.action.bulk.BackoffPolicy;
1817
import org.elasticsearch.action.bulk.BulkItemResponse;
1918
import org.elasticsearch.action.bulk.BulkItemResponse.Failure;
@@ -24,6 +23,7 @@
2423
import org.elasticsearch.action.index.IndexRequest;
2524
import org.elasticsearch.action.search.SearchRequest;
2625
import org.elasticsearch.action.support.TransportAction;
26+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2727
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
2828
import org.elasticsearch.common.unit.ByteSizeValue;
2929
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
@@ -554,9 +554,9 @@ void refreshAndFinish(List<Failure> indexingFailures, List<SearchFailure> search
554554
RefreshRequest refresh = new RefreshRequest();
555555
refresh.indices(destinationIndices.toArray(new String[destinationIndices.size()]));
556556
logger.debug("[{}]: refreshing", task.getId());
557-
bulkClient.admin().indices().refresh(refresh, new ActionListener<RefreshResponse>() {
557+
bulkClient.admin().indices().refresh(refresh, new ActionListener<>() {
558558
@Override
559-
public void onResponse(RefreshResponse response) {
559+
public void onResponse(BroadcastResponse response) {
560560
finishHim(null, indexingFailures, searchFailures, timedOut);
561561
}
562562

modules/repository-s3/src/internalClusterTest/java/org/elasticsearch/repositories/s3/S3BlobStoreRepositoryTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
import com.sun.net.httpserver.HttpHandler;
1616

1717
import org.elasticsearch.action.ActionRunnable;
18-
import org.elasticsearch.action.admin.indices.forcemerge.ForceMergeResponse;
1918
import org.elasticsearch.action.support.PlainActionFuture;
19+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2020
import org.elasticsearch.cluster.metadata.RepositoryMetadata;
2121
import org.elasticsearch.cluster.service.ClusterService;
2222
import org.elasticsearch.common.blobstore.BlobContainer;
@@ -191,7 +191,7 @@ public void testAbortRequestStats() throws Exception {
191191
waitForDocs(nbDocs, indexer);
192192
}
193193
flushAndRefresh(index);
194-
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
194+
BroadcastResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
195195
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
196196
assertHitCount(prepareSearch(index).setSize(0).setTrackTotalHits(true), nbDocs);
197197

@@ -234,7 +234,7 @@ public void testMetrics() throws Exception {
234234
waitForDocs(nbDocs, indexer);
235235
}
236236
flushAndRefresh(index);
237-
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
237+
BroadcastResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
238238
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
239239
assertHitCount(prepareSearch(index).setSize(0).setTrackTotalHits(true), nbDocs);
240240

qa/multi-cluster-search/src/test/java/org/elasticsearch/search/CCSDuelIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
import org.apache.lucene.search.join.ScoreMode;
1818
import org.apache.lucene.tests.util.TimeUnits;
1919
import org.apache.lucene.util.BytesRef;
20-
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
2120
import org.elasticsearch.action.search.SearchRequest;
21+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
2222
import org.elasticsearch.aggregations.pipeline.DerivativePipelineAggregationBuilder;
2323
import org.elasticsearch.client.Request;
2424
import org.elasticsearch.client.Response;
@@ -199,7 +199,7 @@ private void indexDocuments(String idPrefix) throws IOException, InterruptedExce
199199

200200
assertTrue(latch.await(30, TimeUnit.SECONDS));
201201

202-
RefreshResponse refreshResponse = refresh(INDEX_NAME);
202+
BroadcastResponse refreshResponse = refresh(INDEX_NAME);
203203
ElasticsearchAssertions.assertNoFailures(refreshResponse);
204204
}
205205

server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/cache/clear/ClearIndicesCacheBlocksIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.action.admin.indices.cache.clear;
1010

11+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
1112
import org.elasticsearch.test.ESIntegTestCase;
1213
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
1314

@@ -33,7 +34,7 @@ public void testClearIndicesCacheWithBlocks() {
3334
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE)) {
3435
try {
3536
enableIndexBlock("test", blockSetting);
36-
ClearIndicesCacheResponse clearIndicesCacheResponse = indicesAdmin().prepareClearCache("test")
37+
BroadcastResponse clearIndicesCacheResponse = indicesAdmin().prepareClearCache("test")
3738
.setFieldDataCache(true)
3839
.setQueryCache(true)
3940
.setFieldDataCache(true)

server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/flush/FlushBlocksIT.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.action.admin.indices.flush;
1010

11+
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
1112
import org.elasticsearch.test.ESIntegTestCase;
1213
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
1314

@@ -44,7 +45,7 @@ public void testFlushWithBlocks() {
4445
)) {
4546
try {
4647
enableIndexBlock("test", blockSetting);
47-
FlushResponse response = indicesAdmin().prepareFlush("test").get();
48+
BroadcastResponse response = indicesAdmin().prepareFlush("test").get();
4849
assertNoFailures(response);
4950
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
5051
} finally {

server/src/internalClusterTest/java/org/elasticsearch/action/admin/indices/forcemerge/ForceMergeBlocksIT.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
package org.elasticsearch.action.admin.indices.forcemerge;
1010

11+
import org.elasticsearch.action.support.broadcast.BaseBroadcastResponse;
1112
import org.elasticsearch.common.settings.Settings;
1213
import org.elasticsearch.test.ESIntegTestCase;
1314
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
@@ -50,7 +51,7 @@ public void testForceMergeWithBlocks() {
5051
for (String blockSetting : Arrays.asList(SETTING_BLOCKS_READ, SETTING_BLOCKS_WRITE, SETTING_READ_ONLY_ALLOW_DELETE)) {
5152
try {
5253
enableIndexBlock("test", blockSetting);
53-
ForceMergeResponse response = indicesAdmin().prepareForceMerge("test").get();
54+
BaseBroadcastResponse response = indicesAdmin().prepareForceMerge("test").get();
5455
assertNoFailures(response);
5556
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
5657
} finally {
@@ -70,7 +71,7 @@ public void testForceMergeWithBlocks() {
7071

7172
// Merging all indices is blocked when the cluster is read-only
7273
try {
73-
ForceMergeResponse response = indicesAdmin().prepareForceMerge().get();
74+
BaseBroadcastResponse response = indicesAdmin().prepareForceMerge().get();
7475
assertNoFailures(response);
7576
assertThat(response.getSuccessfulShards(), equalTo(numShards.totalNumShards));
7677

0 commit comments

Comments
 (0)