22
22
import java .util .HashMap ;
23
23
import java .util .List ;
24
24
import java .util .ArrayList ;
25
+ import java .util .Set ;
25
26
26
27
import java .util .stream .Collectors ;
27
28
38
39
import com .redhat .lightblue .query .QueryExpression ;
39
40
import com .redhat .lightblue .query .NaryLogicalOperator ;
40
41
42
+ import com .redhat .lightblue .metadata .CompositeMetadata ;
43
+
44
+ import com .redhat .lightblue .assoc .BindQuery ;
45
+ import com .redhat .lightblue .assoc .QueryFieldInfo ;
46
+ import com .redhat .lightblue .assoc .AnalyzeQuery ;
47
+
48
+ import com .redhat .lightblue .mindex .MemDocIndex ;
49
+ import com .redhat .lightblue .mindex .GetIndexKeySpec ;
50
+ import com .redhat .lightblue .mindex .GetIndexLookupSpec ;
51
+ import com .redhat .lightblue .mindex .KeySpec ;
52
+ import com .redhat .lightblue .mindex .LookupSpec ;
53
+
54
+ import com .redhat .lightblue .eval .QueryEvaluator ;
55
+
56
+ import com .redhat .lightblue .util .Path ;
57
+ import com .redhat .lightblue .util .JsonDoc ;
58
+
41
59
/**
42
60
* There are two sides to an Assemble step: Assemble gets results from the
43
61
* source, and for each of those documents, it runs the associated queries on
@@ -48,6 +66,13 @@ public class Assemble extends Step<ResultDocument> {
48
66
49
67
private static final Logger LOGGER = LoggerFactory .getLogger (Assemble .class );
50
68
69
+ /**
70
+ * This is used for testing. This is the threshold for the number
71
+ * of slots above which we'll use the memory indexing. If we have
72
+ * slots fewer than this, then we don't use in-memory index.
73
+ */
74
+ public static int MEM_INDEX_THRESHOLD =16 ;
75
+
51
76
private final ExecutionBlock [] destinationBlocks ;
52
77
private final Source <ResultDocument > source ;
53
78
private Map <ExecutionBlock , Assemble > destinations ;
@@ -95,6 +120,9 @@ public StepResult<ResultDocument> getResults(ExecutionContext ctx) {
95
120
return StepResult .EMPTY ;
96
121
}
97
122
123
+ // Assemble results: retrieve results from associated
124
+ // execution blocks, and insert them into sourceResults
125
+ // documents
98
126
List <Future > assemblers = new ArrayList <>();
99
127
for (Map .Entry <ExecutionBlock , Assemble > destination : destinations .entrySet ()) {
100
128
AssociationQuery aq = destination .getKey ().getAssociationQueryForEdge (block );
@@ -141,7 +169,7 @@ public DocAndQ(ResultDocument doc) {
141
169
}
142
170
}
143
171
144
- private static class BatchAssembler {
172
+ private class BatchAssembler {
145
173
private List <DocAndQ > docs = new ArrayList <>();
146
174
private List <QueryExpression > queries = new ArrayList <>();
147
175
private final int batchSize ;
@@ -181,8 +209,27 @@ public void commit() {
181
209
combinedQuery = null ;
182
210
}
183
211
List <ResultDocument > destResults = dest .getResultList (combinedQuery , ctx );
212
+ int numSlots =0 ;
213
+ for (ResultDocument doc :destResults ) {
214
+ List <ChildSlot > slots =doc .getSlots ().get (aq .getReference ());
215
+ if (slots !=null )
216
+ numSlots +=slots .size ();
217
+ }
218
+ // Try to build an index from results
219
+ MemDocIndex docIndex =null ;
220
+ if (aq .getQuery ()!=null &&numSlots >MEM_INDEX_THRESHOLD ) {
221
+ KeySpec keySpec =aq .getIndexKeySpec ();
222
+ LOGGER .debug ("In-memory index key spec:{}" ,keySpec );
223
+ if (keySpec !=null ) {
224
+ // There is a key spec, meaning we can index the docs
225
+ docIndex =new MemDocIndex (keySpec );
226
+ for (ResultDocument child :destResults ) {
227
+ docIndex .add (child .getDoc ());
228
+ }
229
+ }
230
+ }
184
231
for (DocAndQ parentDocAndQ : docs ) {
185
- Searches . associateDocs (parentDocAndQ .doc , destResults , aq );
232
+ associateDocs (parentDocAndQ .doc , destResults , aq , docIndex );
186
233
}
187
234
}
188
235
docs = new ArrayList <>();
@@ -207,6 +254,121 @@ private JsonNode toJson(Step.ToJsonCb<Step> scb,Step.ToJsonCb<ExecutionBlock> bc
207
254
}
208
255
return o ;
209
256
}
257
+
258
+ /**
259
+ * Associates child documents obtained from 'aq' to all the slots in the
260
+ * parent document
261
+ */
262
+ public void associateDocs (ResultDocument parentDoc ,
263
+ List <ResultDocument > childDocs ,
264
+ AssociationQuery aq ,
265
+ MemDocIndex childIndex ) {
266
+ if (!childDocs .isEmpty ()) {
267
+ CompositeMetadata childMetadata = childDocs .get (0 ).getBlock ().getMetadata ();
268
+ List <ChildSlot > slots = parentDoc .getSlots ().get (aq .getReference ());
269
+ for (ChildSlot slot : slots ) {
270
+ BindQuery binders = parentDoc .getBindersForSlot (slot , aq );
271
+ // No binders means all child docs will be added to the parent
272
+ // aq.always==true means query is always true, so add everything to the parent
273
+ if (binders .getBindings ().isEmpty ()||(aq .getAlways ()!=null && aq .getAlways ()) ) {
274
+ associateAllDocs (parentDoc ,childDocs ,slot .getSlotFieldName ());
275
+ } else if (aq .getAlways ()==null ||!aq .getAlways ()) { // If query is not always false
276
+ if (childIndex ==null )
277
+ associateDocs (childMetadata ,parentDoc ,slot .getSlotFieldName (),binders ,childDocs ,aq .getQuery ());
278
+ else
279
+ associateDocsWithIndex (childMetadata ,parentDoc ,slot .getSlotFieldName (),binders ,childDocs ,aq ,childIndex );
280
+ }
281
+ }
282
+ }
283
+ }
284
+
285
+ private static void associateAllDocs (ResultDocument parentDoc ,List <ResultDocument > childDocs ,Path fieldName ) {
286
+ ArrayNode destNode =ensureDestNodeExists (parentDoc ,null ,fieldName );
287
+ for (ResultDocument d : childDocs ) {
288
+ destNode .add (d .getDoc ().getRoot ());
289
+ }
290
+ }
291
+
292
+ private static ArrayNode ensureDestNodeExists (ResultDocument doc ,ArrayNode destNode ,Path fieldName ) {
293
+ if (destNode == null ) {
294
+ destNode = JsonNodeFactory .instance .arrayNode ();
295
+ doc .getDoc ().modify (fieldName , destNode , true );
296
+ }
297
+ return destNode ;
298
+ }
299
+
300
+ /**
301
+ * Associate child documents with their parents. The association query is
302
+ * for the association from the child to the parent, so caller must flip it
303
+ * before sending it in if necessary. The caller also make sure parentDocs
304
+ * is a unique stream.
305
+ *
306
+ * @param parentDoc The parent document
307
+ * @param parentSlot The slot in parent docuemnt to which the results will
308
+ * be attached
309
+ * @param childDocs The child documents
310
+ * @param aq The association query from parent to child. This may not be the
311
+ * same association query between the blocks. If the child block is before
312
+ * the parent block, a new aq must be constructed for the association from
313
+ * the parent to the child
314
+ */
315
+ public static void associateDocs (CompositeMetadata childMetadata ,
316
+ ResultDocument parentDoc ,
317
+ Path destFieldName ,
318
+ BindQuery binders ,
319
+ List <ResultDocument > childDocs ,
320
+ QueryExpression query ) {
321
+ LOGGER .debug ("Associating docs" );
322
+ QueryExpression boundQuery = binders .iterate (query );
323
+ LOGGER .debug ("Association query:{}" , boundQuery );
324
+ QueryEvaluator qeval = QueryEvaluator .getInstance (boundQuery , childMetadata );
325
+ ArrayNode destNode =null ;
326
+ for (ResultDocument childDoc : childDocs ) {
327
+ if (qeval .evaluate (childDoc .getDoc ()).getResult ()) {
328
+ destNode =ensureDestNodeExists (parentDoc ,destNode ,destFieldName );
329
+ destNode .add (childDoc .getDoc ().getRoot ());
330
+ }
331
+ }
332
+ }
333
+
334
+ private void associateDocsWithIndex (CompositeMetadata childMetadata ,
335
+ ResultDocument parentDoc ,
336
+ Path destFieldName ,
337
+ BindQuery binders ,
338
+ List <ResultDocument > childDocs ,
339
+ AssociationQuery aq ,
340
+ MemDocIndex childIndex ) {
341
+ LOGGER .debug ("Associating docs using index" );
342
+ QueryExpression boundQuery = binders .iterate (aq .getQuery ());
343
+ LOGGER .debug ("Association query:{}" , boundQuery );
344
+ QueryEvaluator qeval = QueryEvaluator .getInstance (boundQuery , childMetadata );
345
+ AnalyzeQuery analyzer =new AnalyzeQuery (block .rootMd ,aq .getReference ());
346
+ analyzer .iterate (boundQuery );
347
+ List <QueryFieldInfo > qfi =analyzer .getFieldInfo ();
348
+ GetIndexLookupSpec gils =new GetIndexLookupSpec (qfi );
349
+ LookupSpec ls =gils .iterate (boundQuery );
350
+ LOGGER .debug ("Lookup spec:" +ls );
351
+ List <ResultDocument > docs =reorder (childDocs ,childIndex .find (ls ));
352
+ ArrayNode destNode =null ;
353
+ for (ResultDocument childDoc : childDocs ) {
354
+ if (qeval .evaluate (childDoc .getDoc ()).getResult ()) {
355
+ destNode =ensureDestNodeExists (parentDoc ,destNode ,destFieldName );
356
+ destNode .add (childDoc .getDoc ().getRoot ());
357
+ }
358
+ }
359
+ }
360
+
361
+ /**
362
+ * Returns the documents in foundList in the order of originalList
363
+ */
364
+ private static List <ResultDocument > reorder (List <ResultDocument > originalList ,Set <JsonDoc > foundList ) {
365
+ List <ResultDocument > ret =new ArrayList <>(foundList .size ());
366
+ for (ResultDocument d :originalList ) {
367
+ if (foundList .contains (d .getDoc ()))
368
+ ret .add (d );
369
+ }
370
+ return ret ;
371
+ }
210
372
211
373
@ Override
212
374
public JsonNode toJson () {
0 commit comments