Commit d415f69

Burak Serdar committed
Deferred processing for hooks
1 parent df0c4c0 commit d415f69
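
In outline, per the diff below: hook execution is now deferred as self-contained work items. The old DocHooks/HookDocs pair, which grouped resolved hooks per document and then regrouped documents per hook, is replaced by HookAndDocs (a resolved hook plus the documents queued for it) and QueuedHook (the requesting principal plus its hooks), with the processing logic moved out of callQueuedHooks() into HookAndDocs.call(). Document streams that cannot be rewound are now supported by tee-ing each document into the queue as the caller consumes the stream.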

2 files changed: 124 additions & 128 deletions

crud/src/main/java/com/redhat/lightblue/hooks/HookManager.java

Lines changed: 97 additions & 127 deletions
@@ -61,15 +61,14 @@ public class HookManager {
     private final HookResolver resolver;
     private final JsonNodeFactory factory;
 
-    private final List<HookDocs> queuedHooks = new ArrayList<>();
+    private final List<QueuedHook> queuedHooks = new ArrayList<>();
 
-    private static final class DocHooks {
+    private static final class HookDocInfo {
         private final JsonDoc pre;
         private final JsonDoc post;
         private final CRUDOperation op;
-        private final Map<Hook, CRUDHook> hooks;
 
-        public DocHooks(DocCtx doc, Map<Hook, CRUDHook> hooks) {
+        public HookDocInfo(DocCtx doc) {
             op = doc.getCRUDOperationPerformed();
             // Create a copy of the original version of the document, if non-null
             if (op == CRUDOperation.INSERT || op == CRUDOperation.FIND) {
@@ -92,25 +91,71 @@ public DocHooks(DocCtx doc, Map<Hook, CRUDHook> hooks) {
             } else {
                 post = doc.copy();
             }
-            this.hooks = hooks;
         }
     }
 
-    private static final class HookDocs {
-        private final Hook hook;
-        private final CRUDHook crudHook;
-        private final EntityMetadata md;
-        private final List<HookDoc> docs = new ArrayList<>();
+    private class HookAndDocs {
+        final Hook hook;
+        final EntityMetadata md;
+        final CRUDHook resolvedHook;
+        final List<HookDocInfo> docList = new ArrayList<>();
 
-        public HookDocs(Hook hook, CRUDHook crudHook, EntityMetadata md) {
-            this.hook = hook;
-            this.crudHook = crudHook;
-            this.md = md;
+        HookAndDocs(EntityMetadata md,
+                    Hook hook,
+                    CRUDHook resolvedHook) {
+            this.md = md;
+            this.hook = hook;
+            this.resolvedHook = resolvedHook;
         }
 
-        @Override
-        public String toString() {
-            return "HookDocs [hook=" + hook + ", crudHook=" + crudHook + ", md=" + md + ", docs=" + docs + "]";
+        void call(String who) {
+            List<HookDoc> processedDocuments = new ArrayList<>(docList.size());
+            if (hook.getProjection() != null) {
+                // Project the docs
+                Projector projector = Projector.getInstance(hook.getProjection(), md);
+                for (HookDocInfo doc : docList) {
+                    processedDocuments.add(new HookDoc(md,
+                                                       project(doc.pre, projector),
+                                                       project(doc.post, projector),
+                                                       doc.op,
+                                                       who));
+                }
+            } else {
+                for (HookDocInfo doc : docList) {
+                    processedDocuments.add(new HookDoc(md,
+                                                       doc.pre,
+                                                       doc.post,
+                                                       doc.op,
+                                                       who));
+                }
+            }
+            if (!processedDocuments.isEmpty()) {
+                try {
+                    resolvedHook.processHook(md, hook.getConfiguration(), processedDocuments);
+                } catch (RuntimeException e) {
+                    if (e.getClass().isAnnotationPresent(StopHookProcessing.class)) {
+                        throw e;
+                    } else {
+                        LOGGER.error("Exception while processing hook of type: " + resolvedHook.getClass(), e);
+                    }
+                }
+            }
+        }
+    }
+
+    private class QueuedHook {
+        final String who;
+        final List<HookAndDocs> hooks;
+
+        QueuedHook(String who, List<HookAndDocs> hooks) {
+            this.who = who;
+            this.hooks = hooks;
+        }
+
+        void call() {
+            for (HookAndDocs hook : hooks) {
+                hook.call(who);
+            }
         }
     }
 
@@ -178,143 +223,68 @@ public void queueMediatorHooks(CRUDOperationContext ctx) {
      * hooks throws an exception with @StopHookProcessing annotation.
      */
     public void callQueuedHooks() {
-        for (HookDocs hd : queuedHooks) {
-            List<HookDoc> processedDocuments;
-            if (hd.hook.getProjection() != null) {
-                // Project the docs
-                processedDocuments = new ArrayList<>(hd.docs.size());
-                Projector projector = Projector.getInstance(hd.hook.getProjection(), hd.md);
-                for (HookDoc doc : hd.docs) {
-                    processedDocuments.add(new HookDoc(
-                            doc.getEntityMetadata(),
-                            project(doc.getPreDoc(), projector),
-                            project(doc.getPostDoc(), projector),
-                            doc.getCRUDOperation(),
-                            doc.getWho()));
+        for (QueuedHook q : queuedHooks) {
+            q.call();
+        }
+        clear();
+    }
+
+    private void addDocument(List<HookAndDocs> hooks, DocCtx doc) {
+        if (!doc.hasErrors()) {
+            for (HookAndDocs hook : hooks) {
+                boolean queue = false;
+                switch (doc.getCRUDOperationPerformed()) {
+                case INSERT: queue = hook.hook.isInsert(); break;
+                case UPDATE: queue = hook.hook.isUpdate(); break;
+                case DELETE: queue = hook.hook.isDelete(); break;
+                case FIND: queue = hook.hook.isFind(); break;
                 }
-            } else {
-                processedDocuments = hd.docs;
-            }
-            try {
-                hd.crudHook.processHook(hd.md, hd.hook.getConfiguration(), processedDocuments);
-            } catch (RuntimeException e) {
-                if (e.getClass().isAnnotationPresent(StopHookProcessing.class)) {
-                    throw e;
-                } else {
-                    LOGGER.error("Exception while processing hook of type: " + hd.crudHook.getClass(), e);
+                if (queue) {
+                    hook.docList.add(new HookDocInfo(doc));
                 }
             }
         }
-        clear();
     }
 
     private void queueHooks(CRUDOperationContext ctx, boolean mediatorHooks) {
         LOGGER.debug("queueHooks start mediatorHooks={}", mediatorHooks);
         EntityMetadata md = ctx.getEntityMetadata(ctx.getEntityName());
         List<Hook> mdHooks = md.getHooks().getHooks();
         LOGGER.debug("There are {} hooks in metadata", mdHooks.size());
-        Map<Hook, CRUDHook> hooks = new HashMap<>();
+        List<HookAndDocs> hookList = new ArrayList<>();
         for (Hook h : mdHooks) {
             CRUDHook crudHook = resolver.getHook(h.getName());
             if (crudHook == null) {
                 throw Error.get(CrudConstants.ERR_INVALID_HOOK, h.getName());
             }
             if ((mediatorHooks && crudHook instanceof MediatorHook)
                     || (!mediatorHooks && !(crudHook instanceof MediatorHook))) {
-                hooks.put(h, crudHook);
+                hookList.add(new HookAndDocs(md, h, crudHook));
             }
         }
-        LOGGER.debug("Hooks are resolved: {}", hooks);
-        if (!hooks.isEmpty()) {
+        if (!hookList.isEmpty()) {
+            // extract the who from the context if possible
+            String who = null;
+            if (ctx instanceof OperationContext && ((OperationContext) ctx).getRequest() != null
+                    && ((OperationContext) ctx).getRequest().getClientId() != null) {
+                who = ((OperationContext) ctx).getRequest().getClientId().getPrincipal();
+            }
+
            DocumentStream<DocCtx> documents = ctx.getDocumentStream();
            if (documents.canRewind()) {
-                // If this is the case, then we have access to all the documents, and
-                // we can call hooks at once
-
-                // We don't want to create a separate copy of every
-                // document for each hook. So, we share the only copy of
-                // the document between hooks. First we create a list of
-                // DocHooks. Each element in this list contains a
-                // document, and all the hooks associated with that
-                // document. This step also creates copies of the
-                // documents. Then, we create another list, the HookDocs
-                // list where each element gives a hook, and all the
-                // documents that will be passed to that hook.
-                List<DocHooks> docHooksList = new ArrayList<>();
-                for (DocumentStream<DocCtx> stream = documents.rewind(); stream.hasNext();) {
-                    DocCtx doc = stream.next();
-                    if (!doc.hasErrors()) {
-                        if (doc.getCRUDOperationPerformed() != null) {
-                            Map<Hook, CRUDHook> hooksList = null;
-                            for (Map.Entry<Hook, CRUDHook> hook : hooks.entrySet()) {
-                                boolean queue;
-                                switch (doc.getCRUDOperationPerformed()) {
-                                case INSERT:
-                                    queue = hook.getKey().isInsert();
-                                    break;
-                                case UPDATE:
-                                    queue = hook.getKey().isUpdate();
-                                    break;
-                                case DELETE:
-                                    queue = hook.getKey().isDelete();
-                                    break;
-                                case FIND:
-                                    queue = hook.getKey().isFind();
-                                    break;
-                                default:
-                                    queue = false;
-                                    break;
-                                }
-                                if (queue) {
-                                    if (hooksList == null) {
-                                        hooksList = new HashMap<>();
-                                    }
-                                    hooksList.put(hook.getKey(), hook.getValue());
-                                }
-                            }
-                            if (hooksList != null) {
-                                docHooksList.add(new DocHooks(doc, hooksList));
-                            }
-                        }
-                    }
+                DocumentStream<DocCtx> stream = documents.rewind();
+                while (stream.hasNext()) {
+                    addDocument(hookList, stream.next());
                }
-
-                LOGGER.debug("List of docs with hooks size={}", docHooksList.size());
-                // At this point, we have the list of documents, with each
-                // document containing its hooks. Now we process that, and
-                // create a list of hooks, each containing the documents
-                // it will get.
-                Map<Hook, HookDocs> hookCache = new HashMap<>();
-                for (DocHooks dh : docHooksList) {
-                    for (Map.Entry<Hook, CRUDHook> hook : dh.hooks.entrySet()) {
-                        HookDocs hd = hookCache.get(hook.getKey());
-                        if (hd == null) {
-                            hd = new HookDocs(hook.getKey(), hook.getValue(), md);
-                            hookCache.put(hook.getKey(), hd);
-                        }
-
-                        // extract the who from the context if possible
-                        String who = null;
-                        if (ctx instanceof OperationContext && ((OperationContext) ctx).getRequest() != null
-                                && ((OperationContext) ctx).getRequest().getClientId() != null) {
-                            who = ((OperationContext) ctx).getRequest().getClientId().getPrincipal();
-                        }
-
-                        hd.docs.add(new HookDoc(
-                                hd.md,
-                                dh.pre, dh.post, dh.op, who));
-                    }
-                }
-                LOGGER.debug("Queueing {} hooks", hookCache.size());
-                // Queue the hooks
-                queuedHooks.addAll(hookCache.values());
            } else {
-                // Stream is one-way, and cannot be rewound
+                documents.tee(d -> addDocument(hookList, d));
            }
-        }
+            queuedHooks.add(new QueuedHook(who, hookList));
+        }
    }
 
-    private JsonDoc project(JsonDoc doc, Projector p) {
+    private JsonDoc project(JsonDoc doc, Projector p) {
        if (doc == null) {
            return null;
        } else {
crud/src/test/java/com/redhat/lightblue/hooks/HookManagerTest.java

Lines changed: 27 additions & 1 deletion
@@ -51,6 +51,7 @@
 import com.redhat.lightblue.crud.CRUDOperation;
 import com.redhat.lightblue.crud.Factory;
 import com.redhat.lightblue.crud.DocCtx;
+import com.redhat.lightblue.crud.ListDocumentStream;
 
 import com.redhat.lightblue.util.test.AbstractJsonNodeTest;
 import com.redhat.lightblue.util.JsonDoc;
@@ -86,7 +87,7 @@ public EntityMetadata getEntityMetadata(String name) {
     public static abstract class AbstractHook implements CRUDHook {
         private final String name;
         EntityMetadata md;
-        HookConfiguration cfg;
+        HookConfiguration cfg;
         List<HookDoc> processed;
 
         public AbstractHook(String n) {
@@ -330,6 +331,31 @@ public void crudFindQueueTest() throws Exception {
         Assert.assertNull(mediatorHook.md);
     }
 
+
+    @Test
+    public void crudFindQueueTest_deferredProcessing() throws Exception {
+        HookManager hooks = new HookManager(resolver, nodeFactory);
+        TestOperationContext ctx = setupContext(CRUDOperation.FIND);
+
+        // Force the non-rewindable (deferred) path
+        ctx.setDocumentStream(new ListDocumentStream<DocCtx>(ctx.getInputDocuments()) {
+            @Override
+            public boolean canRewind() { return false; }
+        });
+
+        // hook2 should be called
+        hooks.queueHooks(ctx);
+        // Drain the document stream so the tee consumer sees every document
+        while (ctx.getDocumentStream().hasNext()) {
+            ctx.getDocumentStream().next();
+        }
+        hooks.callQueuedHooks();
+
+        Assert.assertNull(hook1.md);
+        Assert.assertEquals(ctx.md, hook2.md);
+        Assert.assertTrue(hook2.cfg instanceof TestHook2Config);
+        Assert.assertEquals(ctx.getInputDocuments().size(), hook2.processed.size());
+        Assert.assertNull(mediatorHook.md);
+    }
+
     @Test
     public void crudMixedQueueTest() throws Exception {
         HookManager hooks = new HookManager(resolver, nodeFactory);

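A note on ordering in the new test: with canRewind() returning false, documents reach the queued hooks only through the tee consumer, so they are captured as the caller drains the stream. If callQueuedHooks() were invoked before the stream was consumed, each HookAndDocs would presumably still hold an empty docList and, given the isEmpty() guard in call(), the hook would not run at all.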