38
38
import com .redhat .lightblue .query .QueryExpression ;
39
39
import com .redhat .lightblue .query .NaryLogicalOperator ;
40
40
41
+ import com .redhat .lightblue .metadata .CompositeMetadata ;
42
+
41
43
import com .redhat .lightblue .assoc .BindQuery ;
42
44
45
+ import com .redhat .lightblue .mindex .MemDocIndex ;
46
+ import com .redhat .lightblue .mindex .GetIndexKeySpec ;
47
+ import com .redhat .lightblue .mindex .KeySpec ;
48
+
43
49
import com .redhat .lightblue .eval .QueryEvaluator ;
44
50
51
+ import com .redhat .lightblue .util .Path ;
52
+
45
53
/**
46
54
* There are two sides to an Assemble step: Assemble gets results from the
47
55
* source, and for each of those documents, it runs the associated queries on
@@ -99,6 +107,9 @@ public StepResult<ResultDocument> getResults(ExecutionContext ctx) {
99
107
return StepResult .EMPTY ;
100
108
}
101
109
110
+ // Assemble results: retrieve results from associated
111
+ // execution blocks, and insert them into sourceResults
112
+ // documents
102
113
List <Future > assemblers = new ArrayList <>();
103
114
for (Map .Entry <ExecutionBlock , Assemble > destination : destinations .entrySet ()) {
104
115
AssociationQuery aq = destination .getKey ().getAssociationQueryForEdge (block );
@@ -185,8 +196,23 @@ public void commit() {
185
196
combinedQuery = null ;
186
197
}
187
198
List <ResultDocument > destResults = dest .getResultList (combinedQuery , ctx );
199
+ // Try to build an index from results
200
+ MemDocIndex docIndex =null ;
201
+ if (aq .getQuery ()!=null &&destResults .size ()>2 ) {
202
+ // Lets see if we can build a key spec from this query
203
+ GetIndexKeySpec giks =new GetIndexKeySpec (aq .getQueryFieldInfo ());
204
+ KeySpec keySpec =giks .iterate (aq .getQuery ());
205
+ LOGGER .debug ("In-memory index key spec:{}" ,keySpec );
206
+ if (keySpec !=null ) {
207
+ // There is a key spec, meaning we can index the docs
208
+ docIndex =new MemDocIndex (keySpec );
209
+ for (ResultDocument child :destResults ) {
210
+ docIndex .add (child .getDoc ());
211
+ }
212
+ }
213
+ }
188
214
for (DocAndQ parentDocAndQ : docs ) {
189
- associateDocs (parentDocAndQ .doc , destResults , aq );
215
+ associateDocs (parentDocAndQ .doc , destResults , aq , docIndex );
190
216
}
191
217
}
192
218
docs = new ArrayList <>();
@@ -211,20 +237,49 @@ private JsonNode toJson(Step.ToJsonCb<Step> scb,Step.ToJsonCb<ExecutionBlock> bc
211
237
}
212
238
return o ;
213
239
}
214
-
240
+
215
241
/**
216
242
* Associates child documents obtained from 'aq' to all the slots in the
217
243
* parent document
218
244
*/
219
245
public static void associateDocs (ResultDocument parentDoc ,
220
246
List <ResultDocument > childDocs ,
221
- AssociationQuery aq ) {
222
- List <ChildSlot > slots = parentDoc .getSlots ().get (aq .getReference ());
223
- for (ChildSlot slot : slots ) {
224
- associateDocs (parentDoc , slot , childDocs , aq );
247
+ AssociationQuery aq ,
248
+ MemDocIndex childIndex ) {
249
+ if (!childDocs .isEmpty ()) {
250
+ CompositeMetadata childMetadata = childDocs .get (0 ).getBlock ().getMetadata ();
251
+ List <ChildSlot > slots = parentDoc .getSlots ().get (aq .getReference ());
252
+ for (ChildSlot slot : slots ) {
253
+ BindQuery binders = parentDoc .getBindersForSlot (slot , aq );
254
+ // No binders means all child docs will be added to the parent
255
+ // aq.always==true means query is always true, so add everything to the parent
256
+ if (binders .getBindings ().isEmpty ()||(aq .getAlways ()!=null && aq .getAlways ()) ) {
257
+ associateAllDocs (parentDoc ,childDocs ,slot .getSlotFieldName ());
258
+ } else if (aq .getAlways ()==null ||!aq .getAlways ()) { // If query is not always false
259
+ if (childIndex ==null )
260
+ associateDocs (childMetadata ,parentDoc ,slot .getSlotFieldName (),binders ,childDocs ,aq .getQuery ());
261
+ else
262
+ associateDocsWithIndex (childMetadata ,parentDoc ,slot .getSlotFieldName (),binders ,childDocs ,aq ,childIndex );
263
+ }
264
+ }
265
+ }
266
+ }
267
+
268
+ private static void associateAllDocs (ResultDocument parentDoc ,List <ResultDocument > childDocs ,Path fieldName ) {
269
+ ArrayNode destNode =ensureDestNodeExists (parentDoc ,null ,fieldName );
270
+ for (ResultDocument d : childDocs ) {
271
+ destNode .add (d .getDoc ().getRoot ());
225
272
}
226
273
}
227
274
275
+ private static ArrayNode ensureDestNodeExists (ResultDocument doc ,ArrayNode destNode ,Path fieldName ) {
276
+ if (destNode == null ) {
277
+ destNode = JsonNodeFactory .instance .arrayNode ();
278
+ doc .getDoc ().modify (fieldName , destNode , true );
279
+ }
280
+ return destNode ;
281
+ }
282
+
228
283
/**
229
284
* Associate child documents with their parents. The association query is
230
285
* for the association from the child to the parent, so caller must flip it
@@ -240,41 +295,34 @@ public static void associateDocs(ResultDocument parentDoc,
240
295
* the parent block, a new aq must be constructed for the association from
241
296
* the parent to the child
242
297
*/
243
- public static void associateDocs (ResultDocument parentDoc ,
244
- ChildSlot parentSlot ,
298
+ public static void associateDocs (CompositeMetadata childMetadata ,
299
+ ResultDocument parentDoc ,
300
+ Path destFieldName ,
301
+ BindQuery binders ,
245
302
List <ResultDocument > childDocs ,
246
- AssociationQuery aq ) {
247
- if (!childDocs .isEmpty ()) {
248
- LOGGER .debug ("Associating docs" );
249
- ExecutionBlock childBlock = childDocs .get (0 ).getBlock ();
250
- ArrayNode destNode = (ArrayNode ) parentDoc .getDoc ().get (parentSlot .getSlotFieldName ());
251
- BindQuery binders = parentDoc .getBindersForSlot (parentSlot , aq );
252
- // No binders means all child docs will be added to the parent
253
- if (binders .getBindings ().isEmpty ()) {
254
- if (destNode == null ) {
255
- destNode = JsonNodeFactory .instance .arrayNode ();
256
- parentDoc .getDoc ().modify (parentSlot .getSlotFieldName (), destNode , true );
257
- }
258
- for (ResultDocument d : childDocs ) {
259
- destNode .add (d .getDoc ().getRoot ());
260
- }
261
- } else {
262
- QueryExpression boundQuery = binders .iterate (aq .getQuery ());
263
- LOGGER .debug ("Association query:{}" , boundQuery );
264
- QueryEvaluator qeval = QueryEvaluator .getInstance (boundQuery , childBlock .getMetadata ());
265
- for (ResultDocument childDoc : childDocs ) {
266
- if (qeval .evaluate (childDoc .getDoc ()).getResult ()) {
267
- if (destNode == null ) {
268
- destNode = JsonNodeFactory .instance .arrayNode ();
269
- parentDoc .getDoc ().modify (parentSlot .getSlotFieldName (), destNode , true );
270
- }
271
- destNode .add (childDoc .getDoc ().getRoot ());
272
- }
273
- }
303
+ QueryExpression query ) {
304
+ LOGGER .debug ("Associating docs" );
305
+ QueryExpression boundQuery = binders .iterate (query );
306
+ LOGGER .debug ("Association query:{}" , boundQuery );
307
+ QueryEvaluator qeval = QueryEvaluator .getInstance (boundQuery , childMetadata );
308
+ ArrayNode destNode =null ;
309
+ for (ResultDocument childDoc : childDocs ) {
310
+ if (qeval .evaluate (childDoc .getDoc ()).getResult ()) {
311
+ destNode =ensureDestNodeExists (parentDoc ,destNode ,destFieldName );
312
+ destNode .add (childDoc .getDoc ().getRoot ());
274
313
}
275
314
}
276
315
}
277
-
316
+
317
+ private static void associateDocsWithIndex (CompositeMetadata childMetadata ,
318
+ ResultDocument parentDoc ,
319
+ Path destFieldName ,
320
+ BindQuery binders ,
321
+ List <ResultDocument > childDocs ,
322
+ AssociationQuery aq ,
323
+ MemDocIndex childIndex ) {
324
+ LOGGER .debug ("Associating docs using index" );
325
+ }
278
326
279
327
@ Override
280
328
public JsonNode toJson () {
0 commit comments