2121
2222import java .util .Collection ;
2323import java .util .concurrent .atomic .AtomicBoolean ;
24+ import java .util .concurrent .atomic .LongAdder ;
2425
2526import static org .elasticsearch .test .hamcrest .ElasticsearchAssertions .assertAcked ;
2627import static org .elasticsearch .xpack .esql .EsqlTestUtils .as ;
2728import static org .elasticsearch .xpack .esql .EsqlTestUtils .getValuesList ;
29+ import static org .hamcrest .Matchers .greaterThan ;
2830import static org .hamcrest .Matchers .hasSize ;
2931
3032public class DataNodeRequestSenderIT extends AbstractEsqlIntegTestCase {
@@ -34,6 +36,66 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
3436 return CollectionUtils .appendToCopy (super .nodePlugins (), MockTransportService .TestPlugin .class );
3537 }
3638
39+ public void testSearchWhileRelocating () throws InterruptedException {
40+ internalCluster ().ensureAtLeastNumDataNodes (3 );
41+ var primaries = randomIntBetween (1 , 10 );
42+ var replicas = randomIntBetween (0 , 1 );
43+
44+ indicesAdmin ().prepareCreate ("index-1" ).setSettings (indexSettings (primaries , replicas )).get ();
45+
46+ var docs = randomIntBetween (10 , 100 );
47+ var bulk = client ().prepareBulk ("index-1" ).setRefreshPolicy (WriteRequest .RefreshPolicy .IMMEDIATE );
48+ for (int i = 0 ; i < docs ; i ++) {
49+ bulk .add (new IndexRequest ().source ("key" , "value-1" ));
50+ }
51+ bulk .get ();
52+
53+ // start background searches
54+ var stopped = new AtomicBoolean (false );
55+ var queries = new LongAdder ();
56+ var threads = new Thread [randomIntBetween (1 , 5 )];
57+ for (int i = 0 ; i < threads .length ; i ++) {
58+ threads [i ] = new Thread (() -> {
59+ while (stopped .get () == false ) {
60+ try (EsqlQueryResponse resp = run ("FROM index-1" )) {
61+ assertThat (getValuesList (resp ), hasSize (docs ));
62+ }
63+ queries .increment ();
64+ }
65+ });
66+ }
67+ for (Thread thread : threads ) {
68+ thread .start ();
69+ }
70+
71+ // start shard movements
72+ var rounds = randomIntBetween (1 , 10 );
73+ var names = internalCluster ().getNodeNames ();
74+ for (int i = 0 ; i < rounds ; i ++) {
75+ for (String name : names ) {
76+ client ().admin ()
77+ .cluster ()
78+ .prepareUpdateSettings (TEST_REQUEST_TIMEOUT , TEST_REQUEST_TIMEOUT )
79+ .setPersistentSettings (Settings .builder ().put ("cluster.routing.allocation.exclude._name" , name ))
80+ .get ();
81+ ensureGreen ("index-1" );
82+ Thread .yield ();
83+ }
84+ }
85+
86+ stopped .set (true );
87+ for (Thread thread : threads ) {
88+ thread .join (10_000 );
89+ }
90+
91+ client ().admin ()
92+ .cluster ()
93+ .prepareUpdateSettings (TEST_REQUEST_TIMEOUT , TEST_REQUEST_TIMEOUT )
94+ .setPersistentSettings (Settings .builder ().putNull ("cluster.routing.allocation.exclude._name" ))
95+ .get ();
96+ assertThat (queries .sum (), greaterThan ((long ) threads .length ));
97+ }
98+
3799 public void testRetryOnShardMovement () {
38100 internalCluster ().ensureAtLeastNumDataNodes (2 );
39101
0 commit comments