Skip to content

Commit cc151ae

Browse files
committed
UBI goes distrib
Pardon, I barely understand what's going on there.
1 parent c22f9cb commit cc151ae

File tree

2 files changed

+164
-12
lines changed

2 files changed

+164
-12
lines changed

solr/core/src/java/org/apache/solr/handler/component/UBIComponent.java

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -237,24 +237,26 @@ public void process(ResponseBuilder rb) throws IOException {
237237

238238
@Override
239239
public int distributedProcess(ResponseBuilder rb) throws IOException {
240-
241240
SolrParams params = rb.req.getParams();
242241
if (!params.getBool(COMPONENT_NAME, false)) {
243242
return ResponseBuilder.STAGE_DONE;
244243
}
245244

246-
if (rb.stage != ResponseBuilder.STAGE_GET_FIELDS) {
247-
return ResponseBuilder.STAGE_DONE;
245+
if (rb.stage < ResponseBuilder.STAGE_GET_FIELDS) {
246+
return ResponseBuilder.STAGE_GET_FIELDS;
248247
}
249248

250-
doStuff(rb);
249+
if (rb.stage == ResponseBuilder.STAGE_GET_FIELDS) {
250+
doDistribStuff(rb);
251+
return ResponseBuilder.STAGE_DONE;
252+
}
251253

252254
return ResponseBuilder.STAGE_DONE;
253255
}
254256

255257
public void doStuff(ResponseBuilder rb) throws IOException {
256258

257-
// not sure why but sometimes we get it twoice... how can a response have the
259+
// not sure why but sometimes we get it tw(o)ice... how can a response have the
258260
// the same component run twice?
259261
if (rb.rsp.getValues().get("ubi") != null) {
260262
return;
@@ -270,9 +272,9 @@ public void doStuff(ResponseBuilder rb) throws IOException {
270272
ubiQuery.setApplication(params.get(APPLICATION));
271273
if (ubiQuery.getApplication() == null) {
272274
ubiQuery.setApplication(
273-
rb.isDistrib
274-
? rb.req.getCloudDescriptor().getCollectionName()
275-
: searcher.getCore().getName());
275+
rb.isDistrib
276+
? rb.req.getCloudDescriptor().getCollectionName()
277+
: searcher.getCore().getName());
276278
}
277279

278280
String queryAttributes = params.get(QUERY_ATTRIBUTES);
@@ -292,12 +294,52 @@ public void doStuff(ResponseBuilder rb) throws IOException {
292294
}
293295
}
294296
}
297+
}
298+
299+
public void doDistribStuff(ResponseBuilder rb) throws IOException {
300+
301+
// not sure why but sometimes we get it tw(o)ice... how can a response have the
302+
// the same component run twice?
303+
if (rb.rsp.getValues().get("ubi") != null) {
304+
return;
305+
}
306+
SolrParams params = rb.req.getParams();
307+
308+
SolrIndexSearcher searcher = rb.req.getSearcher();
309+
310+
String queryId = params.get(QUERY_ID);
311+
UBIQuery ubiQuery = new UBIQuery(queryId);
312+
313+
ubiQuery.setUserQuery(params.get(USER_QUERY));
314+
ubiQuery.setApplication(params.get(APPLICATION));
315+
if (ubiQuery.getApplication() == null) {
316+
ubiQuery.setApplication(
317+
rb.isDistrib
318+
? rb.req.getCloudDescriptor().getCollectionName()
319+
: searcher.getCore().getName());
320+
}
321+
322+
String queryAttributes = params.get(QUERY_ATTRIBUTES);
323+
324+
if (queryAttributes != null && queryAttributes.toString().startsWith("{")) {
325+
// Look up the original nested JSON format, typically passed in
326+
// via the JSON formatted query.
327+
@SuppressWarnings("rawtypes")
328+
Map jsonProperties = rb.req.getJSON();
329+
if (jsonProperties.containsKey("params")) {
330+
@SuppressWarnings("rawtypes")
331+
Map paramsProperties = (Map) jsonProperties.get("params");
332+
if (paramsProperties.containsKey(QUERY_ATTRIBUTES)) {
333+
@SuppressWarnings("rawtypes")
334+
Map queryAttributesAsMap = (Map) paramsProperties.get(QUERY_ATTRIBUTES);
335+
ubiQuery.setQueryAttributes(queryAttributesAsMap);
336+
}
337+
}
338+
}
295339

296-
ResultContext rc = (ResultContext) rb.rsp.getResponse();
297-
DocList docs = rc.getDocList();
298-
// DocList docs = rb.getResults().docList;
299340

300-
String docIds = extractDocIds(docs, searcher);
341+
//String docIds = extractDocIds(docs, searcher);
342+
String docIds =String.join(",", rb.resultIds.keySet().stream().map(Object::toString).toList());
301343
ubiQuery.setDocIds(docIds);
302344

303345
addUserBehaviorInsightsToResponse(ubiQuery, rb);
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.solr.handler.component;
18+
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import org.apache.commons.io.input.ReversedLinesFileReader;
21+
import org.apache.lucene.tests.util.LuceneTestCase;
22+
import org.apache.solr.client.solrj.io.SolrClientCache;
23+
import org.apache.solr.client.solrj.io.Tuple;
24+
import org.apache.solr.client.solrj.io.stream.*;
25+
import org.apache.solr.client.solrj.io.stream.expr.StreamExpression;
26+
import org.apache.solr.client.solrj.io.stream.expr.StreamExpressionParser;
27+
import org.apache.solr.client.solrj.io.stream.expr.StreamFactory;
28+
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
29+
import org.apache.solr.client.solrj.request.UpdateRequest;
30+
import org.apache.solr.client.solrj.response.QueryResponse;
31+
import org.apache.solr.cloud.AbstractDistribZkTestBase;
32+
import org.apache.solr.cloud.SolrCloudTestCase;
33+
import org.apache.solr.common.SolrInputDocument;
34+
import org.apache.solr.common.params.MapSolrParams;
35+
import org.apache.solr.common.params.SolrParams;
36+
import org.apache.solr.core.SolrCore;
37+
import org.apache.solr.embedded.JettySolrRunner;
38+
import org.apache.solr.handler.LoggingStream;
39+
import org.junit.Before;
40+
import org.junit.BeforeClass;
41+
import org.junit.Test;
42+
43+
import java.io.IOException;
44+
import java.nio.charset.StandardCharsets;
45+
import java.nio.file.Path;
46+
import java.time.Instant;
47+
import java.util.*;
48+
49+
@LuceneTestCase.SuppressCodecs({"Lucene3x", "Lucene40", "Lucene41", "Lucene42", "Lucene45"})
50+
public class UBIComponentDistrQueriesTest extends SolrCloudTestCase {
51+
52+
private static final String COLLECTIONORALIAS = "collection1";
53+
private static final int TIMEOUT = DEFAULT_TIMEOUT;
54+
private static final String id = "id";
55+
56+
private static boolean useAlias;
57+
58+
@BeforeClass
59+
public static void setupCluster() throws Exception {
60+
configureCluster(4)
61+
.addConfig(
62+
"conf", TEST_PATH().resolve("configsets").resolve("ubi-enabled").resolve("conf"))
63+
.configure();
64+
65+
String collection;
66+
useAlias = random().nextBoolean();
67+
if (useAlias) {
68+
collection = COLLECTIONORALIAS + "_collection";
69+
} else {
70+
collection = COLLECTIONORALIAS;
71+
}
72+
73+
CollectionAdminRequest.createCollection(collection, "conf", 2, 1)
74+
.process(cluster.getSolrClient());
75+
76+
cluster.waitForActiveCollection(collection, 2, 2);
77+
78+
AbstractDistribZkTestBase.waitForRecoveriesToFinish(
79+
collection, cluster.getZkStateReader(), false, true, TIMEOUT);
80+
if (useAlias) {
81+
CollectionAdminRequest.createAlias(COLLECTIONORALIAS, collection)
82+
.process(cluster.getSolrClient());
83+
}
84+
85+
// -------------------
86+
87+
CollectionAdminRequest.createCollection("ubi_queries", "_default", 1, 1)
88+
.process(cluster.getSolrClient());
89+
90+
cluster.waitForActiveCollection("ubi_queries", 1, 1);
91+
92+
AbstractDistribZkTestBase.waitForRecoveriesToFinish(
93+
"ubi_queries", cluster.getZkStateReader(), false, true, TIMEOUT);
94+
}
95+
96+
@Before
97+
public void cleanIndex() throws Exception {
98+
new UpdateRequest().deleteByQuery("*:*").commit(cluster.getSolrClient(), COLLECTIONORALIAS);
99+
}
100+
101+
@Test
102+
public void testUBIQueryStream() throws Exception {
103+
cluster.getSolrClient(COLLECTIONORALIAS).add(List.of(new SolrInputDocument("id", "1", "subject", "aa"),
104+
new SolrInputDocument("id", "two", "subject", "aa"),
105+
new SolrInputDocument("id", "3", "subject", "aa")));
106+
cluster.getSolrClient(COLLECTIONORALIAS).commit(true, true);
107+
QueryResponse queryResponse = cluster.getSolrClient(COLLECTIONORALIAS).query(new MapSolrParams(Map.of("q", "aa", "rows", "2", "ubi", "true")));
108+
System.out.println(queryResponse);
109+
}
110+
}

0 commit comments

Comments
 (0)