21
21
import org .apache .lucene .search .Sort ;
22
22
import org .apache .lucene .search .TotalHits ;
23
23
import org .apache .lucene .tests .util .LuceneTestCase ;
24
- import org .elasticsearch .ExceptionsHelper ;
25
24
import org .elasticsearch .action .ActionListener ;
26
25
import org .elasticsearch .action .ActionRequest ;
27
26
import org .elasticsearch .action .ActionResponse ;
101
100
import org .elasticsearch .common .unit .ByteSizeValue ;
102
101
import org .elasticsearch .common .util .Maps ;
103
102
import org .elasticsearch .common .util .MockBigArrays ;
104
- import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
105
103
import org .elasticsearch .common .util .concurrent .ThreadContext ;
106
104
import org .elasticsearch .common .xcontent .ChunkedToXContent ;
107
105
import org .elasticsearch .common .xcontent .XContentHelper ;
108
106
import org .elasticsearch .core .IOUtils ;
109
107
import org .elasticsearch .core .Nullable ;
110
108
import org .elasticsearch .core .Releasable ;
111
109
import org .elasticsearch .core .TimeValue ;
112
- import org .elasticsearch .core .Tuple ;
113
110
import org .elasticsearch .env .Environment ;
114
111
import org .elasticsearch .env .TestEnvironment ;
115
112
import org .elasticsearch .gateway .PersistedClusterStateService ;
186
183
import java .util .Random ;
187
184
import java .util .Set ;
188
185
import java .util .concurrent .Callable ;
189
- import java .util .concurrent .CopyOnWriteArrayList ;
190
186
import java .util .concurrent .CountDownLatch ;
191
187
import java .util .concurrent .ExecutionException ;
192
188
import java .util .concurrent .Executor ;
212
208
import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertNoFailures ;
213
209
import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertNoTimeout ;
214
210
import static org .hamcrest .Matchers .empty ;
215
- import static org .hamcrest .Matchers .emptyIterable ;
216
211
import static org .hamcrest .Matchers .equalTo ;
217
212
import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
218
213
import static org .hamcrest .Matchers .is ;
@@ -1735,7 +1730,6 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
1735
1730
}
1736
1731
}
1737
1732
Collections .shuffle (builders , random ());
1738
- final CopyOnWriteArrayList <Tuple <IndexRequestBuilder , Exception >> errors = new CopyOnWriteArrayList <>();
1739
1733
List <CountDownLatch > inFlightAsyncOperations = new ArrayList <>();
1740
1734
// If you are indexing just a few documents then frequently do it one at a time. If many then frequently in bulk.
1741
1735
final String [] indicesArray = indices .toArray (new String [] {});
@@ -1744,7 +1738,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
1744
1738
logger .info ("Index [{}] docs async: [{}] bulk: [{}]" , builders .size (), true , false );
1745
1739
for (IndexRequestBuilder indexRequestBuilder : builders ) {
1746
1740
indexRequestBuilder .execute (
1747
- new PayloadLatchedActionListener <>( indexRequestBuilder , newLatch (inFlightAsyncOperations ), errors )
1741
+ new LatchedActionListener < DocWriteResponse >( newLatch (inFlightAsyncOperations )). delegateResponse (( l , e ) -> fail ( e ) )
1748
1742
);
1749
1743
postIndexAsyncActions (indicesArray , inFlightAsyncOperations , maybeFlush );
1750
1744
}
@@ -1771,19 +1765,8 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma
1771
1765
}
1772
1766
}
1773
1767
for (CountDownLatch operation : inFlightAsyncOperations ) {
1774
- operation .await ();
1775
- }
1776
- final List <Exception > actualErrors = new ArrayList <>();
1777
- for (Tuple <IndexRequestBuilder , Exception > tuple : errors ) {
1778
- Throwable t = ExceptionsHelper .unwrapCause (tuple .v2 ());
1779
- if (t instanceof EsRejectedExecutionException ) {
1780
- logger .debug ("Error indexing doc: " + t .getMessage () + ", reindexing." );
1781
- tuple .v1 ().get (); // re-index if rejected
1782
- } else {
1783
- actualErrors .add (tuple .v2 ());
1784
- }
1768
+ safeAwait (operation );
1785
1769
}
1786
- assertThat (actualErrors , emptyIterable ());
1787
1770
if (bogusIds .isEmpty () == false ) {
1788
1771
// delete the bogus types again - it might trigger merges or at least holes in the segments and enforces deleted docs!
1789
1772
for (List <String > doc : bogusIds ) {
@@ -1957,23 +1940,6 @@ protected void addError(Exception e) {}
1957
1940
1958
1941
}
1959
1942
1960
- private class PayloadLatchedActionListener <Response , T > extends LatchedActionListener <Response > {
1961
- private final CopyOnWriteArrayList <Tuple <T , Exception >> errors ;
1962
- private final T builder ;
1963
-
1964
- PayloadLatchedActionListener (T builder , CountDownLatch latch , CopyOnWriteArrayList <Tuple <T , Exception >> errors ) {
1965
- super (latch );
1966
- this .errors = errors ;
1967
- this .builder = builder ;
1968
- }
1969
-
1970
- @ Override
1971
- protected void addError (Exception e ) {
1972
- errors .add (new Tuple <>(builder , e ));
1973
- }
1974
-
1975
- }
1976
-
1977
1943
/**
1978
1944
* Clears the given scroll Ids
1979
1945
*/
0 commit comments