Skip to content

Commit 52bd012

Browse files
authored
Merge pull request #1048 from WildMeOrg/indexing-queue
Throttled Indexing queue
2 parents afae67a + c58d230 commit 52bd012

File tree

5 files changed

+147
-4
lines changed

5 files changed

+147
-4
lines changed
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
package org.ecocean;
2+
3+
4+
import java.util.ArrayList;
5+
import java.util.Collections;
6+
import java.util.List;
7+
import java.util.Properties;
8+
import java.util.concurrent.ExecutorService;
9+
import java.util.concurrent.Executors;
10+
import org.ecocean.ShepherdProperties;
11+
12+
13+
14+
public class IndexingManager {
15+
16+
//The ExecutorService executes indexing jobs
17+
private final ExecutorService executor;
18+
19+
//The indexingQueue is a List of Strings that represent the UUIDs of Base class-implementing objects
20+
//(Encounter, MarkedIndividual, Annotation, etc.) that need to be indexed or unindexed.
21+
//The queue ensures that overzealous calls from the WildbookLifecycleListener do not cause
22+
//unnecessary, duplicate indexing jobs. The UUIDs of the objects being ndexed are removed
23+
//from the queue once completed.
24+
private List<String> indexingQueue = Collections.synchronizedList(new ArrayList<String>());
25+
26+
public IndexingManager() {
27+
28+
int numAllowedThreads = 4;
29+
Properties props = ShepherdProperties.getProperties("OpenSearch.properties", "", "context0");
30+
if(props!=null) {
31+
String indexingNumAllowedThreads = props.getProperty("indexingNumAllowedThreads");
32+
if(indexingNumAllowedThreads!=null) {
33+
Integer allowThreads = Integer.getInteger(indexingNumAllowedThreads);
34+
if(allowThreads!=null)numAllowedThreads = allowThreads.intValue();
35+
}
36+
}
37+
executor = Executors.newFixedThreadPool(numAllowedThreads);
38+
39+
}
40+
41+
//Returns the indexing queue List of Strings
42+
public List<String> getIndexingQueue() { return indexingQueue; }
43+
44+
/*
45+
* Adds a Base object to the queue for indexing or unindexing
46+
* @Base base The Base-class implementing object to be indexed or unindexed
47+
* @boolean unindex Whether the object is to be indexed or unindexed.
48+
*/
49+
public void addIndexingQueueEntry(Base base, boolean unindex) {
50+
String objectID = base.getId();
51+
Class myClass = base.getClass();
52+
if(!indexingQueue.contains(objectID)) {
53+
indexingQueue.add(objectID);
54+
55+
//IMPORTANT - no persistent objects, such as the passed in Base can be referenced inside this method
56+
Runnable rn = new Runnable() {
57+
public void run() {
58+
Shepherd bgShepherd = new Shepherd("context0");
59+
bgShepherd.setAction("IndexingManager_" + objectID);
60+
bgShepherd.beginDBTransaction();
61+
try {
62+
Base base = (Base)bgShepherd.getPM().getObjectById(myClass, objectID);
63+
if(unindex) {base.opensearchUnindexDeep();}
64+
else{base.opensearchIndexDeep();}
65+
66+
}
67+
catch (Exception e) {
68+
e.printStackTrace();
69+
}
70+
finally {
71+
bgShepherd.rollbackAndClose();
72+
}
73+
74+
//remove from indexing queue
75+
if(indexingQueue.contains(objectID))indexingQueue.remove(objectID);
76+
}
77+
};
78+
79+
executor.execute(rn);
80+
81+
}
82+
83+
}
84+
85+
//Removes an oject's UUID from the queue
86+
public void removeIndexingQueueEntry(String objectID) {
87+
if (indexingQueue.contains(objectID)) {
88+
indexingQueue.remove(objectID);
89+
}
90+
}
91+
92+
//Resets the indexing queue
93+
public void resetIndexingQueuehWithInitialCapacity(int initialCapacity) {
94+
indexingQueue = null;
95+
indexingQueue = Collections.synchronizedList(new ArrayList<String>());
96+
}
97+
98+
public void shutdown() {
99+
if(executor!=null)executor.shutdown();
100+
}
101+
102+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package org.ecocean;
2+
3+
/*
4+
* IndexingManagerFactory ensures that there is a Singleton IndexingManager that handles
5+
* and throttles indexing and unindexing OpenSearch activity as initiated by
6+
* WildbookLifeCycleListener, which follows the DataNucleus object lifecycle.
7+
*/
8+
public class IndexingManagerFactory {
9+
10+
//The sole IndexingManager that should be used for the OpenSearch indexing lifecycles
11+
private static IndexingManager im;
12+
13+
//Returns a threadsafe IndexingManager singleton
14+
public synchronized static IndexingManager getIndexingManager() {
15+
try {
16+
if (im == null) {
17+
im = new IndexingManager();
18+
}
19+
return im;
20+
} catch (Exception jdo) {
21+
jdo.printStackTrace();
22+
System.out.println("I couldn't instantiate an org.ecocean.IndexingManager.");
23+
return null;
24+
}
25+
}
26+
}

src/main/java/org/ecocean/StartupWildbook.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,7 @@ public void contextDestroyed(ServletContextEvent sce) {
318318
TwitterBot.cleanup();
319319
MetricsBot.cleanup();
320320
AcmIdBot.cleanup();
321+
IndexingManagerFactory.getIndexingManager().shutdown();
321322
}
322323

323324
public static void createMatchGraph() {

src/main/java/org/ecocean/WildbookLifecycleListener.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,15 @@ public void preDelete(InstanceLifecycleEvent event) {
2424
Base base = (Base)obj;
2525
System.out.println("WildbookLifecycleListener preDelete() event on " + base);
2626
try {
27-
base.opensearchUnindexDeep();
28-
} catch (IOException ex) {
27+
28+
//old way = direct indexing
29+
//base.opensearchUnindexDeep();
30+
//new way - put indexing in managed queue
31+
IndexingManager im=IndexingManagerFactory.getIndexingManager();
32+
im.addIndexingQueueEntry(base,true);
33+
34+
35+
} catch (Exception ex) {
2936
ex.printStackTrace();
3037
}
3138
}
@@ -61,8 +68,13 @@ public void postStore(InstanceLifecycleEvent event) {
6168
Base base = (Base)obj;
6269
System.out.println("WildbookLifecycleListener postStore() event on " + base);
6370
try {
64-
base.opensearchIndexDeep();
65-
} catch (IOException ex) {
71+
72+
//base.opensearchIndexDeep();
73+
//new way - put indexing in managed queue
74+
IndexingManager im=IndexingManagerFactory.getIndexingManager();
75+
im.addIndexingQueueEntry(base,false);
76+
77+
} catch (Exception ex) {
6678
ex.printStackTrace();
6779
}
6880
} else if (Collaboration.class.isInstance(obj)) {

src/main/resources/bundles/OpenSearch.properties

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@
1111
# these probably should not be adjusted
1212
#searchScrollTime=10m
1313
#searchPitTime=10m
14+
15+
indexingNumAllowedThreads = 20

0 commit comments

Comments
 (0)