Skip to content

Commit b3c6212

Browse files
bursauxaepugh
andauthored
SOLR-16138: Throw when streaming and all cores are down (#784)
Co-authored-by: Eric Pugh <[email protected]>
1 parent bcb9f14 commit b3c6212

File tree

3 files changed

+154
-5
lines changed

3 files changed

+154
-5
lines changed

solr/CHANGES.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ Improvements
102102

103103
* SOLR-17058: Add 'distrib.statsCache' parameter to disable distributed stats requests at query time. (Wei Wang, Mikhail Khludnev)
104104

105+
* SOLR-16138: Throw a exception when issuing a streaming expression and all cores are down instead of returning 0 documents. (Antoine Bursaux via Eric Pugh)
106+
105107
Optimizations
106108
---------------------
107109
* SOLR-17144: Close searcherExecutor thread per core after 1 minute (Pierre Salagnac, Christine Poerschke)

solr/solrj-streaming/src/java/org/apache/solr/client/solrj/io/stream/CloudSolrStream.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionValue;
5151
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
5252
import org.apache.solr.common.cloud.ClusterState;
53+
import org.apache.solr.common.cloud.Replica;
5354
import org.apache.solr.common.cloud.Slice;
5455
import org.apache.solr.common.params.ModifiableSolrParams;
5556
import org.apache.solr.common.params.SolrParams;
@@ -381,15 +382,20 @@ protected void constructStreams() throws IOException {
381382
final Stream<SolrStream> streamOfSolrStream;
382383
if (streamContext != null && streamContext.get("shards") != null) {
383384
// stream of shard url with core
384-
streamOfSolrStream =
385-
getShards(this.zkHost, this.collection, this.streamContext, mParams).stream()
386-
.map(s -> new SolrStream(s, mParams));
385+
final List<String> shards =
386+
getShards(this.zkHost, this.collection, this.streamContext, mParams);
387+
if (shards.isEmpty())
388+
throw new IOException("No shards available from ZooKeeper: " + this.zkHost);
389+
streamOfSolrStream = shards.stream().map(s -> new SolrStream(s, mParams));
387390
} else {
388391
// stream of replicas to reuse the same SolrHttpClient per baseUrl
389392
// avoids re-parsing data we already have in the replicas
393+
final List<Replica> replicas =
394+
getReplicas(this.zkHost, this.collection, this.streamContext, mParams);
395+
if (replicas.isEmpty())
396+
throw new IOException("No replicas available from ZooKeeper: " + this.zkHost);
390397
streamOfSolrStream =
391-
getReplicas(this.zkHost, this.collection, this.streamContext, mParams).stream()
392-
.map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
398+
replicas.stream().map(r -> new SolrStream(r.getBaseUrl(), mParams, r.getCoreName()));
393399
}
394400

395401
streamOfSolrStream.forEach(
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.solr.client.solrj.io.stream;
18+
19+
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.TimeoutException;
24+
import org.apache.solr.SolrTestCaseJ4;
25+
import org.apache.solr.client.solrj.io.Tuple;
26+
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
27+
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionNamedParameter;
28+
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
29+
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
30+
import org.apache.solr.cloud.SolrCloudTestCase;
31+
import org.apache.solr.common.SolrException;
32+
import org.apache.solr.common.cloud.Replica;
33+
import org.apache.solr.common.params.CommonParams;
34+
import org.apache.solr.common.params.MultiMapSolrParams;
35+
import org.junit.BeforeClass;
36+
import org.junit.Test;
37+
38+
/** Tests behaviors of CloudSolrStream when the cluster is behaving badly. */
39+
@SolrTestCaseJ4.SuppressSSL
40+
public class BadClusterTest extends SolrCloudTestCase {
41+
42+
private static final String collection = "streams";
43+
private static final String id = "id";
44+
45+
private static final StreamFactory streamFactory =
46+
new StreamFactory().withFunctionName("search", CloudSolrStream.class);
47+
48+
private static String zkHost;
49+
50+
@BeforeClass
51+
public static void configureCluster() throws Exception {
52+
configureCluster(1)
53+
.addConfig(
54+
"conf",
55+
getFile("solrj")
56+
.toPath()
57+
.resolve("solr")
58+
.resolve("configsets")
59+
.resolve("streaming")
60+
.resolve("conf"))
61+
.configure();
62+
63+
CollectionAdminRequest.createCollection(collection, "conf", 1, 1)
64+
.process(cluster.getSolrClient());
65+
cluster.waitForActiveCollection(collection, 1, 1);
66+
67+
zkHost = cluster.getZkServer().getZkAddress();
68+
streamFactory.withCollectionZkHost(collection, zkHost);
69+
}
70+
71+
// test order is important because the cluster progressively gets worse, but it is only created
72+
// once in BeforeClass as in other tests
73+
// ordering can not be strictly enforced with JUnit annotations because of parallel executions, so
74+
// we have this aggregated test instead
75+
@Test
76+
public void testBadCluster() throws Exception {
77+
testEmptyCollection();
78+
testAllNodesDown();
79+
testClusterShutdown();
80+
}
81+
82+
private void testEmptyCollection() throws Exception {
83+
CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
84+
assertEquals(0, getTuples(stream).size());
85+
}
86+
87+
private void testAllNodesDown() throws Exception {
88+
89+
CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
90+
cluster.expireZkSession(cluster.getReplicaJetty(getReplicas().get(0)));
91+
92+
try {
93+
getTuples(stream);
94+
fail("Expected IOException");
95+
} catch (IOException ioe) {
96+
}
97+
}
98+
99+
private void testClusterShutdown() throws Exception {
100+
101+
CloudSolrStream stream = new CloudSolrStream(buildSearchExpression(), streamFactory);
102+
cluster.shutdown();
103+
104+
try {
105+
getTuples(stream);
106+
fail("Expected IOException: SolrException: TimeoutException");
107+
} catch (IOException ioe) {
108+
SolrException se = (SolrException) ioe.getCause();
109+
TimeoutException te = (TimeoutException) se.getCause();
110+
assertNotNull(te);
111+
}
112+
}
113+
114+
private StreamExpression buildSearchExpression() {
115+
StreamExpression expression = new StreamExpression("search");
116+
expression.addParameter(collection);
117+
expression.addParameter(new StreamExpressionNamedParameter(CommonParams.Q, "*:*"));
118+
expression.addParameter(new StreamExpressionNamedParameter(CommonParams.FL, id));
119+
expression.addParameter(new StreamExpressionNamedParameter(CommonParams.SORT, id + " asc"));
120+
return expression;
121+
}
122+
123+
private List<Replica> getReplicas() throws IOException {
124+
return TupleStream.getReplicas(zkHost, collection, null, new MultiMapSolrParams(Map.of()));
125+
}
126+
127+
private List<Tuple> getTuples(TupleStream tupleStream) throws IOException {
128+
tupleStream.open();
129+
List<Tuple> tuples = new ArrayList<>();
130+
for (; ; ) {
131+
Tuple t = tupleStream.read();
132+
if (t.EOF) {
133+
break;
134+
} else {
135+
tuples.add(t);
136+
}
137+
}
138+
tupleStream.close();
139+
return tuples;
140+
}
141+
}

0 commit comments

Comments
 (0)